From 927e13773edf0f03689bb81d0d8dfe48b97aec9a Mon Sep 17 00:00:00 2001 From: David A Minton Date: Mon, 20 Feb 2023 19:56:16 -0500 Subject: [PATCH] Added dask support to Python side --- examples/Chambers2013/init_cond.py | 2 +- examples/Chambers2013/scattermovie.py | 2 +- python/swiftest/swiftest/io.py | 10 ++- python/swiftest/swiftest/simulation_class.py | 71 ++++++++++++-------- python/swiftest/swiftest/tool.py | 2 +- 5 files changed, 54 insertions(+), 33 deletions(-) diff --git a/examples/Chambers2013/init_cond.py b/examples/Chambers2013/init_cond.py index 1b2437b6c..d5c9f9f19 100755 --- a/examples/Chambers2013/init_cond.py +++ b/examples/Chambers2013/init_cond.py @@ -43,7 +43,7 @@ mtiny = 1e-2 * Ms minimum_fragment_mass = 1e-5 * Ms -nfrag_reduction = 10.0 +nfrag_reduction = 30.0 rng = default_rng(seed=3031179) runname = "Chambers (2013)" diff --git a/examples/Chambers2013/scattermovie.py b/examples/Chambers2013/scattermovie.py index a5e99de4d..98b0645ef 100755 --- a/examples/Chambers2013/scattermovie.py +++ b/examples/Chambers2013/scattermovie.py @@ -152,7 +152,7 @@ def update(self,frame): return self.artists -sim = swiftest.Simulation(read_data=True) +sim = swiftest.Simulation(read_data=True, read_collisions=False, dask=True) for plot_style in valid_plot_styles: animation_file = f"Chambers2013-{plot_style}.mp4" print('Making animation') diff --git a/python/swiftest/swiftest/io.py b/python/swiftest/swiftest/io.py index d070bbb91..6a376d387 100644 --- a/python/swiftest/swiftest/io.py +++ b/python/swiftest/swiftest/io.py @@ -825,7 +825,7 @@ def process_netcdf_input(ds, param): return ds -def swiftest2xr(param, verbose=True): +def swiftest2xr(param, verbose=True, dask=False): """ Converts a Swiftest binary data file into an xarray DataSet. @@ -833,6 +833,8 @@ def swiftest2xr(param, verbose=True): ---------- param : dict Swiftest parameters + dask : bool, default False + Use Dask to lazily load data (useful for very large datasets) Returns ------- @@ -842,8 +844,10 @@ def swiftest2xr(param, verbose=True): if ((param['OUT_TYPE'] == 'NETCDF_DOUBLE') or (param['OUT_TYPE'] == 'NETCDF_FLOAT')): if verbose: print('\nCreating Dataset from NetCDF file') - #ds = xr.open_dataset(param['BIN_OUT'], mask_and_scale=False) - ds = xr.open_mfdataset(param['BIN_OUT'], parallel=True, engine='h5netcdf', mask_and_scale=False) + if dask: + ds = xr.open_mfdataset(param['BIN_OUT'], engine='h5netcdf', mask_and_scale=False) + else: + ds = xr.open_dataset(param['BIN_OUT'], mask_and_scale=False) ds = process_netcdf_input(ds, param) else: diff --git a/python/swiftest/swiftest/simulation_class.py b/python/swiftest/swiftest/simulation_class.py index 1e851b0fe..10ebba59e 100644 --- a/python/swiftest/swiftest/simulation_class.py +++ b/python/swiftest/swiftest/simulation_class.py @@ -49,6 +49,7 @@ def __init__(self,read_param: bool = False, read_collisions: bool | None = None, read_encounters: bool | None = None, simdir: os.PathLike | str = "simdata", + dask: bool = False, **kwargs: Any): """ @@ -300,6 +301,8 @@ def __init__(self,read_param: bool = False, time calculation. The exact floating-point results of the interaction will be different between the two algorithm types. Parameter input file equivalent: `ENCOUNTER_CHECK` + dask : bool, default False + Use Dask to lazily load data (useful for very large datasets) verbose : bool, default True If set to True, then more information is printed by Simulation methods as they are executed. Setting to False suppresses most messages other than errors. @@ -367,7 +370,7 @@ def __init__(self,read_param: bool = False, self.read_encounters = read_encounters if read_param or read_data: - if self.read_param(read_init_cond = True): + if self.read_param(read_init_cond = True, dask=dask): # We will add the parameter file to the kwarg list. This will keep the set_parameter method from # overriding everything with defaults when there are no arguments passed to Simulation() kwargs['param_file'] = self.param_file @@ -390,7 +393,7 @@ def __init__(self,read_param: bool = False, if read_data: binpath = os.path.join(self.simdir, self.param['BIN_OUT']) if os.path.exists(binpath): - self.read_output_file() + self.read_output_file(dask=dask) else: raise FileNotFoundError(f"BIN_OUT file {binpath} not found.") return @@ -472,7 +475,7 @@ def _type_scrub(output_data): pbar.close() return - def run(self,**kwargs): + def run(self,dask: bool = False, **kwargs): """ Runs a Swiftest integration. Uses the parameters set by the `param` dictionary unless overridden by keyword arguments. Accepts any keyword arguments that can be passed to `set_parameter`. @@ -515,7 +518,7 @@ def run(self,**kwargs): # Read in new data self.read_encounters = True self.read_collisions = True - self.read_output_file() + self.read_output_file(dask=dask) return @@ -2592,7 +2595,8 @@ def read_param(self, param_file : os.PathLike | str | None = None, codename: Literal["Swiftest", "Swifter", "Swift"] | None = None, read_init_cond : bool | None = None, - verbose: bool | None = None): + verbose: bool | None = None, + dask: bool = False): """ Reads in an input parameter file and stores the values in the param dictionary. @@ -2607,6 +2611,9 @@ def read_param(self, verbose : bool, default is the value of the Simulation object's internal `verbose` If set to True, then more information is printed by Simulation methods as they are executed. Setting to False suppresses most messages other than errors. + dask : bool, default False + Use Dask to lazily load data (useful for very large datasets) + Returns ------- True if the parameter file exists and is read correctly. False otherwise. @@ -2632,7 +2639,7 @@ def read_param(self, if os.path.exists(init_cond_file): param_tmp = self.param.copy() param_tmp['BIN_OUT'] = init_cond_file - self.data = io.swiftest2xr(param_tmp, verbose=self.verbose) + self.data = io.swiftest2xr(param_tmp, verbose=self.verbose, dask=dask) self.init_cond = self.data.copy(deep=True) else: warnings.warn(f"Initial conditions file file {init_cond_file} not found.", stacklevel=2) @@ -2715,24 +2722,26 @@ def write_param(self, return def convert(self, param_file, newcodename="Swiftest", plname="pl.swiftest.in", tpname="tp.swiftest.in", - cbname="cb.swiftest.in", conversion_questions={}): + cbname="cb.swiftest.in", conversion_questions={}, dask = False): """ Converts simulation input files from one format to another (Swift, Swifter, or Swiftest). Parameters ---------- param_file : string - File name of the input parameter file + File name of the input parameter file newcodename : string - Name of the desired format (Swift/Swifter/Swiftest) + Name of the desired format (Swift/Swifter/Swiftest) plname : string - File name of the massive body input file + File name of the massive body input file tpname : string - File name of the test particle input file + File name of the test particle input file cbname : string - File name of the central body input file + File name of the central body input file conversion_questions : dictronary - Dictionary of additional parameters required to convert between formats + Dictionary of additional parameters required to convert between formats + dask : bool, default False + Use Dask to lazily load data (useful for very large datasets) Returns ------- @@ -2768,15 +2777,16 @@ def convert(self, param_file, newcodename="Swiftest", plname="pl.swiftest.in", t warnings.warn(f"Conversion from {self.codename} to {newcodename} is not supported.",stacklevel=2) return oldparam - def read_output_file(self,read_init_cond : bool = True): + def read_output_file(self,read_init_cond : bool = True, dask : bool = False): """ Reads in simulation data from an output file and stores it as an Xarray Dataset in the `data` instance variable. Parameters ---------- - read_init_cond : bool - Read in an initial conditions file along with the output file. Default is True - + read_init_cond : bool, default True + Read in an initial conditions file along with the output file. Default is True + dask : bool, default False + Use Dask to lazily load data (useful for very large datasets) Returns ------- self.data : xarray dataset @@ -2789,7 +2799,7 @@ def read_output_file(self,read_init_cond : bool = True): param_tmp = self.param.copy() param_tmp['BIN_OUT'] = os.path.join(self.simdir, self.param['BIN_OUT']) if self.codename == "Swiftest": - self.data = io.swiftest2xr(param_tmp, verbose=self.verbose) + self.data = io.swiftest2xr(param_tmp, verbose=self.verbose, dask=dask) if self.verbose: print('Swiftest simulation data stored as xarray DataSet .data') if read_init_cond: @@ -2797,14 +2807,14 @@ def read_output_file(self,read_init_cond : bool = True): print("Reading initial conditions file as .init_cond") if "NETCDF" in self.param['IN_TYPE']: param_tmp['BIN_OUT'] = self.simdir / self.param['NC_IN'] - self.init_cond = io.swiftest2xr(param_tmp, verbose=False) + self.init_cond = io.swiftest2xr(param_tmp, verbose=False, dask=dask) else: self.init_cond = self.data.isel(time=0) if self.read_encounters: - self.read_encounter_file() + self.read_encounter_file(dask=dask) if self.read_collisions: - self.read_collision_file() + self.read_collision_file(dask=dask) if self.verbose: print("Finished reading Swiftest dataset files.") @@ -2817,12 +2827,15 @@ def read_output_file(self,read_init_cond : bool = True): warnings.warn('Cannot process unknown code type. Call the read_param method with a valid code name. Valid options are "Swiftest", "Swifter", or "Swift".',stacklevel=2) return - def read_encounter_file(self): + def read_encounter_file(self, dask=False): enc_file = self.simdir / "encounters.nc" if not os.path.exists(enc_file): return - self.encounters = xr.open_dataset(enc_file) + if dask: + self.encounters = xr.open_mfdataset(enc_file,engine='h5netcdf', mask_and_scale=False) + else: + self.encounters = xr.open_dataset(enc_file, mask_and_scale=False) self.encounters = io.process_netcdf_input(self.encounters, self.param) # Remove any overlapping time values @@ -2834,7 +2847,7 @@ def read_encounter_file(self): return - def read_collision_file(self): + def read_collision_file(self, dask=False): col_file = self.simdir / "collisions.nc" if not os.path.exists(col_file): @@ -2843,12 +2856,16 @@ def read_collision_file(self): if self.verbose: print("Reading collisions history file as .collisions") - self.collisions = xr.open_dataset(col_file) + if dask: + self.collisions = xr.open_mfdataset(col_file,engine='h5netcdf', mask_and_scale=False) + else: + self.collisions = xr.open_dataset(col_file, mask_and_scale=False) + self.collisions = io.process_netcdf_input(self.collisions, self.param) return - def follow(self, codestyle="Swifter"): + def follow(self, codestyle="Swifter", dask=False): """ An implementation of the Swift tool_follow algorithm. Under development. Currently only for Swift simulations. @@ -2862,7 +2879,7 @@ def follow(self, codestyle="Swifter"): fol : xarray dataset """ if self.data is None: - self.read_output_file() + self.read_output_file(dask=dask) if codestyle == "Swift": try: with open('follow.in', 'r') as f: diff --git a/python/swiftest/swiftest/tool.py b/python/swiftest/swiftest/tool.py index 225619204..285c74829 100644 --- a/python/swiftest/swiftest/tool.py +++ b/python/swiftest/swiftest/tool.py @@ -18,7 +18,7 @@ def magnitude(ds,x): dim = "space" ord = None return xr.apply_ufunc( - np.linalg.norm, ds[x], input_core_dims=[[dim]], kwargs={"ord": ord, "axis": -1}, dask="parallelized" + np.linalg.norm, ds[x].where(~np.isnan(ds[x])), input_core_dims=[[dim]], kwargs={"ord": ord, "axis": -1}, dask="parallelized" ) def wrap_angle(angle):