diff --git a/swiftest/simulation_class.py b/swiftest/simulation_class.py index df2c95bab..d6627c325 100644 --- a/swiftest/simulation_class.py +++ b/swiftest/simulation_class.py @@ -23,12 +23,10 @@ import datetime import xarray as xr import numpy as np -from functools import partial import numpy.typing as npt import shutil import warnings -import sys -from tqdm.auto import tqdm +import contextlib from typing import ( Literal, Dict, @@ -36,7 +34,7 @@ Tuple, Any ) -import contextlib +from cython import nogil @contextlib.contextmanager def _cwd(newdir): @@ -46,94 +44,6 @@ def _cwd(newdir): yield finally: os.chdir(olddir) - - -import os -import sys -import threading -import time - - -class OutputGrabber(object): - """ - Class used to grab standard output or another stream. - """ - escape_char = "\b" - - def __init__(self, stream=None, threaded=True): - self.origstream = stream - self.threaded = threaded - if self.origstream is None: - self.origstream = sys.stdout - self.origstreamfd = self.origstream.fileno() - self.capturedtext = "" - # Create a pipe so the stream can be captured: - self.pipe_out, self.pipe_in = os.pipe() - - def __enter__(self): - self.start() - return self - - def __exit__(self, type, value, traceback): - self.stop() - - def start(self): - """ - Start capturing the stream data. - """ - self.capturedtext = "" - # Save a copy of the stream: - self.streamfd = os.dup(self.origstreamfd) - # Replace the original stream with our write pipe: - os.dup2(self.pipe_in, self.origstreamfd) - if self.threaded: - # Start thread that will read the stream: - self.workerThread = threading.Thread(target=self.readOutput) - self.workerThread.start() - # Make sure that the thread is running and os.read() has executed: - time.sleep(0.01) - - def stop(self): - """ - Stop capturing the stream data and save the text in `capturedtext`. - """ - # Print the escape character to make the readOutput method stop: - self.origstream.write(self.escape_char) - # Flush the stream to make sure all our data goes in before - # the escape character: - self.origstream.flush() - if self.threaded: - # wait until the thread finishes so we are sure that - # we have until the last character: - self.workerThread.join() - else: - self.readOutput() - # Close the pipe: - os.close(self.pipe_in) - os.close(self.pipe_out) - # Restore the original stream: - os.dup2(self.streamfd, self.origstreamfd) - # Close the duplicate stream: - os.close(self.streamfd) - - def readOutput(self): - """ - Read the stream data (one byte at a time) - and save the text in `capturedtext`. - """ - - def _outstream(self): - capturedtext = '' - while True: - char = os.read(self.pipe_out,1).decode(self.origstream.encoding) - if not char or self.newline in char: - break - capturedtext += char - yield capturedtext - - while True: - self.capturedtext = _outstream(self) - class Simulation(object): """ @@ -406,27 +316,7 @@ def __init__(self,read_param: bool = False, Parameter input file equivalent: None """ - # Configuration parameters will be stored in a json file alongside the Python source scripts. - self._config_file = Path(_pyfile).parent / "swiftest_configuration.json" - config_exists = self._config_file.exists() - if config_exists: - try: - with open(self._config_file, 'r') as f: - self._swiftest_configuration = json.load(f) - except: - config_exists = False - if not config_exists: - self._swiftest_configuration = {"shell" : str(Path(os.environ['SHELL']).name), - "shell_full" : str(Path(os.environ['SHELL'])), - "getter_column_width" : '32'} - self._swiftest_configuration['startup_script'] = str(Path.home() / f".{str(self._swiftest_configuration['shell'])}rc") - config_json = json.dumps(self._swiftest_configuration, indent=4) - with open(self._config_file, 'w') as f: - f.write(config_json) - - self._getter_column_width = self._swiftest_configuration['getter_column_width'] - self._shell = Path(self._swiftest_configuration['shell']) - self._shell_full = Path(self._swiftest_configuration['shell_full']) + self._getter_column_width = 32 self.verbose = kwargs.pop("verbose",True) self.param = {} @@ -499,73 +389,9 @@ def _run_swiftest_driver(self): Internal callable function that executes the swiftest_driver run """ - def _type_scrub(output_data): - int_vars = ["ILOOP","NPL","NTP","NPLM"] - for k,v in output_data.items(): - if k in int_vars: - output_data[k] = int(v) - else: - output_data[k] = float(v) - return output_data - - process_output = False - noutput = int((self.param['TSTOP'] - self.param['T0']) / self.param['DT']) - iloop = int((self.param['TSTART'] - self.param['T0']) / self.param['DT']) - twidth = int(np.ceil(np.log10(self.param['TSTOP']/(self.param['DT'] * self.param['ISTEP_OUT'])))) - if twidth < 1: - twidth = 1 - pre_message = f"Time: {self.param['TSTART']:.{twidth}e} / {self.param['TSTOP']:.{twidth}e} {self.TU_name} " - post_message = f"npl: {self.init_cond['npl'].values[0]} ntp: {self.init_cond['ntp'].values[0]}" - if "nplm" in self.init_cond: - post_message += f" nplm: {self.init_cond['nplm'].values[0]}" - if self.param['ENERGY']: - post_message += f" dL/L0: {0.0:.5e} dE/|E0|: {0.0:+.5e}" - post_message += f" Wall time / step: {0.0:.5e} s" - pbar = tqdm(total=noutput, desc=pre_message, postfix=post_message, bar_format='{l_bar}{bar}{postfix}') with _cwd(self.simdir): - out = OutputGrabber() - with out: - driver(self.integrator,str(self.param_file), "compact") - for line in out.capturedtext: - if "SWIFTEST STOP" in line: - process_output = False - - if process_output: - kvstream=line.replace('\n','').strip().split(';') # Removes the newline character, - output_data = _type_scrub({kv.split()[0]: kv.split()[1] for kv in kvstream[:-1]}) - pre_message = f"Time: {output_data['T']:.{twidth}e} / {self.param['TSTOP']:.{twidth}e} {self.TU_name}" - post_message = f" npl: {output_data['NPL']} ntp: {output_data['NTP']}" - if "NPLM" in output_data: - post_message += f" nplm: {output_data['NPLM']}" - if "LTOTERR" in output_data: - post_message += f" dL/L0: {output_data['LTOTERR']:.5e}" - if "ETOTERR" in output_data: - post_message += f" dE/|E0|: {output_data['ETOTERR']:+.5e}" - post_message += f" Wall time / step: {output_data['WTPS']:.5e} s" - interval = output_data['ILOOP'] - iloop - if interval > 0: - pbar.update(interval) - pbar.set_description_str(pre_message) - pbar.set_postfix_str(post_message) - iloop = output_data['ILOOP'] - - if "SWIFTEST START" in line: - process_output = True - - # res = p.communicate() - # if p.returncode != 0: - # for line in res[1]: - # print(line, end='') - # warnings.warn("Failure in swiftest_driver", stacklevel=2) - # sys.exit() - # except: - # warnings.warn(f"Error executing main swiftest_driver program", stacklevel=2) - # res = p.communicate() - # for line in res[1]: - # print(line, end='') - # sys.exit() - - pbar.close() + driver(self.integrator,str(self.param_file), "progress") + return def run(self,dask: bool = False, **kwargs):