Skip to content
This repository was archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
Removed all of the output stream parsing and now just output the Fort…
Browse files Browse the repository at this point in the history
…ran progress bar version of the runs when called inside Python.
  • Loading branch information
MintoDA1 authored and MintoDA1 committed Aug 15, 2023
1 parent 1d5161a commit c6bdcd8
Showing 1 changed file with 5 additions and 179 deletions.
184 changes: 5 additions & 179 deletions swiftest/simulation_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,18 @@
import datetime
import xarray as xr
import numpy as np
from functools import partial
import numpy.typing as npt
import shutil
import warnings
import sys
from tqdm.auto import tqdm
import contextlib
from typing import (
Literal,
Dict,
List,
Tuple,
Any
)
import contextlib
from cython import nogil

@contextlib.contextmanager
def _cwd(newdir):
Expand All @@ -46,94 +44,6 @@ def _cwd(newdir):
yield
finally:
os.chdir(olddir)


import os
import sys
import threading
import time


class OutputGrabber(object):
"""
Class used to grab standard output or another stream.
"""
escape_char = "\b"

def __init__(self, stream=None, threaded=True):
self.origstream = stream
self.threaded = threaded
if self.origstream is None:
self.origstream = sys.stdout
self.origstreamfd = self.origstream.fileno()
self.capturedtext = ""
# Create a pipe so the stream can be captured:
self.pipe_out, self.pipe_in = os.pipe()

def __enter__(self):
self.start()
return self

def __exit__(self, type, value, traceback):
self.stop()

def start(self):
"""
Start capturing the stream data.
"""
self.capturedtext = ""
# Save a copy of the stream:
self.streamfd = os.dup(self.origstreamfd)
# Replace the original stream with our write pipe:
os.dup2(self.pipe_in, self.origstreamfd)
if self.threaded:
# Start thread that will read the stream:
self.workerThread = threading.Thread(target=self.readOutput)
self.workerThread.start()
# Make sure that the thread is running and os.read() has executed:
time.sleep(0.01)

def stop(self):
"""
Stop capturing the stream data and save the text in `capturedtext`.
"""
# Print the escape character to make the readOutput method stop:
self.origstream.write(self.escape_char)
# Flush the stream to make sure all our data goes in before
# the escape character:
self.origstream.flush()
if self.threaded:
# wait until the thread finishes so we are sure that
# we have until the last character:
self.workerThread.join()
else:
self.readOutput()
# Close the pipe:
os.close(self.pipe_in)
os.close(self.pipe_out)
# Restore the original stream:
os.dup2(self.streamfd, self.origstreamfd)
# Close the duplicate stream:
os.close(self.streamfd)

def readOutput(self):
"""
Read the stream data (one byte at a time)
and save the text in `capturedtext`.
"""

def _outstream(self):
capturedtext = ''
while True:
char = os.read(self.pipe_out,1).decode(self.origstream.encoding)
if not char or self.newline in char:
break
capturedtext += char
yield capturedtext

while True:
self.capturedtext = _outstream(self)


class Simulation(object):
"""
Expand Down Expand Up @@ -406,27 +316,7 @@ def __init__(self,read_param: bool = False,
Parameter input file equivalent: None
"""

# Configuration parameters will be stored in a json file alongside the Python source scripts.
self._config_file = Path(_pyfile).parent / "swiftest_configuration.json"
config_exists = self._config_file.exists()
if config_exists:
try:
with open(self._config_file, 'r') as f:
self._swiftest_configuration = json.load(f)
except:
config_exists = False
if not config_exists:
self._swiftest_configuration = {"shell" : str(Path(os.environ['SHELL']).name),
"shell_full" : str(Path(os.environ['SHELL'])),
"getter_column_width" : '32'}
self._swiftest_configuration['startup_script'] = str(Path.home() / f".{str(self._swiftest_configuration['shell'])}rc")
config_json = json.dumps(self._swiftest_configuration, indent=4)
with open(self._config_file, 'w') as f:
f.write(config_json)

self._getter_column_width = self._swiftest_configuration['getter_column_width']
self._shell = Path(self._swiftest_configuration['shell'])
self._shell_full = Path(self._swiftest_configuration['shell_full'])
self._getter_column_width = 32
self.verbose = kwargs.pop("verbose",True)

self.param = {}
Expand Down Expand Up @@ -499,73 +389,9 @@ def _run_swiftest_driver(self):
Internal callable function that executes the swiftest_driver run
"""

def _type_scrub(output_data):
int_vars = ["ILOOP","NPL","NTP","NPLM"]
for k,v in output_data.items():
if k in int_vars:
output_data[k] = int(v)
else:
output_data[k] = float(v)
return output_data

process_output = False
noutput = int((self.param['TSTOP'] - self.param['T0']) / self.param['DT'])
iloop = int((self.param['TSTART'] - self.param['T0']) / self.param['DT'])
twidth = int(np.ceil(np.log10(self.param['TSTOP']/(self.param['DT'] * self.param['ISTEP_OUT']))))
if twidth < 1:
twidth = 1
pre_message = f"Time: {self.param['TSTART']:.{twidth}e} / {self.param['TSTOP']:.{twidth}e} {self.TU_name} "
post_message = f"npl: {self.init_cond['npl'].values[0]} ntp: {self.init_cond['ntp'].values[0]}"
if "nplm" in self.init_cond:
post_message += f" nplm: {self.init_cond['nplm'].values[0]}"
if self.param['ENERGY']:
post_message += f" dL/L0: {0.0:.5e} dE/|E0|: {0.0:+.5e}"
post_message += f" Wall time / step: {0.0:.5e} s"
pbar = tqdm(total=noutput, desc=pre_message, postfix=post_message, bar_format='{l_bar}{bar}{postfix}')
with _cwd(self.simdir):
out = OutputGrabber()
with out:
driver(self.integrator,str(self.param_file), "compact")
for line in out.capturedtext:
if "SWIFTEST STOP" in line:
process_output = False

if process_output:
kvstream=line.replace('\n','').strip().split(';') # Removes the newline character,
output_data = _type_scrub({kv.split()[0]: kv.split()[1] for kv in kvstream[:-1]})
pre_message = f"Time: {output_data['T']:.{twidth}e} / {self.param['TSTOP']:.{twidth}e} {self.TU_name}"
post_message = f" npl: {output_data['NPL']} ntp: {output_data['NTP']}"
if "NPLM" in output_data:
post_message += f" nplm: {output_data['NPLM']}"
if "LTOTERR" in output_data:
post_message += f" dL/L0: {output_data['LTOTERR']:.5e}"
if "ETOTERR" in output_data:
post_message += f" dE/|E0|: {output_data['ETOTERR']:+.5e}"
post_message += f" Wall time / step: {output_data['WTPS']:.5e} s"
interval = output_data['ILOOP'] - iloop
if interval > 0:
pbar.update(interval)
pbar.set_description_str(pre_message)
pbar.set_postfix_str(post_message)
iloop = output_data['ILOOP']

if "SWIFTEST START" in line:
process_output = True

# res = p.communicate()
# if p.returncode != 0:
# for line in res[1]:
# print(line, end='')
# warnings.warn("Failure in swiftest_driver", stacklevel=2)
# sys.exit()
# except:
# warnings.warn(f"Error executing main swiftest_driver program", stacklevel=2)
# res = p.communicate()
# for line in res[1]:
# print(line, end='')
# sys.exit()

pbar.close()
driver(self.integrator,str(self.param_file), "progress")

return

def run(self,dask: bool = False, **kwargs):
Expand Down

0 comments on commit c6bdcd8

Please sign in to comment.