Skip to content
This repository was archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
Added dask support to Python side
Browse files Browse the repository at this point in the history
  • Loading branch information
daminton committed Feb 21, 2023
1 parent 2e7f0be commit 927e137
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 33 deletions.
2 changes: 1 addition & 1 deletion examples/Chambers2013/init_cond.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

mtiny = 1e-2 * Ms
minimum_fragment_mass = 1e-5 * Ms
nfrag_reduction = 10.0
nfrag_reduction = 30.0
rng = default_rng(seed=3031179)

runname = "Chambers (2013)"
Expand Down
2 changes: 1 addition & 1 deletion examples/Chambers2013/scattermovie.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def update(self,frame):

return self.artists

sim = swiftest.Simulation(read_data=True)
sim = swiftest.Simulation(read_data=True, read_collisions=False, dask=True)
for plot_style in valid_plot_styles:
animation_file = f"Chambers2013-{plot_style}.mp4"
print('Making animation')
Expand Down
10 changes: 7 additions & 3 deletions python/swiftest/swiftest/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,14 +825,16 @@ def process_netcdf_input(ds, param):
return ds


def swiftest2xr(param, verbose=True):
def swiftest2xr(param, verbose=True, dask=False):
"""
Converts a Swiftest binary data file into an xarray DataSet.
Parameters
----------
param : dict
Swiftest parameters
dask : bool, default False
Use Dask to lazily load data (useful for very large datasets)
Returns
-------
Expand All @@ -842,8 +844,10 @@ def swiftest2xr(param, verbose=True):

if ((param['OUT_TYPE'] == 'NETCDF_DOUBLE') or (param['OUT_TYPE'] == 'NETCDF_FLOAT')):
if verbose: print('\nCreating Dataset from NetCDF file')
#ds = xr.open_dataset(param['BIN_OUT'], mask_and_scale=False)
ds = xr.open_mfdataset(param['BIN_OUT'], parallel=True, engine='h5netcdf', mask_and_scale=False)
if dask:
ds = xr.open_mfdataset(param['BIN_OUT'], engine='h5netcdf', mask_and_scale=False)
else:
ds = xr.open_dataset(param['BIN_OUT'], mask_and_scale=False)

ds = process_netcdf_input(ds, param)
else:
Expand Down
71 changes: 44 additions & 27 deletions python/swiftest/swiftest/simulation_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def __init__(self,read_param: bool = False,
read_collisions: bool | None = None,
read_encounters: bool | None = None,
simdir: os.PathLike | str = "simdata",
dask: bool = False,
**kwargs: Any):
"""
Expand Down Expand Up @@ -300,6 +301,8 @@ def __init__(self,read_param: bool = False,
time calculation. The exact floating-point results of the interaction will be different between the two
algorithm types.
Parameter input file equivalent: `ENCOUNTER_CHECK`
dask : bool, default False
Use Dask to lazily load data (useful for very large datasets)
verbose : bool, default True
If set to True, then more information is printed by Simulation methods as they are executed. Setting to
False suppresses most messages other than errors.
Expand Down Expand Up @@ -367,7 +370,7 @@ def __init__(self,read_param: bool = False,
self.read_encounters = read_encounters

if read_param or read_data:
if self.read_param(read_init_cond = True):
if self.read_param(read_init_cond = True, dask=dask):
# We will add the parameter file to the kwarg list. This will keep the set_parameter method from
# overriding everything with defaults when there are no arguments passed to Simulation()
kwargs['param_file'] = self.param_file
Expand All @@ -390,7 +393,7 @@ def __init__(self,read_param: bool = False,
if read_data:
binpath = os.path.join(self.simdir, self.param['BIN_OUT'])
if os.path.exists(binpath):
self.read_output_file()
self.read_output_file(dask=dask)
else:
raise FileNotFoundError(f"BIN_OUT file {binpath} not found.")
return
Expand Down Expand Up @@ -472,7 +475,7 @@ def _type_scrub(output_data):
pbar.close()
return

def run(self,**kwargs):
def run(self,dask: bool = False, **kwargs):
"""
Runs a Swiftest integration. Uses the parameters set by the `param` dictionary unless overridden by keyword
arguments. Accepts any keyword arguments that can be passed to `set_parameter`.
Expand Down Expand Up @@ -515,7 +518,7 @@ def run(self,**kwargs):
# Read in new data
self.read_encounters = True
self.read_collisions = True
self.read_output_file()
self.read_output_file(dask=dask)

return

Expand Down Expand Up @@ -2592,7 +2595,8 @@ def read_param(self,
param_file : os.PathLike | str | None = None,
codename: Literal["Swiftest", "Swifter", "Swift"] | None = None,
read_init_cond : bool | None = None,
verbose: bool | None = None):
verbose: bool | None = None,
dask: bool = False):
"""
Reads in an input parameter file and stores the values in the param dictionary.
Expand All @@ -2607,6 +2611,9 @@ def read_param(self,
verbose : bool, default is the value of the Simulation object's internal `verbose`
If set to True, then more information is printed by Simulation methods as they are executed. Setting to
False suppresses most messages other than errors.
dask : bool, default False
Use Dask to lazily load data (useful for very large datasets)
Returns
-------
True if the parameter file exists and is read correctly. False otherwise.
Expand All @@ -2632,7 +2639,7 @@ def read_param(self,
if os.path.exists(init_cond_file):
param_tmp = self.param.copy()
param_tmp['BIN_OUT'] = init_cond_file
self.data = io.swiftest2xr(param_tmp, verbose=self.verbose)
self.data = io.swiftest2xr(param_tmp, verbose=self.verbose, dask=dask)
self.init_cond = self.data.copy(deep=True)
else:
warnings.warn(f"Initial conditions file file {init_cond_file} not found.", stacklevel=2)
Expand Down Expand Up @@ -2715,24 +2722,26 @@ def write_param(self,
return

def convert(self, param_file, newcodename="Swiftest", plname="pl.swiftest.in", tpname="tp.swiftest.in",
cbname="cb.swiftest.in", conversion_questions={}):
cbname="cb.swiftest.in", conversion_questions={}, dask = False):
"""
Converts simulation input files from one format to another (Swift, Swifter, or Swiftest).
Parameters
----------
param_file : string
File name of the input parameter file
File name of the input parameter file
newcodename : string
Name of the desired format (Swift/Swifter/Swiftest)
Name of the desired format (Swift/Swifter/Swiftest)
plname : string
File name of the massive body input file
File name of the massive body input file
tpname : string
File name of the test particle input file
File name of the test particle input file
cbname : string
File name of the central body input file
File name of the central body input file
conversion_questions : dictronary
Dictionary of additional parameters required to convert between formats
Dictionary of additional parameters required to convert between formats
dask : bool, default False
Use Dask to lazily load data (useful for very large datasets)
Returns
-------
Expand Down Expand Up @@ -2768,15 +2777,16 @@ def convert(self, param_file, newcodename="Swiftest", plname="pl.swiftest.in", t
warnings.warn(f"Conversion from {self.codename} to {newcodename} is not supported.",stacklevel=2)
return oldparam

def read_output_file(self,read_init_cond : bool = True):
def read_output_file(self,read_init_cond : bool = True, dask : bool = False):
"""
Reads in simulation data from an output file and stores it as an Xarray Dataset in the `data` instance variable.
Parameters
----------
read_init_cond : bool
Read in an initial conditions file along with the output file. Default is True
read_init_cond : bool, default True
Read in an initial conditions file along with the output file. Default is True
dask : bool, default False
Use Dask to lazily load data (useful for very large datasets)
Returns
-------
self.data : xarray dataset
Expand All @@ -2789,22 +2799,22 @@ def read_output_file(self,read_init_cond : bool = True):
param_tmp = self.param.copy()
param_tmp['BIN_OUT'] = os.path.join(self.simdir, self.param['BIN_OUT'])
if self.codename == "Swiftest":
self.data = io.swiftest2xr(param_tmp, verbose=self.verbose)
self.data = io.swiftest2xr(param_tmp, verbose=self.verbose, dask=dask)
if self.verbose:
print('Swiftest simulation data stored as xarray DataSet .data')
if read_init_cond:
if self.verbose:
print("Reading initial conditions file as .init_cond")
if "NETCDF" in self.param['IN_TYPE']:
param_tmp['BIN_OUT'] = self.simdir / self.param['NC_IN']
self.init_cond = io.swiftest2xr(param_tmp, verbose=False)
self.init_cond = io.swiftest2xr(param_tmp, verbose=False, dask=dask)
else:
self.init_cond = self.data.isel(time=0)

if self.read_encounters:
self.read_encounter_file()
self.read_encounter_file(dask=dask)
if self.read_collisions:
self.read_collision_file()
self.read_collision_file(dask=dask)
if self.verbose:
print("Finished reading Swiftest dataset files.")

Expand All @@ -2817,12 +2827,15 @@ def read_output_file(self,read_init_cond : bool = True):
warnings.warn('Cannot process unknown code type. Call the read_param method with a valid code name. Valid options are "Swiftest", "Swifter", or "Swift".',stacklevel=2)
return

def read_encounter_file(self):
def read_encounter_file(self, dask=False):
enc_file = self.simdir / "encounters.nc"
if not os.path.exists(enc_file):
return

self.encounters = xr.open_dataset(enc_file)
if dask:
self.encounters = xr.open_mfdataset(enc_file,engine='h5netcdf', mask_and_scale=False)
else:
self.encounters = xr.open_dataset(enc_file, mask_and_scale=False)
self.encounters = io.process_netcdf_input(self.encounters, self.param)

# Remove any overlapping time values
Expand All @@ -2834,7 +2847,7 @@ def read_encounter_file(self):

return

def read_collision_file(self):
def read_collision_file(self, dask=False):

col_file = self.simdir / "collisions.nc"
if not os.path.exists(col_file):
Expand All @@ -2843,12 +2856,16 @@ def read_collision_file(self):
if self.verbose:
print("Reading collisions history file as .collisions")

self.collisions = xr.open_dataset(col_file)
if dask:
self.collisions = xr.open_mfdataset(col_file,engine='h5netcdf', mask_and_scale=False)
else:
self.collisions = xr.open_dataset(col_file, mask_and_scale=False)

self.collisions = io.process_netcdf_input(self.collisions, self.param)

return

def follow(self, codestyle="Swifter"):
def follow(self, codestyle="Swifter", dask=False):
"""
An implementation of the Swift tool_follow algorithm. Under development. Currently only for Swift simulations.
Expand All @@ -2862,7 +2879,7 @@ def follow(self, codestyle="Swifter"):
fol : xarray dataset
"""
if self.data is None:
self.read_output_file()
self.read_output_file(dask=dask)
if codestyle == "Swift":
try:
with open('follow.in', 'r') as f:
Expand Down
2 changes: 1 addition & 1 deletion python/swiftest/swiftest/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def magnitude(ds,x):
dim = "space"
ord = None
return xr.apply_ufunc(
np.linalg.norm, ds[x], input_core_dims=[[dim]], kwargs={"ord": ord, "axis": -1}, dask="parallelized"
np.linalg.norm, ds[x].where(~np.isnan(ds[x])), input_core_dims=[[dim]], kwargs={"ord": ord, "axis": -1}, dask="parallelized"
)

def wrap_angle(angle):
Expand Down

0 comments on commit 927e137

Please sign in to comment.