From 1d5161acd8bc595302c9fedbf6a3b26121c35fcb Mon Sep 17 00:00:00 2001 From: MintoDA1 <51412913+MintoDA1@users.noreply.github.com> Date: Tue, 15 Aug 2023 17:16:38 -0400 Subject: [PATCH] Restructured the driver method so that running the simulation acts sort of like it used to --- src/main/main.f90 | 9 ++ src/swiftest/swiftest_driver.f90 | 9 -- swiftest/_bindings.pyx | 5 +- swiftest/simulation_class.py | 143 +++++++++++++++++++++++++------ 4 files changed, 128 insertions(+), 38 deletions(-) diff --git a/src/main/main.f90 b/src/main/main.f90 index 1b8572c3f..9f27c7d27 100644 --- a/src/main/main.f90 +++ b/src/main/main.f90 @@ -25,4 +25,13 @@ program main ! Execute the driver call swiftest_driver(integrator, param_file_name, display_style) + +#ifdef COARRAY + if (this_image() == 1) then +#endif + call base_util_exit(SUCCESS) +#ifdef COARRAY + end if ! (this_image() == 1) +#endif + end program main \ No newline at end of file diff --git a/src/swiftest/swiftest_driver.f90 b/src/swiftest/swiftest_driver.f90 index 0635e5830..870bea9d2 100644 --- a/src/swiftest/swiftest_driver.f90 +++ b/src/swiftest/swiftest_driver.f90 @@ -182,15 +182,6 @@ module subroutine swiftest_driver(integrator, param_file_name, display_style) call nbody_system%dump(param, system_history) call nbody_system%display_run_information(param, integration_timer, phase="last") end associate - -#ifdef COARRAY - if (this_image() == 1) then -#endif - call base_util_exit(SUCCESS,unit=param%display_unit) -#ifdef COARRAY - end if ! (this_image() == 1) -#endif - return end subroutine swiftest_driver diff --git a/swiftest/_bindings.pyx b/swiftest/_bindings.pyx index 936da9814..8f8265c04 100644 --- a/swiftest/_bindings.pyx +++ b/swiftest/_bindings.pyx @@ -1,7 +1,7 @@ # cython: language_level=3, c_string_type=unicode, c_string_encoding=ascii cdef extern from "_bindings.h": - void bindings_c_driver(char* integrator, char* param_file_name, char* display_style) + void bindings_c_driver(char* integrator, char* param_file_name, char* display_style) noexcept nogil def driver(integrator, param_file_name, display_style): b_integrator = bytes(integrator,'ascii') + b'\x00' @@ -13,5 +13,6 @@ def driver(integrator, param_file_name, display_style): char* c_param_file_name = b_param_file_name char* c_display_style = b_display_style - bindings_c_driver(c_integrator, c_param_file_name, c_display_style) + with nogil: + bindings_c_driver(c_integrator, c_param_file_name, c_display_style) return \ No newline at end of file diff --git a/swiftest/simulation_class.py b/swiftest/simulation_class.py index eba6cc3b4..df2c95bab 100644 --- a/swiftest/simulation_class.py +++ b/swiftest/simulation_class.py @@ -28,7 +28,6 @@ import shutil import warnings import sys -from py.io import StdCapture from tqdm.auto import tqdm from typing import ( Literal, @@ -47,6 +46,94 @@ def _cwd(newdir): yield finally: os.chdir(olddir) + + +import os +import sys +import threading +import time + + +class OutputGrabber(object): + """ + Class used to grab standard output or another stream. + """ + escape_char = "\b" + + def __init__(self, stream=None, threaded=True): + self.origstream = stream + self.threaded = threaded + if self.origstream is None: + self.origstream = sys.stdout + self.origstreamfd = self.origstream.fileno() + self.capturedtext = "" + # Create a pipe so the stream can be captured: + self.pipe_out, self.pipe_in = os.pipe() + + def __enter__(self): + self.start() + return self + + def __exit__(self, type, value, traceback): + self.stop() + + def start(self): + """ + Start capturing the stream data. + """ + self.capturedtext = "" + # Save a copy of the stream: + self.streamfd = os.dup(self.origstreamfd) + # Replace the original stream with our write pipe: + os.dup2(self.pipe_in, self.origstreamfd) + if self.threaded: + # Start thread that will read the stream: + self.workerThread = threading.Thread(target=self.readOutput) + self.workerThread.start() + # Make sure that the thread is running and os.read() has executed: + time.sleep(0.01) + + def stop(self): + """ + Stop capturing the stream data and save the text in `capturedtext`. + """ + # Print the escape character to make the readOutput method stop: + self.origstream.write(self.escape_char) + # Flush the stream to make sure all our data goes in before + # the escape character: + self.origstream.flush() + if self.threaded: + # wait until the thread finishes so we are sure that + # we have until the last character: + self.workerThread.join() + else: + self.readOutput() + # Close the pipe: + os.close(self.pipe_in) + os.close(self.pipe_out) + # Restore the original stream: + os.dup2(self.streamfd, self.origstreamfd) + # Close the duplicate stream: + os.close(self.streamfd) + + def readOutput(self): + """ + Read the stream data (one byte at a time) + and save the text in `capturedtext`. + """ + + def _outstream(self): + capturedtext = '' + while True: + char = os.read(self.pipe_out,1).decode(self.origstream.encoding) + if not char or self.newline in char: + break + capturedtext += char + yield capturedtext + + while True: + self.capturedtext = _outstream(self) + class Simulation(object): """ @@ -436,32 +523,34 @@ def _type_scrub(output_data): post_message += f" Wall time / step: {0.0:.5e} s" pbar = tqdm(total=noutput, desc=pre_message, postfix=post_message, bar_format='{l_bar}{bar}{postfix}') with _cwd(self.simdir): - res, out, err = StdCapture.call(driver(self.integrator,str(self.param_file), "compact")) - for line in out: - if "SWIFTEST STOP" in line: - process_output = False - - if process_output: - kvstream=line.replace('\n','').strip().split(';') # Removes the newline character, - output_data = _type_scrub({kv.split()[0]: kv.split()[1] for kv in kvstream[:-1]}) - pre_message = f"Time: {output_data['T']:.{twidth}e} / {self.param['TSTOP']:.{twidth}e} {self.TU_name}" - post_message = f" npl: {output_data['NPL']} ntp: {output_data['NTP']}" - if "NPLM" in output_data: - post_message += f" nplm: {output_data['NPLM']}" - if "LTOTERR" in output_data: - post_message += f" dL/L0: {output_data['LTOTERR']:.5e}" - if "ETOTERR" in output_data: - post_message += f" dE/|E0|: {output_data['ETOTERR']:+.5e}" - post_message += f" Wall time / step: {output_data['WTPS']:.5e} s" - interval = output_data['ILOOP'] - iloop - if interval > 0: - pbar.update(interval) - pbar.set_description_str(pre_message) - pbar.set_postfix_str(post_message) - iloop = output_data['ILOOP'] - - if "SWIFTEST START" in line: - process_output = True + out = OutputGrabber() + with out: + driver(self.integrator,str(self.param_file), "compact") + for line in out.capturedtext: + if "SWIFTEST STOP" in line: + process_output = False + + if process_output: + kvstream=line.replace('\n','').strip().split(';') # Removes the newline character, + output_data = _type_scrub({kv.split()[0]: kv.split()[1] for kv in kvstream[:-1]}) + pre_message = f"Time: {output_data['T']:.{twidth}e} / {self.param['TSTOP']:.{twidth}e} {self.TU_name}" + post_message = f" npl: {output_data['NPL']} ntp: {output_data['NTP']}" + if "NPLM" in output_data: + post_message += f" nplm: {output_data['NPLM']}" + if "LTOTERR" in output_data: + post_message += f" dL/L0: {output_data['LTOTERR']:.5e}" + if "ETOTERR" in output_data: + post_message += f" dE/|E0|: {output_data['ETOTERR']:+.5e}" + post_message += f" Wall time / step: {output_data['WTPS']:.5e} s" + interval = output_data['ILOOP'] - iloop + if interval > 0: + pbar.update(interval) + pbar.set_description_str(pre_message) + pbar.set_postfix_str(post_message) + iloop = output_data['ILOOP'] + + if "SWIFTEST START" in line: + process_output = True # res = p.communicate() # if p.returncode != 0: