From f13bd6e8106eddf57ba6823768a90012e0f3f4ce Mon Sep 17 00:00:00 2001 From: Kirby Kalbaugh Date: Tue, 7 Apr 2026 14:49:18 -0400 Subject: [PATCH] added globus --- README.md | 31 +++++ setup.py | 2 +- spiro/config.py | 45 ++++++- spiro/experimenter.py | 9 ++ spiro/globus_local.py | 136 +++++++++++++++++++ spiro/templates/experiment.html | 28 +++- spiro/templates/settings.html | 111 ++++++++++++++++ spiro/transfer.py | 207 +++++++++++++++++++++++++++++ spiro/webui.py | 223 +++++++++++++++++++++++++++++++- 9 files changed, 786 insertions(+), 6 deletions(-) create mode 100644 spiro/globus_local.py create mode 100644 spiro/transfer.py diff --git a/README.md b/README.md index 3e836a7..81ce73f 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ The image below is a link to a YouTube video showing some of its design and feat * [Connecting to the web interface](#connecting-to-the-web-interface) * [Setting up imaging](#setting-up-imaging) * [Starting an experiment](#starting-an-experiment) + * [Globus transfer during experiments](#globus-transfer-during-experiments) * [Downloading images](#downloading-images) * [Maintaining the system](#maintaining-the-system) * [Restarting the software](#restarting-the-software) @@ -235,6 +236,36 @@ For locating the initial imaging position, the system turns the cube until a pos When imaging parameters are set up to your liking, you are ready to start your experiments. In the *Experiment control* view, choose a name for your experiment, as well as the duration and imaging frequency. After you choose *Start experiment*, the system will disable most of the functionality of the web interface, displaying a simple status window containing experiment parameters, as well as the last image captured. +### Globus transfer during experiments + +SPIRO can transfer newly captured files to Globus while an experiment runs. + +On a Raspberry Pi running Raspberry Pi OS, install the local source endpoint software first: + +```bash +sudo apt update +sudo apt install -y wget python3-pip tcl tk +python3 -m pip install --user globus-cli +wget https://downloads.globus.org/globus-connect-personal/linux/stable/globusconnectpersonal-latest.tgz +tar xzf globusconnectpersonal-latest.tgz +mv globusconnectpersonal-* ~/.globusconnectpersonal +``` + +After installation: + +1. Open *System settings* and set your *Globus Client ID*. +2. In *Local source endpoint*, use *Launch setup/login* to start Globus Connect Personal setup on the SPIRO device. +3. If the Pi is headless, complete the first-time setup from a local shell by running `~/.globusconnectpersonal/globusconnectpersonal` or `~/.globusconnectpersonal/globusconnectpersonal -setup`. +4. Use *Start local client* once Globus Connect Personal is configured. +5. Use *Refresh local endpoint ID* to detect the SPIRO device's source endpoint via `globus endpoint local-id`. +6. Click *Connect Globus* to authorize the SPIRO web app to submit transfer tasks. +7. Copy the authorization code from Globus and paste it into *Complete Globus Login*. +8. In *Experiment control*, enable transfer and provide only the destination endpoint ID and destination base path for this run. + +SPIRO now uses the local Globus Connect Personal endpoint on the device as the source endpoint automatically. The source path is set to the experiment directory for the active run. + +Transfers are queued once per imaging cycle and submitted in the background. If a transfer fails, SPIRO keeps imaging and retries automatically. + ### Downloading images Images can be downloaded from the web interface under *File manager*. The File manager also allows deleting files to free up space on the SD card. diff --git a/setup.py b/setup.py index 64d7a43..5aeee86 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ def get_version_and_cmdclass(package_path): cmdclass = cmdclass, packages = find_packages(), scripts = ['bin/spiro'], - install_requires = ['picamera2==0.3.31', 'RPi.GPIO==0.7.1', 'Flask==2.2.5', 'waitress==2.1.2', 'numpy==1.24.2', 'Werkzeug==2.2.3'], + install_requires = ['picamera2==0.3.31', 'RPi.GPIO==0.7.1', 'Flask==2.2.5', 'waitress==2.1.2', 'numpy==1.24.2', 'Werkzeug==2.2.3', 'globus-sdk>=3.39.0', 'cryptography>=42.0.0'], author = 'Kirby Kalbaugh', author_email = 'kkalbaug@purdue.edu', description = 'Control software for the SPIRO biological imaging system', diff --git a/spiro/config.py b/spiro/config.py index 7eea0a4..2b6f09c 100644 --- a/spiro/config.py +++ b/spiro/config.py @@ -5,6 +5,7 @@ import json import os import sys +from cryptography.fernet import Fernet from importlib import metadata from ._version import __version__ @@ -55,7 +56,13 @@ class Config(object): 'name': 'spiro', # the name of this spiro instance 'timezone': 'America/New_York', # user display/save timezone; device clock remains UTC 'debug': True, # debug logging - 'rotated_camera': True # rotated camera house + 'rotated_camera': True, # rotated camera house + 'globus_client_id': '', + 'globus_token_encrypted': '', + 'globus_gcp_install_dir': '~/.globusconnectpersonal', + 'globus_local_endpoint_id': '', + 'globus_source_endpoint': '', + 'globus_source_base_path': '/' } config = {} @@ -63,6 +70,7 @@ class Config(object): def __init__(self): self.cfgdir = os.path.expanduser("~/.config/spiro") self.cfgfile = os.path.join(self.cfgdir, "spiro.conf") + self.keyfile = os.path.join(self.cfgdir, "token.key") self.version = _resolve_version() self.read() if os.path.exists(self.cfgfile): @@ -87,9 +95,44 @@ def write(self): with open(self.cfgfile + ".tmp", 'w') as f: json.dump(self.config, f, indent=4) os.replace(self.cfgfile + ".tmp", self.cfgfile) + os.chmod(self.cfgfile, 0o600) except OSError as e: log("Failed to write config file: " + e.strerror) + + def _get_or_create_secret_key(self): + os.makedirs(self.cfgdir, exist_ok=True) + if os.path.exists(self.keyfile): + with open(self.keyfile, 'rb') as f: + key = f.read().strip() + if key: + return key + + key = Fernet.generate_key() + with open(self.keyfile + '.tmp', 'wb') as f: + f.write(key) + os.replace(self.keyfile + '.tmp', self.keyfile) + os.chmod(self.keyfile, 0o600) + return key + + + def encrypt_secret(self, value): + if value in [None, '']: + return '' + cipher = Fernet(self._get_or_create_secret_key()) + return cipher.encrypt(value.encode('utf-8')).decode('utf-8') + + + def decrypt_secret(self, value): + if value in [None, '']: + return '' + try: + cipher = Fernet(self._get_or_create_secret_key()) + return cipher.decrypt(value.encode('utf-8')).decode('utf-8') + except Exception as e: + log('Failed to decrypt secret: ' + str(e)) + return '' + def get(self, key): if os.path.exists(self.cfgfile): st = os.stat(self.cfgfile) diff --git a/spiro/experimenter.py b/spiro/experimenter.py index 0092757..5aa4e71 100644 --- a/spiro/experimenter.py +++ b/spiro/experimenter.py @@ -37,6 +37,7 @@ def __init__(self, hw=None, cam=None): self.preview_lock = threading.Lock() self.nshots = 0 self.idlepos = 0 + self.transfer_cycle_handler = None threading.Thread.__init__(self) @@ -241,6 +242,7 @@ def runExperiment(self): nextloop = time.time() + 60 * self.delay if nextloop > self.endtime: nextloop = self.endtime + cycle_files = [] self.hw.motorOn(True) for i in range(self.position_count): @@ -267,11 +269,18 @@ def runExperiment(self): now = filename_timestamp(self.cfg.get('timezone')) name = os.path.join("plate" + str(i + 1), "plate" + str(i + 1) + "-" + now) self.takePicture(name, i) + cycle_files.append(self.last_captured[i]) if self.stop_experiment: self.hw.motorOn(False) break + if self.transfer_cycle_handler and cycle_files: + try: + self.transfer_cycle_handler(cycle_files) + except Exception as e: + debug('Failed to queue transfer cycle files: ' + str(e)) + self.nshots -= 1 self.hw.motorOn(False) if self.status != "Stopping": self.status = "Waiting" diff --git a/spiro/globus_local.py b/spiro/globus_local.py new file mode 100644 index 0000000..abb8fe8 --- /dev/null +++ b/spiro/globus_local.py @@ -0,0 +1,136 @@ +import os +import re +import shutil +import subprocess + +from spiro.logger import debug + + +class LocalGlobusConnectPersonal(object): + def __init__(self, cfg): + self.cfg = cfg + + + def install_dir(self): + return os.path.expanduser(self.cfg.get('globus_gcp_install_dir') or '~/.globusconnectpersonal') + + + def binary_path(self): + return os.path.join(self.install_dir(), 'globusconnectpersonal') + + + def is_installed(self): + return os.path.isfile(self.binary_path()) + + + def is_configured(self): + return os.path.isdir(os.path.expanduser('~/.globusonline')) + + + def cli_path(self): + return shutil.which('globus') + + + def launch_setup_or_login(self): + if not self.is_installed(): + raise RuntimeError('Globus Connect Personal is not installed. Follow the README setup instructions first.') + + subprocess.Popen( + [self.binary_path()], + cwd=self.install_dir(), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, + ) + + + def start_background(self): + if not self.is_installed(): + raise RuntimeError('Globus Connect Personal is not installed. Follow the README setup instructions first.') + + result = subprocess.run( + [self.binary_path(), '-start'], + cwd=self.install_dir(), + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode != 0: + details = (result.stderr or result.stdout or 'Unable to start Globus Connect Personal.').strip() + raise RuntimeError(details) + return result.stdout.strip() + + + def status(self, refresh_endpoint=True): + info = { + 'install_dir': self.install_dir(), + 'binary_path': self.binary_path(), + 'installed': self.is_installed(), + 'configured': self.is_configured(), + 'running': False, + 'connected': False, + 'status_text': 'Not installed', + 'endpoint_id': self.cfg.get('globus_local_endpoint_id') or '', + 'cli_available': bool(self.cli_path()), + } + + if not info['installed']: + return info + + try: + result = subprocess.run( + [self.binary_path(), '-status'], + cwd=self.install_dir(), + capture_output=True, + text=True, + timeout=15, + ) + output = (result.stdout or result.stderr or '').strip() + if result.returncode == 0: + info['running'] = True + info['status_text'] = output or 'Running' + info['connected'] = 'globus online: connected' in output.lower() + elif info['configured']: + info['status_text'] = output or 'Installed but not running' + else: + info['status_text'] = 'Installed but not configured' + except Exception as e: + debug('Failed to read Globus Connect Personal status: ' + str(e)) + if info['configured']: + info['status_text'] = 'Installed but status is unavailable' + + if refresh_endpoint: + endpoint_id = self.refresh_local_endpoint_id() + if endpoint_id: + info['endpoint_id'] = endpoint_id + + return info + + + def refresh_local_endpoint_id(self): + cli = self.cli_path() + if not cli: + return self.cfg.get('globus_local_endpoint_id') or '' + + try: + result = subprocess.run( + [cli, 'endpoint', 'local-id'], + capture_output=True, + text=True, + timeout=15, + ) + except Exception as e: + debug('Failed to query local Globus endpoint id: ' + str(e)) + return self.cfg.get('globus_local_endpoint_id') or '' + + if result.returncode != 0: + return self.cfg.get('globus_local_endpoint_id') or '' + + endpoint_id = (result.stdout or '').strip().splitlines() + endpoint_id = endpoint_id[-1].strip() if endpoint_id else '' + if re.fullmatch(r'[0-9a-fA-F-]{36}', endpoint_id): + self.cfg.set('globus_local_endpoint_id', endpoint_id) + self.cfg.set('globus_source_endpoint', endpoint_id) + return endpoint_id + + return self.cfg.get('globus_local_endpoint_id') or '' \ No newline at end of file diff --git a/spiro/templates/experiment.html b/spiro/templates/experiment.html index 0eae033..3dea97e 100644 --- a/spiro/templates/experiment.html +++ b/spiro/templates/experiment.html @@ -49,7 +49,11 @@

Experiment info

Ends: {{ endtime }}
Images remaining/plate: {{ nshots }}
Disk required: {{ diskreq }} GB
-Disk available: {{ diskspace }} GB

+Disk available: {{ diskspace }} GB
+Transfer status: {{ transfer_status.status }}
+Transfer queue: {{ transfer_status.queued }} batch(es)
+Submitted files: {{ transfer_status.submitted }}
+Failed files: {{ transfer_status.failed }}

@@ -87,6 +91,28 @@

Experiment info

{{ diskspace }} GB
+Globus transfer (optional) +
+ + +{% if globus_connected and local_globus_ready %}Queue cycle-end batches during run.{% elif not globus_connected %}Connect the SPIRO app to Globus in System settings first.{% else %}Install/login the local Globus Connect Personal client first.{% endif %} +
+
+ +{% if local_globus_status.endpoint_id %}{{ local_globus_status.endpoint_id }}{% else %}Not detected yet{% endif %} +
+
+ +Automatically set to the experiment directory when the run starts. +
+
+ + +
+
+ + +
diff --git a/spiro/templates/settings.html b/spiro/templates/settings.html index 880cfe4..a5eece0 100644 --- a/spiro/templates/settings.html +++ b/spiro/templates/settings.html @@ -42,15 +42,126 @@ {% endfor %}
+Globus transfer +
+ + +
+
+ + +
Change password +{% if globus_connected %} +Disconnect Globus +{% else %} +Connect Globus +{% endif %}
Used for saved filenames and displayed dates. Device stays on UTC. Default is EST via America/New_York.
+
+Connect Globus authorizes SPIRO to submit transfer tasks. The source endpoint is managed locally by Globus Connect Personal on this device. +
+
+
+
+ + +
+
+ +
+
+
+
+
+Globus status: +
+
+{% if globus_connected %}Connected{% else %}Not connected{% endif %} +
+
+
+
+Transfer worker: +
+
+{{ transfer_status.status }} +
+
+Local source endpoint +
+Launch setup/login +Start local client +Refresh local endpoint ID +
+
+SPIRO will auto-detect this device's source endpoint and use the experiment directory as the source path. +
+
+
+Client install: +
+
+{% if local_globus_status.installed %}Installed{% else %}Not installed{% endif %} +
+
+
+
+Client configured: +
+
+{% if local_globus_status.configured %}Yes{% else %}No{% endif %} +
+
+
+
+Client status: +
+
+{{ local_globus_status.status_text }} +
+
+
+
+CLI available: +
+
+{% if local_globus_status.cli_available %}Yes{% else %}No{% endif %} +
+
+
+
+Local endpoint ID: +
+
+{% if local_globus_status.endpoint_id %}{{ local_globus_status.endpoint_id }}{% else %}Unavailable{% endif %} +
+
+
+
+Source path base: +
+
+{% if globus_source_base_path %}{{ globus_source_base_path }}{% else %}Will use the active experiment directory{% endif %} +
+
+{% if globus_auth_started %} +
+
+Globus login: +
+
+Authorization started. Paste the code to finish login. +
+
+{% endif %} Time
diff --git a/spiro/transfer.py b/spiro/transfer.py new file mode 100644 index 0000000..480aaa7 --- /dev/null +++ b/spiro/transfer.py @@ -0,0 +1,207 @@ +# transfer.py - +# background Globus transfer manager +# + +import os +import time +import queue +import threading +import json +import importlib + +from spiro.logger import log, debug + + +def _get_globus_sdk(): + try: + return importlib.import_module('globus_sdk') + except Exception: + return None + + +class GlobusTransferManager(object): + def __init__(self, cfg): + self.cfg = cfg + self.enabled = False + self.source_endpoint = '' + self.source_base_path = '/' + self.destination_endpoint = '' + self.destination_base_path = '/' + self.experiment_root = '' + self.queue = queue.Queue() + self.stop_event = threading.Event() + self.worker = None + self.status_lock = threading.Lock() + self.last_status = 'Idle' + self.last_error = '' + self.total_submitted = 0 + self.total_failed = 0 + + def _set_status(self, status, err=''): + with self.status_lock: + self.last_status = status + self.last_error = err + + def get_status(self): + with self.status_lock: + return { + 'enabled': self.enabled, + 'status': self.last_status, + 'error': self.last_error, + 'submitted': self.total_submitted, + 'failed': self.total_failed, + 'queued': self.queue.qsize(), + 'source_endpoint': self.source_endpoint, + 'destination_endpoint': self.destination_endpoint, + 'destination_base_path': self.destination_base_path, + } + + def _load_token_payload(self): + enc = self.cfg.get('globus_token_encrypted') + if not enc: + return None + payload_text = self.cfg.decrypt_secret(enc) + if not payload_text: + return None + try: + return json.loads(payload_text) + except Exception as e: + debug('Failed to parse token payload: ' + str(e)) + return None + + def has_credentials(self): + payload = self._load_token_payload() + return bool(payload and payload.get('refresh_token')) + + def clear_credentials(self): + self.cfg.set('globus_token_encrypted', '') + + def save_token_payload(self, payload): + serial = json.dumps(payload) + self.cfg.set('globus_token_encrypted', self.cfg.encrypt_secret(serial)) + + def _build_transfer_client(self): + globus_sdk = _get_globus_sdk() + if globus_sdk is None: + raise RuntimeError('globus-sdk is not installed on this system.') + payload = self._load_token_payload() + if not payload: + raise RuntimeError('No Globus token payload found. Authenticate first.') + + client_id = self.cfg.get('globus_client_id') + if not client_id: + raise RuntimeError('Globus Client ID missing. Configure in settings.') + + auth_client = globus_sdk.NativeAppAuthClient(client_id) + + def _on_refresh(token_response): + transfer_tokens = token_response.by_resource_server.get('transfer.api.globus.org', {}) + if transfer_tokens: + self.save_token_payload(transfer_tokens) + + authorizer = globus_sdk.RefreshTokenAuthorizer( + payload.get('refresh_token'), + auth_client, + access_token=payload.get('access_token'), + expires_at=payload.get('expires_at_seconds'), + on_refresh=_on_refresh, + ) + return globus_sdk.TransferClient(authorizer=authorizer) + + def configure_run(self, source_endpoint, source_base_path, destination_endpoint, destination_base_path, experiment_root): + self.source_endpoint = (source_endpoint or '').strip() + self.source_base_path = (source_base_path or '/').strip() or '/' + self.destination_endpoint = (destination_endpoint or '').strip() + self.destination_base_path = (destination_base_path or '/').strip() or '/' + self.experiment_root = os.path.abspath(experiment_root) + self.enabled = bool(self.source_endpoint and self.destination_endpoint and self.has_credentials()) + if not self.enabled: + self._set_status('Transfer disabled') + + def start_worker(self): + if self.worker and self.worker.is_alive(): + return + self.stop_event.clear() + self.worker = threading.Thread(target=self._worker_loop, daemon=True) + self.worker.start() + + def stop_worker(self): + self.stop_event.set() + + def queue_cycle_files(self, files): + if not self.enabled or not files: + return + self.queue.put({'files': list(files), 'attempt': 1, 'queued_at': time.time()}) + self._set_status('Queued cycle batch') + + def _normalize_rel_path(self, filepath): + rel = os.path.relpath(os.path.abspath(filepath), self.experiment_root) + return rel.replace('\\', '/') + + def _source_path(self, filepath): + rel = self._normalize_rel_path(filepath) + base = self.source_base_path.rstrip('/') + if not base: + base = '/' + if base == '/': + return '/' + rel + return base + '/' + rel + + def _dest_path(self, filepath): + rel = self._normalize_rel_path(filepath) + base = self.destination_base_path.rstrip('/') + if not base: + base = '/' + if base == '/': + return '/' + rel + return base + '/' + rel + + def _submit_batch(self, files): + globus_sdk = _get_globus_sdk() + if globus_sdk is None: + raise RuntimeError('globus-sdk is not installed on this system.') + tc = self._build_transfer_client() + tdata = globus_sdk.TransferData( + tc, + self.source_endpoint, + self.destination_endpoint, + label='SPIRO cycle transfer', + sync_level='checksum', + ) + for fp in files: + tdata.add_item(self._source_path(fp), self._dest_path(fp)) + res = tc.submit_transfer(tdata) + task_id = res.get('task_id', 'unknown') + self.total_submitted += len(files) + self._set_status('Submitted transfer task ' + task_id) + log('Submitted Globus transfer task ' + task_id + ' with ' + str(len(files)) + ' files.') + + def _worker_loop(self): + self._set_status('Transfer worker running') + while not self.stop_event.is_set(): + try: + item = self.queue.get(timeout=1) + except queue.Empty: + continue + + files = item.get('files', []) + attempt = int(item.get('attempt', 1)) + if not files: + self.queue.task_done() + continue + + try: + self._set_status('Submitting cycle batch') + self._submit_batch(files) + except Exception as e: + self.total_failed += len(files) + self._set_status('Transfer failed, retrying', str(e)) + log('Globus transfer failed (attempt ' + str(attempt) + '): ' + str(e)) + if attempt < 5 and not self.stop_event.is_set(): + backoff = min(300, 2 ** attempt) + time.sleep(backoff) + self.queue.put({'files': files, 'attempt': attempt + 1, 'queued_at': time.time()}) + finally: + self.queue.task_done() + + self._set_status('Transfer worker stopped') diff --git a/spiro/webui.py b/spiro/webui.py index 008b877..a78b099 100644 --- a/spiro/webui.py +++ b/spiro/webui.py @@ -5,25 +5,35 @@ import io import os import re +import base64 import subprocess import time import shutil import signal import hashlib from threading import Thread, Lock, Condition +try: + import globus_sdk +except Exception: + globus_sdk = None from waitress import serve from flask import Flask, render_template, Response, request, redirect, url_for, session, flash, abort import spiro.hostapd as hostapd from spiro.config import Config +from spiro.globus_local import LocalGlobusConnectPersonal from spiro.logger import log, debug from spiro.experimenter import Experimenter +from spiro.transfer import GlobusTransferManager from spiro.timeutils import COMMON_TIMEZONES, format_timestamp, now_in_timezone, validate_timezone_name app = Flask(__name__) app.jinja_env.trim_blocks = True app.jinja_env.lstrip_blocks = True +GLOBUS_TRANSFER_SCOPE = 'urn:globus:auth:scope:transfer.api.globus.org:all' +GLOBUS_AUTH_VERIFIER_SESSION_KEY = 'globus_auth_verifier' +GLOBUS_AUTH_STATE_SESSION_KEY = 'globus_auth_state' class Rotator(Thread): def __init__(self, value): @@ -177,6 +187,37 @@ def checkPass(pwd): return False +def _globus_auth_client(): + if globus_sdk is None: + raise RuntimeError('globus-sdk is not installed on this system.') + client_id = (cfg.get('globus_client_id') or '').strip() + if not client_id: + raise RuntimeError('Set Globus Client ID in System settings before connecting.') + return globus_sdk.NativeAppAuthClient(client_id) + + +def _get_transfer_manager(): + global transfer_manager + if transfer_manager is None: + transfer_manager = GlobusTransferManager(cfg) + return transfer_manager + + +def _get_local_globus(): + global local_globus + if local_globus is None: + local_globus = LocalGlobusConnectPersonal(cfg) + return local_globus + + +def _globus_connected(): + return _get_transfer_manager().has_credentials() + + +def _new_globus_auth_verifier(): + return base64.urlsafe_b64encode(os.urandom(32)).decode('ascii').rstrip('=') + + def get_plate_dirs(check_dir): plate_dirs = [] for entry in os.scandir(check_dir): @@ -225,6 +266,104 @@ def logout(): return redirect(url_for('login')) +@app.route('/globus/auth/start') +def globus_auth_start(): + try: + auth_client = _globus_auth_client() + verifier = _new_globus_auth_verifier() + state = base64.urlsafe_b64encode(os.urandom(12)).decode('ascii').rstrip('=') + auth_client.oauth2_start_flow( + refresh_tokens=True, + requested_scopes=['openid', 'profile', GLOBUS_TRANSFER_SCOPE], + redirect_uri='https://auth.globus.org/v2/web/auth-code', + verifier=verifier, + state=state, + ) + session['globus_auth_started'] = True + session[GLOBUS_AUTH_VERIFIER_SESSION_KEY] = verifier + session[GLOBUS_AUTH_STATE_SESSION_KEY] = state + auth_url = auth_client.oauth2_get_authorize_url() + return redirect(auth_url) + except Exception as e: + flash('Unable to start Globus login: ' + str(e)) + return redirect(url_for('settings')) + + +@app.route('/globus/auth/complete', methods=['POST']) +def globus_auth_complete(): + code = (request.form.get('globus_auth_code') or '').strip() + if not code: + flash('Enter the authorization code from Globus first.') + return redirect(url_for('settings')) + + try: + verifier = session.get(GLOBUS_AUTH_VERIFIER_SESSION_KEY) + state = session.get(GLOBUS_AUTH_STATE_SESSION_KEY, '_default') + if not verifier: + raise RuntimeError('Start the Globus login flow again before exchanging a code.') + auth_client = _globus_auth_client() + auth_client.oauth2_start_flow( + refresh_tokens=True, + requested_scopes=['openid', 'profile', GLOBUS_TRANSFER_SCOPE], + redirect_uri='https://auth.globus.org/v2/web/auth-code', + verifier=verifier, + state=state, + ) + token_response = auth_client.oauth2_exchange_code_for_tokens(code) + transfer_tokens = token_response.by_resource_server.get('transfer.api.globus.org') + if not transfer_tokens: + raise RuntimeError('Missing transfer.api.globus.org token in OAuth response.') + _get_transfer_manager().save_token_payload(transfer_tokens) + session['globus_auth_started'] = False + session.pop(GLOBUS_AUTH_VERIFIER_SESSION_KEY, None) + session.pop(GLOBUS_AUTH_STATE_SESSION_KEY, None) + flash('Globus account connected successfully.') + except Exception as e: + flash('Failed to complete Globus login: ' + str(e)) + + return redirect(url_for('settings')) + + +@app.route('/globus/auth/disconnect') +def globus_auth_disconnect(): + _get_transfer_manager().clear_credentials() + session['globus_auth_started'] = False + session.pop(GLOBUS_AUTH_VERIFIER_SESSION_KEY, None) + session.pop(GLOBUS_AUTH_STATE_SESSION_KEY, None) + flash('Globus account disconnected.') + return redirect(url_for('settings')) + + +@app.route('/globus/source/start') +def globus_source_start(): + try: + _get_local_globus().start_background() + flash('Globus Connect Personal start was requested on this device.') + except Exception as e: + flash('Unable to start Globus Connect Personal: ' + str(e)) + return redirect(url_for('settings')) + + +@app.route('/globus/source/login') +def globus_source_login(): + try: + _get_local_globus().launch_setup_or_login() + flash('Globus Connect Personal setup/login was launched. If this device is headless, complete setup from a local shell or desktop session.') + except Exception as e: + flash('Unable to launch Globus Connect Personal setup/login: ' + str(e)) + return redirect(url_for('settings')) + + +@app.route('/globus/source/refresh') +def globus_source_refresh(): + endpoint_id = _get_local_globus().refresh_local_endpoint_id() + if endpoint_id: + flash('Detected local Globus source endpoint: ' + endpoint_id) + else: + flash('Unable to determine the local Globus source endpoint ID. Make sure Globus Connect Personal is running and globus-cli is installed.') + return redirect(url_for('settings')) + + @public_route @app.route('/newpass', methods=['GET', 'POST']) def newpass(): @@ -475,12 +614,58 @@ def experiment(): if experimenter.running: flash("Experiment is already running.") else: + transfer_enabled = request.form.get('transfer_enabled') == 'on' + destination_endpoint = (request.form.get('transfer_destination_endpoint') or '').strip() + destination_base_path = (request.form.get('transfer_destination_base_path') or '/').strip() or '/' + if request.form.get('duration'): experimenter.duration = int(request.form['duration']) else: experimenter.duration = 7 if request.form.get('delay'): experimenter.delay = int(request.form['delay']) else: experimenter.delay = 60 if request.form.get('directory'): experimenter.dir = os.path.expanduser(os.path.join('~', request.form['directory'].replace('/', '-'))) else: experimenter.dir = os.path.expanduser('~') + + if transfer_enabled: + if not _get_transfer_manager().has_credentials(): + flash('Connect the SPIRO app to Globus in System settings before enabling transfer.') + return redirect(url_for('experiment')) + if not destination_endpoint: + flash('Destination endpoint ID is required for transfer.') + return redirect(url_for('experiment')) + + local_status = _get_local_globus().status(refresh_endpoint=False) + if not local_status['installed']: + flash('Install Globus Connect Personal on this device before enabling transfer.') + return redirect(url_for('experiment')) + if not local_status['configured']: + flash('Launch Globus Connect Personal setup/login from System settings before enabling transfer.') + return redirect(url_for('experiment')) + + try: + _get_local_globus().start_background() + except Exception as e: + flash('Unable to start the local Globus Connect Personal client: ' + str(e)) + return redirect(url_for('experiment')) + + source_endpoint = _get_local_globus().refresh_local_endpoint_id() + if not source_endpoint: + flash('Unable to determine the local Globus source endpoint ID. Make sure globus-cli is installed and the local client is logged in.') + return redirect(url_for('experiment')) + + source_base_path = os.path.abspath(experimenter.dir) + cfg.set('globus_source_endpoint', source_endpoint) + cfg.set('globus_source_base_path', source_base_path) + _get_transfer_manager().configure_run( + source_endpoint=source_endpoint, + source_base_path=source_base_path, + destination_endpoint=destination_endpoint, + destination_base_path=destination_base_path, + experiment_root=experimenter.dir, + ) + _get_transfer_manager().start_worker() + else: + _get_transfer_manager().configure_run('', '/', '', '/', experimenter.dir) + setLive('off') # zoomer.set(roi=1.0) current_zoom_tuple = ( zoomer.y - zoomer.roi/2.0, zoomer.x - zoomer.roi/2.0, zoomer.roi, zoomer.roi) @@ -504,12 +689,22 @@ def experiment(): diskspace = round(df.free / 1024 ** 3, 1) diskreq = round(experimenter.nshots * experimenter.position_count * 8 / 1024, 1) timezone_name = cfg.get('timezone') + transfer_status = _get_transfer_manager().get_status() + local_globus_status = _get_local_globus().status() return render_template('experiment.html', running=experimenter.running, directory=experimenter.dir, starttime=format_timestamp(experimenter.starttime, timezone_name), delay=experimenter.delay, endtime=format_timestamp(experimenter.endtime, timezone_name), diskspace=diskspace, duration=experimenter.duration, status=experimenter.status, nshots=experimenter.nshots + 1, diskreq=diskreq, position_count=experimenter.position_count, name=cfg.get('name'), - defname=experimenter.getDefName()) + defname=experimenter.getDefName(), + globus_connected=_globus_connected(), + local_globus_ready=bool(local_globus_status.get('endpoint_id')), + local_globus_status=local_globus_status, + transfer_status=transfer_status, + transfer_defaults={ + 'destination_endpoint': transfer_status.get('destination_endpoint', ''), + 'destination_base_path': transfer_status.get('destination_base_path', '/'), + }) @not_while_running @@ -639,6 +834,10 @@ def settings(): flash('Timezone preference saved. Device time remains UTC.') except ValueError as exc: flash(str(exc)) + if request.form.get('globus_client_id') is not None: + cfg.set('globus_client_id', request.form.get('globus_client_id').strip()) + if request.form.get('globus_gcp_install_dir') is not None: + cfg.set('globus_gcp_install_dir', request.form.get('globus_gcp_install_dir').strip() or '~/.globusconnectpersonal') ssid, passwd = hostapd.get_ssid() timezone_name = cfg.get('timezone') current_time = now_in_timezone(timezone_name) @@ -646,13 +845,24 @@ def settings(): timezone_choices = list(COMMON_TIMEZONES) if not any(choice[0] == timezone_name for choice in timezone_choices): timezone_choices.append((timezone_name, timezone_name)) + transfer_status = _get_transfer_manager().get_status() + local_globus_status = _get_local_globus().status() return render_template('settings.html', name=cfg.get('name'), running=experimenter.running, version=cfg.version, debug=cfg.get('debug'), ip_addr=get_external_ip(), hotspot_ready=hostapd.is_ready(), hotspot_enabled=hostapd.is_enabled(), ssid=ssid, passwd=passwd, rotation=cfg.get('rotated_camera'), timezone_name=timezone_name, timezone_choices=timezone_choices, current_time=current_time.strftime('%Y-%m-%d %H:%M:%S %Z'), current_time_utc_iso=current_utc_time.isoformat(), - current_utc_time=current_utc_time.strftime('%Y-%m-%d %H:%M:%S %Z')) + current_utc_time=current_utc_time.strftime('%Y-%m-%d %H:%M:%S %Z'), + globus_connected=_globus_connected(), + globus_auth_started=bool(session.get('globus_auth_started')), + globus_client_id=cfg.get('globus_client_id'), + globus_gcp_install_dir=cfg.get('globus_gcp_install_dir'), + globus_source_endpoint=cfg.get('globus_source_endpoint'), + globus_source_base_path=cfg.get('globus_source_base_path'), + globus_local_endpoint_id=cfg.get('globus_local_endpoint_id'), + local_globus_status=local_globus_status, + transfer_status=transfer_status) @app.route('/sync-time') @@ -862,15 +1072,20 @@ def set_hotspot(value): dayshutter = None camera = None hw = None +transfer_manager = None +local_globus = None restarting = False livestream = False def start(cam, myhw): - global camera, hw, experimenter + global camera, hw, experimenter, transfer_manager camera = cam hw = myhw experimenter = Experimenter(hw=hw, cam=cam) + transfer_manager = GlobusTransferManager(cfg) + experimenter.transfer_cycle_handler = transfer_manager.queue_cycle_files + transfer_manager.start_worker() experimenter.start() if cfg.get('secret') == '': secret = hashlib.sha1(os.urandom(16)) @@ -931,6 +1146,8 @@ def start(cam, myhw): stop() def stop(): + if transfer_manager: + transfer_manager.stop_worker() experimenter.stop() experimenter.quit = True experimenter.status_change.set()