diff --git a/README.md b/README.md index b605937..0be272f 100644 --- a/README.md +++ b/README.md @@ -19,26 +19,27 @@ The image below is a link to a YouTube video showing some of its design and feat ## Table of Contents -- [Hardware](#hardware) -- [Examples](#example-images) -- [3D printer models](#3d-printer-models) -- [Automated data analysis](#automated-data-analysis) -- [Installation](#installation) - - [Enabling the Wi-Fi hotspot](#enabling-the-wi-fi-hotspot) -- [Usage](#usage) - - [Working with SPIRO](#working-with-spiro) - - [Connecting to the web interface](#connecting-to-the-web-interface) - - [Setting up imaging](#setting-up-imaging) - - [Starting an experiment](#starting-an-experiment) - - [Downloading images](#downloading-images) -- [Maintaining the system](#maintaining-the-system) - - [Restarting the software](#restarting-the-software) - - [Shutting down the system](#shutting-down-the-system) - - [Keeping software up to date](#keeping-software-up-to-date) -- [Troubleshooting](#troubleshooting) - - [Viewing the software log](#viewing-the-software-log) - - [Testing the LED and motor](#testing-the-led-and-motor) -- [Licensing](#licensing) +* [Hardware](#hardware) +* [Examples](#example-images) +* [3D printer models](#3d-printer-models) +* [Automated data analysis](#automated-data-analysis) +* [Installation](#installation) + * [Enabling the Wi-Fi hotspot](#enabling-the-wi-fi-hotspot) +* [Usage](#usage) + * [Working with SPIRO](#working-with-spiro) + * [Connecting to the web interface](#connecting-to-the-web-interface) + * [Setting up imaging](#setting-up-imaging) + * [Starting an experiment](#starting-an-experiment) + * [Globus transfer during experiments](#globus-transfer-during-experiments) + * [Downloading images](#downloading-images) +* [Maintaining the system](#maintaining-the-system) + * [Restarting the software](#restarting-the-software) + * [Shutting down the system](#shutting-down-the-system) + * [Keeping software up to date](#keeping-software-up-to-date) +* [Troubleshooting](#troubleshooting) + * [Viewing the software log](#viewing-the-software-log) + * [Testing the LED and motor](#testing-the-led-and-motor) +* [Licensing](#licensing) ## Hardware @@ -117,7 +118,6 @@ sudo raspi-config ``` In the raspi-config interface, make the following changes: -<<<<<<< Updated upstream * **Change the password**. The system will allow network access, and a weak password **will** compromise your network security **and** your experimental data. * After changing the password, connect the network cable (if you are using wired networking). * Under *Interfacing*, enable *I2C* and *SSH*. The camera is used through Bookworm's libcamera stack, so the old Legacy camera stack is not required. @@ -126,17 +126,6 @@ In the raspi-config interface, make the following changes: * If needed, configure *Network* and *Localization* options here as well. Set a *Hostname* under Network if you plan on running several SPIROs. * Finally, select *Finish*, and choose to reboot the system when asked. * After reboot, the system shows a message on the screen showing its IP address ("My IP address is: *a.b.c.d*"). Make a note of this address as you will need it to access the system over the network. Make sure that your network allows access to ports 8080 on this IP address. (Alternatively, see [Enabling the Wi-Fi hotspot](#enabling-the-wifi-hotspot)) -======= - -- **Change the password**. The system will allow network access, and a weak password **will** compromise your network security **and** your experimental data. -- After changing the password, connect the network cable (if you are using wired networking). -- Under _Interfacing_, enable _Camera_, _I2C_, and _SSH_. -- In _Performance Options_, set _GPU Memory_ to 256. -- Under _Localisation Options_, make sure to set the _Timezone_. Please note that a working network connection is required to maintain the correct date. -- If needed, configure _Network_ and _Localization_ options here as well. Set a _Hostname_ under Network if you plan on running several SPIROs. -- Finally, select _Finish_, and choose to reboot the system when asked. -- After reboot, the system shows a message on the screen showing its IP address ("My IP address is: _a.b.c.d_"). Make a note of this address as you will need it to access the system over the network. Make sure that your network allows access to ports 8080 on this IP address. (Alternatively, see [Enabling the Wi-Fi hotspot](#enabling-the-wifi-hotspot)) ->>>>>>> Stashed changes Next, ensure the system is connected to the internet. Update the operating system and install the required tools: @@ -300,18 +289,44 @@ After logging in to the system, you are presented with the _Live view_. Here, yo Under _Day image settings_ and _Night image settings_, you can adjust the exposure time for day and night images in real time. For night images, make sure that representative conditions are used (i.e., turn off the lights in the growth chamber). When the image is captured according to your liking, choose _Update and save_. -<<<<<<< Updated upstream For locating the imaging positions, SPIRO expects **two sensor events per position**. The first trigger is treated as the early-warning stop, telling the software that the final stop is approaching. The system then slows down, finds the second stop, and only after that applies the configured final offset (*calibration value*). That same final offset is used for all positions, not only the start position. Use the *Calibrate motor* view to tune the primary step delay, secondary step delay, secondary stop gap, stable sensor reads, and final offset. The page now also includes manual jog controls, live step counters, and repeatability tests so you can dial in settings without saving every trial. -======= -For locating the initial imaging position, the system turns the cube until a positional switch is activated. It then turns the cube a predefined amount of steps (_calibration value_). The check that the calibration value is correct, go to the _Start position calibration_ view. Here, you may try out the current value, as well as change it. Make sure to click _Save value_ if you change it. ->>>>>>> Stashed changes ### Starting an experiment When imaging parameters are set up to your liking, you are ready to start your experiments. In the _Experiment control_ view, choose a name for your experiment, as well as the duration and imaging frequency. After you choose _Start experiment_, the system will disable most of the functionality of the web interface, displaying a simple status window containing experiment parameters, as well as the last image captured. +### Globus transfer during experiments + +SPIRO can transfer newly captured files to Globus while an experiment runs. + +On a Raspberry Pi running Raspberry Pi OS, install the local source endpoint software first: + +```bash +sudo apt update +sudo apt install -y wget python3-pip tcl tk +python3 -m pip install --user globus-cli +wget https://downloads.globus.org/globus-connect-personal/linux/stable/globusconnectpersonal-latest.tgz +tar xzf globusconnectpersonal-latest.tgz +mv globusconnectpersonal-* ~/.globusconnectpersonal +``` + +After installation: + +1. Open *System settings* and set your *Globus Client ID*. +2. In *Local source endpoint*, use *Launch setup/login* to start Globus Connect Personal setup on the SPIRO device. +3. If the Pi is headless, complete the first-time setup from a local shell by running `~/.globusconnectpersonal/globusconnectpersonal` or `~/.globusconnectpersonal/globusconnectpersonal -setup`. +4. Use *Start local client* once Globus Connect Personal is configured. +5. Use *Refresh local endpoint ID* to detect the SPIRO device's source endpoint via `globus endpoint local-id`. +6. Click *Connect Globus* to authorize the SPIRO web app to submit transfer tasks. +7. Copy the authorization code from Globus and paste it into *Complete Globus Login*. +8. In *Experiment control*, enable transfer and provide only the destination endpoint ID and destination base path for this run. + +SPIRO now uses the local Globus Connect Personal endpoint on the device as the source endpoint automatically. The source path is set to the experiment directory for the active run. + +Transfers are queued once per imaging cycle and submitted in the background. If a transfer fails, SPIRO keeps imaging and retries automatically. + ### Downloading images Images can be downloaded from the web interface under _File manager_. The File manager also allows deleting files to free up space on the SD card. @@ -439,20 +454,12 @@ PY If the motor does not move correctly: -<<<<<<< Updated upstream * Confirm the motor is connected to **M2** on the HAT. * Confirm the HAT's external motor power supply is connected and switched on. * Confirm the M2 DIP switches `D3-D5` are still at the factory default unless you intentionally recalibrated for another microstepping mode. * Confirm the motor wiring matches the HAT's `A3/A4/B3/B4` outputs. * Confirm the DRV8825 current limit is set appropriately for the motor. * If the stage moves in the wrong direction, set `motor_direction_active_high` in the SPIRO config instead of rewiring the whole system. -======= -If it doesn't respond to this command, this may indicate either miswiring, or that either the LED strip or the MOSFET is non-functional. - -Similarly, you can turn on and off the motor, by substituting the value _23_ for 17 in the above examples. When GPIO pin 23 is toggled on, the cube should be locked in position. If it is not, check that your wiring looks good, that the power supply is connected, and that the shaft coupler is firmly attached to both the cube and the motor. - -If the motor is moving jerkily during normal operation, there is likely a problem with the wiring of the coil pins (Ain1&2 and Bin1&2). ->>>>>>> Stashed changes ## Licensing diff --git a/setup.py b/setup.py index 3accc4c..0913509 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,7 @@ def get_version_and_cmdclass(package_path): cmdclass = cmdclass, packages = find_packages(), scripts = ['bin/spiro'], - install_requires = ['picamera2==0.3.31', 'gpiozero>=2.0.1,<3', 'Flask==2.2.5', 'waitress==2.1.2', 'numpy==1.24.2', 'Werkzeug==2.2.3'], + install_requires = ['picamera2==0.3.31', 'RPi.GPIO==0.7.1', 'Flask==2.2.5', 'waitress==2.1.2', 'numpy==1.24.2', 'Werkzeug==2.2.3', 'globus-sdk>=3.39.0', 'cryptography>=42.0.0'], author = 'Kirby Kalbaugh', author_email = 'kkalbaug@purdue.edu', description = 'Control software for the SPIRO biological imaging system', diff --git a/spiro/config.py b/spiro/config.py index 88658bf..a1bbafb 100644 --- a/spiro/config.py +++ b/spiro/config.py @@ -7,6 +7,7 @@ import json import os import sys +from cryptography.fernet import Fernet from importlib import metadata from ._version import __version__ @@ -89,7 +90,13 @@ class Config(object): 'name': 'spiro', # the name of this spiro instance 'timezone': 'America/New_York', # user display/save timezone; device clock remains UTC 'debug': True, # debug logging - 'rotated_camera': True # rotated camera house + 'rotated_camera': True, # rotated camera house + 'globus_client_id': '', + 'globus_token_encrypted': '', + 'globus_gcp_install_dir': '~/.globusconnectpersonal', + 'globus_local_endpoint_id': '', + 'globus_source_endpoint': '', + 'globus_source_base_path': '/' } config = {} @@ -98,6 +105,7 @@ def __init__(self): """Initialize file locations, runtime version, and cached config state.""" self.cfgdir = os.path.expanduser("~/.config/spiro") self.cfgfile = os.path.join(self.cfgdir, "spiro.conf") + self.keyfile = os.path.join(self.cfgdir, "token.key") self.version = _resolve_version() self.read() if os.path.exists(self.cfgfile): @@ -124,9 +132,44 @@ def write(self): with open(self.cfgfile + ".tmp", 'w') as f: json.dump(self.config, f, indent=4) os.replace(self.cfgfile + ".tmp", self.cfgfile) + os.chmod(self.cfgfile, 0o600) except OSError as e: log("Failed to write config file: " + e.strerror) + + def _get_or_create_secret_key(self): + os.makedirs(self.cfgdir, exist_ok=True) + if os.path.exists(self.keyfile): + with open(self.keyfile, 'rb') as f: + key = f.read().strip() + if key: + return key + + key = Fernet.generate_key() + with open(self.keyfile + '.tmp', 'wb') as f: + f.write(key) + os.replace(self.keyfile + '.tmp', self.keyfile) + os.chmod(self.keyfile, 0o600) + return key + + + def encrypt_secret(self, value): + if value in [None, '']: + return '' + cipher = Fernet(self._get_or_create_secret_key()) + return cipher.encrypt(value.encode('utf-8')).decode('utf-8') + + + def decrypt_secret(self, value): + if value in [None, '']: + return '' + try: + cipher = Fernet(self._get_or_create_secret_key()) + return cipher.decrypt(value.encode('utf-8')).decode('utf-8') + except Exception as e: + log('Failed to decrypt secret: ' + str(e)) + return '' + def get(self, key): """Return a config value, reloading from disk if another process updated it.""" if os.path.exists(self.cfgfile): diff --git a/spiro/experimenter.py b/spiro/experimenter.py index 5887990..673ef8a 100644 --- a/spiro/experimenter.py +++ b/spiro/experimenter.py @@ -42,6 +42,7 @@ def __init__(self, hw=None, cam=None): self.preview_lock = threading.Lock() self.nshots = 0 self.idlepos = 0 + self.transfer_cycle_handler = None threading.Thread.__init__(self) @@ -357,6 +358,7 @@ def runExperiment(self): nextloop = time.time() + 60 * self.delay if nextloop > self.endtime: nextloop = self.endtime + cycle_files = [] self.hw.motorOn(True) for i in range(self.position_count): @@ -383,11 +385,18 @@ def runExperiment(self): now = filename_timestamp(self.cfg.get('timezone')) name = os.path.join("plate" + str(i + 1), "plate" + str(i + 1) + "-" + now) self.takePicture(name, i) + cycle_files.append(self.last_captured[i]) if self.stop_experiment: self.hw.motorOn(False) break + if self.transfer_cycle_handler and cycle_files: + try: + self.transfer_cycle_handler(cycle_files) + except Exception as e: + debug('Failed to queue transfer cycle files: ' + str(e)) + self.nshots -= 1 self.hw.motorOn(False) if self.status != "Stopping": self.status = "Waiting" diff --git a/spiro/globus_local.py b/spiro/globus_local.py new file mode 100644 index 0000000..abb8fe8 --- /dev/null +++ b/spiro/globus_local.py @@ -0,0 +1,136 @@ +import os +import re +import shutil +import subprocess + +from spiro.logger import debug + + +class LocalGlobusConnectPersonal(object): + def __init__(self, cfg): + self.cfg = cfg + + + def install_dir(self): + return os.path.expanduser(self.cfg.get('globus_gcp_install_dir') or '~/.globusconnectpersonal') + + + def binary_path(self): + return os.path.join(self.install_dir(), 'globusconnectpersonal') + + + def is_installed(self): + return os.path.isfile(self.binary_path()) + + + def is_configured(self): + return os.path.isdir(os.path.expanduser('~/.globusonline')) + + + def cli_path(self): + return shutil.which('globus') + + + def launch_setup_or_login(self): + if not self.is_installed(): + raise RuntimeError('Globus Connect Personal is not installed. Follow the README setup instructions first.') + + subprocess.Popen( + [self.binary_path()], + cwd=self.install_dir(), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, + ) + + + def start_background(self): + if not self.is_installed(): + raise RuntimeError('Globus Connect Personal is not installed. Follow the README setup instructions first.') + + result = subprocess.run( + [self.binary_path(), '-start'], + cwd=self.install_dir(), + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode != 0: + details = (result.stderr or result.stdout or 'Unable to start Globus Connect Personal.').strip() + raise RuntimeError(details) + return result.stdout.strip() + + + def status(self, refresh_endpoint=True): + info = { + 'install_dir': self.install_dir(), + 'binary_path': self.binary_path(), + 'installed': self.is_installed(), + 'configured': self.is_configured(), + 'running': False, + 'connected': False, + 'status_text': 'Not installed', + 'endpoint_id': self.cfg.get('globus_local_endpoint_id') or '', + 'cli_available': bool(self.cli_path()), + } + + if not info['installed']: + return info + + try: + result = subprocess.run( + [self.binary_path(), '-status'], + cwd=self.install_dir(), + capture_output=True, + text=True, + timeout=15, + ) + output = (result.stdout or result.stderr or '').strip() + if result.returncode == 0: + info['running'] = True + info['status_text'] = output or 'Running' + info['connected'] = 'globus online: connected' in output.lower() + elif info['configured']: + info['status_text'] = output or 'Installed but not running' + else: + info['status_text'] = 'Installed but not configured' + except Exception as e: + debug('Failed to read Globus Connect Personal status: ' + str(e)) + if info['configured']: + info['status_text'] = 'Installed but status is unavailable' + + if refresh_endpoint: + endpoint_id = self.refresh_local_endpoint_id() + if endpoint_id: + info['endpoint_id'] = endpoint_id + + return info + + + def refresh_local_endpoint_id(self): + cli = self.cli_path() + if not cli: + return self.cfg.get('globus_local_endpoint_id') or '' + + try: + result = subprocess.run( + [cli, 'endpoint', 'local-id'], + capture_output=True, + text=True, + timeout=15, + ) + except Exception as e: + debug('Failed to query local Globus endpoint id: ' + str(e)) + return self.cfg.get('globus_local_endpoint_id') or '' + + if result.returncode != 0: + return self.cfg.get('globus_local_endpoint_id') or '' + + endpoint_id = (result.stdout or '').strip().splitlines() + endpoint_id = endpoint_id[-1].strip() if endpoint_id else '' + if re.fullmatch(r'[0-9a-fA-F-]{36}', endpoint_id): + self.cfg.set('globus_local_endpoint_id', endpoint_id) + self.cfg.set('globus_source_endpoint', endpoint_id) + return endpoint_id + + return self.cfg.get('globus_local_endpoint_id') or '' \ No newline at end of file diff --git a/spiro/templates/experiment.html b/spiro/templates/experiment.html index 0eae033..3dea97e 100644 --- a/spiro/templates/experiment.html +++ b/spiro/templates/experiment.html @@ -49,7 +49,11 @@

Experiment info

Ends: {{ endtime }}
Images remaining/plate: {{ nshots }}
Disk required: {{ diskreq }} GB
-Disk available: {{ diskspace }} GB

+Disk available: {{ diskspace }} GB
+Transfer status: {{ transfer_status.status }}
+Transfer queue: {{ transfer_status.queued }} batch(es)
+Submitted files: {{ transfer_status.submitted }}
+Failed files: {{ transfer_status.failed }}

@@ -87,6 +91,28 @@

Experiment info

{{ diskspace }} GB
+Globus transfer (optional) +
+ + +{% if globus_connected and local_globus_ready %}Queue cycle-end batches during run.{% elif not globus_connected %}Connect the SPIRO app to Globus in System settings first.{% else %}Install/login the local Globus Connect Personal client first.{% endif %} +
+
+ +{% if local_globus_status.endpoint_id %}{{ local_globus_status.endpoint_id }}{% else %}Not detected yet{% endif %} +
+
+ +Automatically set to the experiment directory when the run starts. +
+
+ + +
+
+ + +
diff --git a/spiro/templates/settings.html b/spiro/templates/settings.html index 880cfe4..a5eece0 100644 --- a/spiro/templates/settings.html +++ b/spiro/templates/settings.html @@ -42,15 +42,126 @@ {% endfor %}
+Globus transfer +
+ + +
+
+ + +
Change password +{% if globus_connected %} +Disconnect Globus +{% else %} +Connect Globus +{% endif %}
Used for saved filenames and displayed dates. Device stays on UTC. Default is EST via America/New_York.
+
+Connect Globus authorizes SPIRO to submit transfer tasks. The source endpoint is managed locally by Globus Connect Personal on this device. +
+
+
+
+ + +
+
+ +
+
+
+
+
+Globus status: +
+
+{% if globus_connected %}Connected{% else %}Not connected{% endif %} +
+
+
+
+Transfer worker: +
+
+{{ transfer_status.status }} +
+
+Local source endpoint +
+Launch setup/login +Start local client +Refresh local endpoint ID +
+
+SPIRO will auto-detect this device's source endpoint and use the experiment directory as the source path. +
+
+
+Client install: +
+
+{% if local_globus_status.installed %}Installed{% else %}Not installed{% endif %} +
+
+
+
+Client configured: +
+
+{% if local_globus_status.configured %}Yes{% else %}No{% endif %} +
+
+
+
+Client status: +
+
+{{ local_globus_status.status_text }} +
+
+
+
+CLI available: +
+
+{% if local_globus_status.cli_available %}Yes{% else %}No{% endif %} +
+
+
+
+Local endpoint ID: +
+
+{% if local_globus_status.endpoint_id %}{{ local_globus_status.endpoint_id }}{% else %}Unavailable{% endif %} +
+
+
+
+Source path base: +
+
+{% if globus_source_base_path %}{{ globus_source_base_path }}{% else %}Will use the active experiment directory{% endif %} +
+
+{% if globus_auth_started %} +
+
+Globus login: +
+
+Authorization started. Paste the code to finish login. +
+
+{% endif %} Time
diff --git a/spiro/transfer.py b/spiro/transfer.py new file mode 100644 index 0000000..480aaa7 --- /dev/null +++ b/spiro/transfer.py @@ -0,0 +1,207 @@ +# transfer.py - +# background Globus transfer manager +# + +import os +import time +import queue +import threading +import json +import importlib + +from spiro.logger import log, debug + + +def _get_globus_sdk(): + try: + return importlib.import_module('globus_sdk') + except Exception: + return None + + +class GlobusTransferManager(object): + def __init__(self, cfg): + self.cfg = cfg + self.enabled = False + self.source_endpoint = '' + self.source_base_path = '/' + self.destination_endpoint = '' + self.destination_base_path = '/' + self.experiment_root = '' + self.queue = queue.Queue() + self.stop_event = threading.Event() + self.worker = None + self.status_lock = threading.Lock() + self.last_status = 'Idle' + self.last_error = '' + self.total_submitted = 0 + self.total_failed = 0 + + def _set_status(self, status, err=''): + with self.status_lock: + self.last_status = status + self.last_error = err + + def get_status(self): + with self.status_lock: + return { + 'enabled': self.enabled, + 'status': self.last_status, + 'error': self.last_error, + 'submitted': self.total_submitted, + 'failed': self.total_failed, + 'queued': self.queue.qsize(), + 'source_endpoint': self.source_endpoint, + 'destination_endpoint': self.destination_endpoint, + 'destination_base_path': self.destination_base_path, + } + + def _load_token_payload(self): + enc = self.cfg.get('globus_token_encrypted') + if not enc: + return None + payload_text = self.cfg.decrypt_secret(enc) + if not payload_text: + return None + try: + return json.loads(payload_text) + except Exception as e: + debug('Failed to parse token payload: ' + str(e)) + return None + + def has_credentials(self): + payload = self._load_token_payload() + return bool(payload and payload.get('refresh_token')) + + def clear_credentials(self): + self.cfg.set('globus_token_encrypted', '') + + def save_token_payload(self, payload): + serial = json.dumps(payload) + self.cfg.set('globus_token_encrypted', self.cfg.encrypt_secret(serial)) + + def _build_transfer_client(self): + globus_sdk = _get_globus_sdk() + if globus_sdk is None: + raise RuntimeError('globus-sdk is not installed on this system.') + payload = self._load_token_payload() + if not payload: + raise RuntimeError('No Globus token payload found. Authenticate first.') + + client_id = self.cfg.get('globus_client_id') + if not client_id: + raise RuntimeError('Globus Client ID missing. Configure in settings.') + + auth_client = globus_sdk.NativeAppAuthClient(client_id) + + def _on_refresh(token_response): + transfer_tokens = token_response.by_resource_server.get('transfer.api.globus.org', {}) + if transfer_tokens: + self.save_token_payload(transfer_tokens) + + authorizer = globus_sdk.RefreshTokenAuthorizer( + payload.get('refresh_token'), + auth_client, + access_token=payload.get('access_token'), + expires_at=payload.get('expires_at_seconds'), + on_refresh=_on_refresh, + ) + return globus_sdk.TransferClient(authorizer=authorizer) + + def configure_run(self, source_endpoint, source_base_path, destination_endpoint, destination_base_path, experiment_root): + self.source_endpoint = (source_endpoint or '').strip() + self.source_base_path = (source_base_path or '/').strip() or '/' + self.destination_endpoint = (destination_endpoint or '').strip() + self.destination_base_path = (destination_base_path or '/').strip() or '/' + self.experiment_root = os.path.abspath(experiment_root) + self.enabled = bool(self.source_endpoint and self.destination_endpoint and self.has_credentials()) + if not self.enabled: + self._set_status('Transfer disabled') + + def start_worker(self): + if self.worker and self.worker.is_alive(): + return + self.stop_event.clear() + self.worker = threading.Thread(target=self._worker_loop, daemon=True) + self.worker.start() + + def stop_worker(self): + self.stop_event.set() + + def queue_cycle_files(self, files): + if not self.enabled or not files: + return + self.queue.put({'files': list(files), 'attempt': 1, 'queued_at': time.time()}) + self._set_status('Queued cycle batch') + + def _normalize_rel_path(self, filepath): + rel = os.path.relpath(os.path.abspath(filepath), self.experiment_root) + return rel.replace('\\', '/') + + def _source_path(self, filepath): + rel = self._normalize_rel_path(filepath) + base = self.source_base_path.rstrip('/') + if not base: + base = '/' + if base == '/': + return '/' + rel + return base + '/' + rel + + def _dest_path(self, filepath): + rel = self._normalize_rel_path(filepath) + base = self.destination_base_path.rstrip('/') + if not base: + base = '/' + if base == '/': + return '/' + rel + return base + '/' + rel + + def _submit_batch(self, files): + globus_sdk = _get_globus_sdk() + if globus_sdk is None: + raise RuntimeError('globus-sdk is not installed on this system.') + tc = self._build_transfer_client() + tdata = globus_sdk.TransferData( + tc, + self.source_endpoint, + self.destination_endpoint, + label='SPIRO cycle transfer', + sync_level='checksum', + ) + for fp in files: + tdata.add_item(self._source_path(fp), self._dest_path(fp)) + res = tc.submit_transfer(tdata) + task_id = res.get('task_id', 'unknown') + self.total_submitted += len(files) + self._set_status('Submitted transfer task ' + task_id) + log('Submitted Globus transfer task ' + task_id + ' with ' + str(len(files)) + ' files.') + + def _worker_loop(self): + self._set_status('Transfer worker running') + while not self.stop_event.is_set(): + try: + item = self.queue.get(timeout=1) + except queue.Empty: + continue + + files = item.get('files', []) + attempt = int(item.get('attempt', 1)) + if not files: + self.queue.task_done() + continue + + try: + self._set_status('Submitting cycle batch') + self._submit_batch(files) + except Exception as e: + self.total_failed += len(files) + self._set_status('Transfer failed, retrying', str(e)) + log('Globus transfer failed (attempt ' + str(attempt) + '): ' + str(e)) + if attempt < 5 and not self.stop_event.is_set(): + backoff = min(300, 2 ** attempt) + time.sleep(backoff) + self.queue.put({'files': files, 'attempt': attempt + 1, 'queued_at': time.time()}) + finally: + self.queue.task_done() + + self._set_status('Transfer worker stopped') diff --git a/spiro/webui.py b/spiro/webui.py index 2cbba33..8ed592b 100644 --- a/spiro/webui.py +++ b/spiro/webui.py @@ -9,25 +9,35 @@ import io import os import re +import base64 import subprocess import time import shutil import signal import hashlib -from threading import Thread, Lock, Condition, Timer +from threading import Thread, Lock, Condition +try: + import globus_sdk +except Exception: + globus_sdk = None from waitress import serve from flask import Flask, render_template, Response, request, redirect, url_for, session, flash, abort, jsonify import spiro.hostapd as hostapd from spiro.config import Config +from spiro.globus_local import LocalGlobusConnectPersonal from spiro.logger import log, debug from spiro.experimenter import Experimenter +from spiro.transfer import GlobusTransferManager from spiro.timeutils import COMMON_TIMEZONES, format_timestamp, now_in_timezone, validate_timezone_name app = Flask(__name__) app.jinja_env.trim_blocks = True app.jinja_env.lstrip_blocks = True +GLOBUS_TRANSFER_SCOPE = 'urn:globus:auth:scope:transfer.api.globus.org:all' +GLOBUS_AUTH_VERIFIER_SESSION_KEY = 'globus_auth_verifier' +GLOBUS_AUTH_STATE_SESSION_KEY = 'globus_auth_state' CALIBRATION_MOTOR_HOLD_SECONDS = 5 * 60 CALIBRATION_REPEATABILITY_MAX_CYCLES = 25 @@ -447,6 +457,37 @@ def checkPass(pwd): return False +def _globus_auth_client(): + if globus_sdk is None: + raise RuntimeError('globus-sdk is not installed on this system.') + client_id = (cfg.get('globus_client_id') or '').strip() + if not client_id: + raise RuntimeError('Set Globus Client ID in System settings before connecting.') + return globus_sdk.NativeAppAuthClient(client_id) + + +def _get_transfer_manager(): + global transfer_manager + if transfer_manager is None: + transfer_manager = GlobusTransferManager(cfg) + return transfer_manager + + +def _get_local_globus(): + global local_globus + if local_globus is None: + local_globus = LocalGlobusConnectPersonal(cfg) + return local_globus + + +def _globus_connected(): + return _get_transfer_manager().has_credentials() + + +def _new_globus_auth_verifier(): + return base64.urlsafe_b64encode(os.urandom(32)).decode('ascii').rstrip('=') + + def get_plate_dirs(check_dir): plate_dirs = [] for entry in os.scandir(check_dir): @@ -512,6 +553,104 @@ def logout(): return redirect(url_for('login')) +@app.route('/globus/auth/start') +def globus_auth_start(): + try: + auth_client = _globus_auth_client() + verifier = _new_globus_auth_verifier() + state = base64.urlsafe_b64encode(os.urandom(12)).decode('ascii').rstrip('=') + auth_client.oauth2_start_flow( + refresh_tokens=True, + requested_scopes=['openid', 'profile', GLOBUS_TRANSFER_SCOPE], + redirect_uri='https://auth.globus.org/v2/web/auth-code', + verifier=verifier, + state=state, + ) + session['globus_auth_started'] = True + session[GLOBUS_AUTH_VERIFIER_SESSION_KEY] = verifier + session[GLOBUS_AUTH_STATE_SESSION_KEY] = state + auth_url = auth_client.oauth2_get_authorize_url() + return redirect(auth_url) + except Exception as e: + flash('Unable to start Globus login: ' + str(e)) + return redirect(url_for('settings')) + + +@app.route('/globus/auth/complete', methods=['POST']) +def globus_auth_complete(): + code = (request.form.get('globus_auth_code') or '').strip() + if not code: + flash('Enter the authorization code from Globus first.') + return redirect(url_for('settings')) + + try: + verifier = session.get(GLOBUS_AUTH_VERIFIER_SESSION_KEY) + state = session.get(GLOBUS_AUTH_STATE_SESSION_KEY, '_default') + if not verifier: + raise RuntimeError('Start the Globus login flow again before exchanging a code.') + auth_client = _globus_auth_client() + auth_client.oauth2_start_flow( + refresh_tokens=True, + requested_scopes=['openid', 'profile', GLOBUS_TRANSFER_SCOPE], + redirect_uri='https://auth.globus.org/v2/web/auth-code', + verifier=verifier, + state=state, + ) + token_response = auth_client.oauth2_exchange_code_for_tokens(code) + transfer_tokens = token_response.by_resource_server.get('transfer.api.globus.org') + if not transfer_tokens: + raise RuntimeError('Missing transfer.api.globus.org token in OAuth response.') + _get_transfer_manager().save_token_payload(transfer_tokens) + session['globus_auth_started'] = False + session.pop(GLOBUS_AUTH_VERIFIER_SESSION_KEY, None) + session.pop(GLOBUS_AUTH_STATE_SESSION_KEY, None) + flash('Globus account connected successfully.') + except Exception as e: + flash('Failed to complete Globus login: ' + str(e)) + + return redirect(url_for('settings')) + + +@app.route('/globus/auth/disconnect') +def globus_auth_disconnect(): + _get_transfer_manager().clear_credentials() + session['globus_auth_started'] = False + session.pop(GLOBUS_AUTH_VERIFIER_SESSION_KEY, None) + session.pop(GLOBUS_AUTH_STATE_SESSION_KEY, None) + flash('Globus account disconnected.') + return redirect(url_for('settings')) + + +@app.route('/globus/source/start') +def globus_source_start(): + try: + _get_local_globus().start_background() + flash('Globus Connect Personal start was requested on this device.') + except Exception as e: + flash('Unable to start Globus Connect Personal: ' + str(e)) + return redirect(url_for('settings')) + + +@app.route('/globus/source/login') +def globus_source_login(): + try: + _get_local_globus().launch_setup_or_login() + flash('Globus Connect Personal setup/login was launched. If this device is headless, complete setup from a local shell or desktop session.') + except Exception as e: + flash('Unable to launch Globus Connect Personal setup/login: ' + str(e)) + return redirect(url_for('settings')) + + +@app.route('/globus/source/refresh') +def globus_source_refresh(): + endpoint_id = _get_local_globus().refresh_local_endpoint_id() + if endpoint_id: + flash('Detected local Globus source endpoint: ' + endpoint_id) + else: + flash('Unable to determine the local Globus source endpoint ID. Make sure Globus Connect Personal is running and globus-cli is installed.') + return redirect(url_for('settings')) + + @public_route @app.route('/newpass', methods=['GET', 'POST']) def newpass(): @@ -993,12 +1132,58 @@ def experiment(): if experimenter.running: flash("Experiment is already running.") else: + transfer_enabled = request.form.get('transfer_enabled') == 'on' + destination_endpoint = (request.form.get('transfer_destination_endpoint') or '').strip() + destination_base_path = (request.form.get('transfer_destination_base_path') or '/').strip() or '/' + if request.form.get('duration'): experimenter.duration = int(request.form['duration']) else: experimenter.duration = 7 if request.form.get('delay'): experimenter.delay = int(request.form['delay']) else: experimenter.delay = 60 if request.form.get('directory'): experimenter.dir = os.path.expanduser(os.path.join('~', request.form['directory'].replace('/', '-'))) else: experimenter.dir = os.path.expanduser('~') + + if transfer_enabled: + if not _get_transfer_manager().has_credentials(): + flash('Connect the SPIRO app to Globus in System settings before enabling transfer.') + return redirect(url_for('experiment')) + if not destination_endpoint: + flash('Destination endpoint ID is required for transfer.') + return redirect(url_for('experiment')) + + local_status = _get_local_globus().status(refresh_endpoint=False) + if not local_status['installed']: + flash('Install Globus Connect Personal on this device before enabling transfer.') + return redirect(url_for('experiment')) + if not local_status['configured']: + flash('Launch Globus Connect Personal setup/login from System settings before enabling transfer.') + return redirect(url_for('experiment')) + + try: + _get_local_globus().start_background() + except Exception as e: + flash('Unable to start the local Globus Connect Personal client: ' + str(e)) + return redirect(url_for('experiment')) + + source_endpoint = _get_local_globus().refresh_local_endpoint_id() + if not source_endpoint: + flash('Unable to determine the local Globus source endpoint ID. Make sure globus-cli is installed and the local client is logged in.') + return redirect(url_for('experiment')) + + source_base_path = os.path.abspath(experimenter.dir) + cfg.set('globus_source_endpoint', source_endpoint) + cfg.set('globus_source_base_path', source_base_path) + _get_transfer_manager().configure_run( + source_endpoint=source_endpoint, + source_base_path=source_base_path, + destination_endpoint=destination_endpoint, + destination_base_path=destination_base_path, + experiment_root=experimenter.dir, + ) + _get_transfer_manager().start_worker() + else: + _get_transfer_manager().configure_run('', '/', '', '/', experimenter.dir) + setLive('off') # zoomer.set(roi=1.0) current_zoom_tuple = (zoomer.y - zoomer.roi/2.0, zoomer.x - zoomer.roi/2.0, zoomer.roi, zoomer.roi) @@ -1020,12 +1205,22 @@ def experiment(): diskspace = round(df.free / 1024 ** 3, 1) diskreq = round(experimenter.nshots * experimenter.position_count * 8 / 1024, 1) timezone_name = cfg.get('timezone') + transfer_status = _get_transfer_manager().get_status() + local_globus_status = _get_local_globus().status() return render_template('experiment.html', running=experimenter.running, directory=experimenter.dir, starttime=format_timestamp(experimenter.starttime, timezone_name), delay=experimenter.delay, endtime=format_timestamp(experimenter.endtime, timezone_name), diskspace=diskspace, duration=experimenter.duration, status=experimenter.status, nshots=experimenter.nshots + 1, diskreq=diskreq, position_count=experimenter.position_count, name=cfg.get('name'), - defname=experimenter.getDefName()) + defname=experimenter.getDefName(), + globus_connected=_globus_connected(), + local_globus_ready=bool(local_globus_status.get('endpoint_id')), + local_globus_status=local_globus_status, + transfer_status=transfer_status, + transfer_defaults={ + 'destination_endpoint': transfer_status.get('destination_endpoint', ''), + 'destination_base_path': transfer_status.get('destination_base_path', '/'), + }) @not_while_running @@ -1174,6 +1369,10 @@ def settings(): flash('Timezone preference saved. Device time remains UTC.') except ValueError as exc: flash(str(exc)) + if request.form.get('globus_client_id') is not None: + cfg.set('globus_client_id', request.form.get('globus_client_id').strip()) + if request.form.get('globus_gcp_install_dir') is not None: + cfg.set('globus_gcp_install_dir', request.form.get('globus_gcp_install_dir').strip() or '~/.globusconnectpersonal') ssid, passwd = hostapd.get_ssid() timezone_name = cfg.get('timezone') current_time = now_in_timezone(timezone_name) @@ -1181,13 +1380,24 @@ def settings(): timezone_choices = list(COMMON_TIMEZONES) if not any(choice[0] == timezone_name for choice in timezone_choices): timezone_choices.append((timezone_name, timezone_name)) + transfer_status = _get_transfer_manager().get_status() + local_globus_status = _get_local_globus().status() return render_template('settings.html', name=cfg.get('name'), running=experimenter.running, version=cfg.version, debug=cfg.get('debug'), ip_addr=get_external_ip(), hotspot_ready=hostapd.is_ready(), hotspot_enabled=hostapd.is_enabled(), ssid=ssid, passwd=passwd, rotation=cfg.get('rotated_camera'), timezone_name=timezone_name, timezone_choices=timezone_choices, current_time=current_time.strftime('%Y-%m-%d %H:%M:%S %Z'), current_time_utc_iso=current_utc_time.isoformat(), - current_utc_time=current_utc_time.strftime('%Y-%m-%d %H:%M:%S %Z')) + current_utc_time=current_utc_time.strftime('%Y-%m-%d %H:%M:%S %Z'), + globus_connected=_globus_connected(), + globus_auth_started=bool(session.get('globus_auth_started')), + globus_client_id=cfg.get('globus_client_id'), + globus_gcp_install_dir=cfg.get('globus_gcp_install_dir'), + globus_source_endpoint=cfg.get('globus_source_endpoint'), + globus_source_base_path=cfg.get('globus_source_base_path'), + globus_local_endpoint_id=cfg.get('globus_local_endpoint_id'), + local_globus_status=local_globus_status, + transfer_status=transfer_status) @app.route('/sync-time') @@ -1402,21 +1612,26 @@ def set_hotspot(value): dayshutter = None camera = None hw = None +transfer_manager = None +local_globus = None restarting = False livestream = False def start(cam, myhw): + global camera, hw, experimenter, transfer_manager, current_focus_value """Start the web UI, initialize saved state, and serve requests. Startup restores the user's last saved zoom and focus after live view begins because some camera backends ignore crop/focus writes until streaming has started and settled. - """ - global camera, hw, experimenter, current_focus_value + """ camera = cam hw = myhw experimenter = Experimenter(hw=hw, cam=cam) + transfer_manager = GlobusTransferManager(cfg) + experimenter.transfer_cycle_handler = transfer_manager.queue_cycle_files + transfer_manager.start_worker() experimenter.start() if cfg.get('secret') == '': secret = hashlib.sha1(os.urandom(16)) @@ -1481,7 +1696,8 @@ def start(cam, myhw): stop() def stop(): - """Request experiment shutdown and wake the worker thread if needed.""" + if transfer_manager: + transfer_manager.stop_worker() experimenter.stop() experimenter.quit = True experimenter.status_change.set()