diff --git a/README.md b/README.md index 3e836a7..dc715a6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ ``` 2026-03-19 -This is an updated version of the SPIRO project for Raspberry PI 4, Ubuntu Bookworm and 16MP Camera +This is an updated version of the SPIRO project for Raspberry Pi 4, Raspberry Pi OS Bookworm 32-bit, the Waveshare Stepper Motor HAT, and a 16MP camera Original Repo and Author: https://github.com/jonasoh/spiro ``` @@ -47,6 +47,18 @@ It is also possible to use a [Raspberry Pi 4B](https://www.raspberrypi.com/produ The system basically consists of a camera, a green LED illuminator for imaging in the dark, and a motor-controlled imaging stage, as shown below. +The current hardware-control code assumes the stage motor is connected to **M2** on the [Waveshare Stepper Motor HAT](https://www.waveshare.com/wiki/Stepper_Motor_HAT), which uses a DRV8825 driver. SPIRO drives M2 through these BCM pins: + +* `DIR`: BCM 24 +* `STEP`: BCM 18 +* `ENABLE`: BCM 4 + +The illumination LED remains on BCM 17. Because BCM 4 is consumed by the Waveshare M2 enable input, the default SPIRO configuration expects the positional microswitch on BCM 12. If your switch is wired elsewhere, update the persistent `sensor` setting accordingly. + +The code now assumes the Waveshare HAT is left at its **factory-default** DIP setting, which is 1/32 microstepping. On M2 that means the `D3-D5` switches stay in their shipped position. SPIRO preserves the older calibration scale by internally expanding each logical software step into 16 STEP pulses. If you change the DIP switches away from the factory default, all saved calibration and jog values will change proportionally and must be recalibrated. + +The Waveshare HAT also requires its own motor supply. Use the HAT's external `VIN` input with an appropriate supply for your motor, typically 9V to 12V, and set the DRV8825 current limit with the onboard potentiometer before long runs. + ![SPIRO close-up](https://user-images.githubusercontent.com/6480370/60957134-6a4ae280-a304-11e9-8a03-0d854267297b.jpeg) It is relatively cheap and easy to assemble multiple systems for running larger experiments. @@ -75,7 +87,7 @@ https://user-images.githubusercontent.com/6480370/149747187-09aa5269-0595-4daf-8 ## Installation -First, prepare the SD card with a fresh release of [Raspberry Pi OS Lite **(Legacy)**](https://www.raspberrypi.com/software/operating-systems/#raspberry-pi-os-legacy) (follow the official [instructions](https://www.raspberrypi.org/documentation/installation/installing-images/README.md)). **NB! Due to a change in the camera stack, only "Legacy" Raspberry Pi OS versions are supported for now.** +First, prepare the SD card with a fresh release of **Raspberry Pi OS Lite (32-bit) Bookworm** for Raspberry Pi 4 (follow the official [instructions](https://www.raspberrypi.org/documentation/installation/installing-images/README.md)). This codebase targets the Bookworm libcamera stack and the Waveshare Stepper Motor HAT. **Note**: If using the [ArduCam drop-in replacement camera module](https://www.arducam.com/product/arducam-imx219-auto-focus-camera-module-drop-in-replacement-for-raspberry-pi-v2-and-nvidia-jetson-nano-camera/), you need to add the following line to the file config.txt on the newly prepared SD card: ``` @@ -96,7 +108,7 @@ sudo raspi-config In the raspi-config interface, make the following changes: * **Change the password**. The system will allow network access, and a weak password **will** compromise your network security **and** your experimental data. * After changing the password, connect the network cable (if you are using wired networking). -* Under *Interfacing*, enable *Camera*, *I2C*, and *SSH*. +* Under *Interfacing*, enable *I2C* and *SSH*. The camera is used through Bookworm's libcamera stack, so the old Legacy camera stack is not required. * In *Performance Options*, set *GPU Memory* to 256. * Under *Localisation Options*, make sure to set the *Timezone*. Please note that a working network connection is required to maintain the correct date. * If needed, configure *Network* and *Localization* options here as well. Set a *Hostname* under Network if you plan on running several SPIROs. @@ -108,10 +120,30 @@ Next, ensure the system is connected to the internet. Update the operating syste ``` sudo apt update sudo apt upgrade -y -sudo apt install -y git zip i2c-tools libatlas-base-dev python3-pip python3-pil +sudo apt install -y git zip i2c-tools libatlas-base-dev python3-pip python3-pil python3-gpiozero python3-lgpio sudo apt install -y python3-av python3-numpy python3-pil libcap-dev libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libavfilter-dev libswscale-dev libswresample-dev pkg-config ``` +Before assembling the stage, set the Waveshare HAT to the expected SPIRO defaults: + +* Connect the stage motor to **M2**. +* Set the HAT's motor power switch on. +* Provide an external motor supply to the HAT `VIN` input. +* Set the M2 `D3-D5` DIP switches for the desired microstepping mode. For more + torque (recommended when the motor is skipping or lacks power), set M2 to + **1/8 microstepping** by configuring the switches for M2 as: `D3=ON`, + `D4=ON`, `D5=OFF` (i.e. MODE0=1, MODE1=1, MODE2=0). Always power OFF the + HAT (turn motor VIN off) before changing DIP switches to avoid damage. + After changing DIP switches, restore motor power and test motion slowly. + + Note: SPIRO preserves a legacy logical-step scale by emitting multiple + microstep pulses per logical step. The code default maps one logical step to + 4 microstep pulses when using 1/8 microstepping. If you change the microstep + mode you must recalibrate the stage because saved calibration and jog + values scale with the microstepping setting. + +* If you are using the original SPIRO stop-switch wiring on BCM 4, move that switch to another free GPIO such as BCM 12 because BCM 4 is now required by the M2 enable pin. + Then, install the SPIRO software and its dependencies: ``` @@ -229,7 +261,9 @@ After logging in to the system, you are presented with the *Live view*. Here, yo Under *Day image settings* and *Night image settings*, you can adjust the exposure time for day and night images in real time. For night images, make sure that representative conditions are used (i.e., turn off the lights in the growth chamber). When the image is captured according to your liking, choose *Update and save*. -For locating the initial imaging position, the system turns the cube until a positional switch is activated. It then turns the cube a predefined amount of steps (*calibration value*). The check that the calibration value is correct, go to the *Start position calibration* view. Here, you may try out the current value, as well as change it. Make sure to click *Save value* if you change it. +For locating the imaging positions, SPIRO expects **two sensor events per position**. The first trigger is treated as the early-warning stop, telling the software that the final stop is approaching. The system then slows down, finds the second stop, and only after that applies the configured final offset (*calibration value*). That same final offset is used for all positions, not only the start position. + +Use the *Calibrate motor* view to tune the primary step delay, secondary step delay, secondary stop gap, stable sensor reads, and final offset. The page now also includes manual jog controls, live step counters, and repeatability tests so you can dial in settings without saving every trial. ### Starting an experiment @@ -319,28 +353,54 @@ journalctl --user-unit=spiro ### Testing the LED and motor -To check whether the Raspberry Pi can control the LED illuminator, first set the LED control pin to output mode: +To check whether the Raspberry Pi can control the LED illuminator on Bookworm, use gpiozero instead of the deprecated sysfs GPIO interface: -``` -echo 17 > /sys/class/gpio/export -echo "out" > /sys/class/gpio/gpio17/direction +```python +python3 - <<'PY' +from gpiozero import LED + +led = LED(17) +led.on() +input('LED is on. Press Enter to turn it off...') +led.off() +led.close() +PY ``` -You may then toggle it on or off using the commands +To test the Waveshare HAT on **M2**, make sure the HAT has external motor power connected, the motor is plugged into **M2**, and the DIP switches are left at the factory default microstepping setting. Then run: -``` -echo 1 > /sys/class/gpio/gpio17/value # Turn LED on -``` +```python +python3 - <<'PY' +from time import sleep +from gpiozero import DigitalOutputDevice -``` -echo 0 > /sys/class/gpio/gpio17/value # Turn LED off -``` +enable = DigitalOutputDevice(4, active_high=True, initial_value=False) +direction = DigitalOutputDevice(24, initial_value=False) +step = DigitalOutputDevice(18, initial_value=False) -If it doesn't respond to this command, this may indicate either miswiring, or that either the LED strip or the MOSFET is non-functional. +enable.on() +direction.on() +for _ in range(50): + step.on() + sleep(0.001) + step.off() + sleep(0.02) +enable.off() + +step.close() +direction.close() +enable.close() +PY +``` -Similarly, you can turn on and off the motor, by substituting the value *23* for 17 in the above examples. When GPIO pin 23 is toggled on, the cube should be locked in position. If it is not, check that your wiring looks good, that the power supply is connected, and that the shaft coupler is firmly attached to both the cube and the motor. +If the motor does not move correctly: -If the motor is moving jerkily during normal operation, there is likely a problem with the wiring of the coil pins (Ain1&2 and Bin1&2). +* Confirm the motor is connected to **M2** on the HAT. +* Confirm the HAT's external motor power supply is connected and switched on. +* Confirm the M2 DIP switches `D3-D5` are still at the factory default unless you intentionally recalibrated for another microstepping mode. +* Confirm the motor wiring matches the HAT's `A3/A4/B3/B4` outputs. +* Confirm the DRV8825 current limit is set appropriately for the motor. +* If the stage moves in the wrong direction, set `motor_direction_active_high` in the SPIRO config instead of rewiring the whole system. ## Licensing diff --git a/bin/spiro b/bin/spiro index 7fa9bb2..4983068 100644 --- a/bin/spiro +++ b/bin/spiro @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -# -# small shell script wrapper for spiro +"""Thin executable wrapper that forwards to the packaged SPIRO entry point.""" import spiro.spiro spiro.spiro.main() diff --git a/bin/test_stepper_slow.py b/bin/test_stepper_slow.py new file mode 100644 index 0000000..4286c0e --- /dev/null +++ b/bin/test_stepper_slow.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +""" +bin/test_stepper_slow.py - simple slow stepper test for Waveshare HAT M2. + +Defaults match SPIRO config: ENABLE=4, DIR=24, STEP=18. +Designed for manual testing on the Raspberry Pi. Use --inter-step to slow pulses. +""" + +import argparse +from time import sleep + +try: + from gpiozero import DigitalOutputDevice +except Exception: + # Fallback stub for environments without gpiozero (safe for static checks) + class DigitalOutputDevice: + def __init__(self, *args, **kwargs): + pass + def on(self): + pass + def off(self): + pass + def close(self): + pass + + +def main(): + parser = argparse.ArgumentParser(description="Slow stepper test (Waveshare HAT M2)") + parser.add_argument('--enable-pin', type=int, default=4) + parser.add_argument('--dir-pin', type=int, default=24) + parser.add_argument('--step-pin', type=int, default=18) + parser.add_argument('--steps', type=int, default=200) + parser.add_argument('--step-high', type=float, default=0.005, + help='STEP pulse high time in seconds (>= ~2e-6)') + parser.add_argument('--inter-step', type=float, default=0.1, + help='Delay between steps in seconds') + parser.add_argument('--direction', type=int, default=1, choices=(0, 1)) + parser.add_argument('--enable-active-low', action='store_true', + help='Set if HAT enable is active low (default active high)') + args = parser.parse_args() + + enable_active_high = not args.enable_active_low + + enable = DigitalOutputDevice(args.enable_pin, active_high=enable_active_high, initial_value=False) + direction = DigitalOutputDevice(args.dir_pin, initial_value=False) + step = DigitalOutputDevice(args.step_pin, initial_value=False) + + try: + print(f"Enabling driver (pin {args.enable_pin})") + enable.on() + if args.direction: + direction.on() + else: + direction.off() + + print(f"Stepping {args.steps} pulses (high={args.step_high}s, delay={args.inter_step}s)") + for i in range(args.steps): + step.on() + sleep(args.step_high) + step.off() + sleep(args.inter_step) + except KeyboardInterrupt: + print("Interrupted, disabling driver.") + finally: + enable.off() + try: + step.close() + direction.close() + enable.close() + except Exception: + pass + + +if __name__ == '__main__': + main() diff --git a/setup.py b/setup.py index 64d7a43..3accc4c 100644 --- a/setup.py +++ b/setup.py @@ -1,10 +1,17 @@ +"""Package metadata and installation configuration for SPIRO. + +The setup script reads version information directly from the local version +helper instead of importing the full package. That keeps installation safe on +systems where Raspberry Pi hardware libraries are unavailable at build time. +""" + from setuptools import setup, find_packages -# from miniver def get_version_and_cmdclass(package_path): - """Load version.py module without importing the whole package. + """Load the version helper without importing the full package tree. - Template code from miniver + SPIRO imports hardware-related modules at runtime, so setup must avoid a + normal package import here. """ import os from importlib.util import module_from_spec, spec_from_file_location @@ -16,7 +23,8 @@ def get_version_and_cmdclass(package_path): version, cmdclass = get_version_and_cmdclass("spiro") -#version = "0.1.0" +# Preserve the current packaging behavior by leaving custom build command +# overrides disabled unless they are intentionally re-enabled later. cmdclass = {} setup(name = 'spiro', @@ -24,7 +32,7 @@ def get_version_and_cmdclass(package_path): cmdclass = cmdclass, packages = find_packages(), scripts = ['bin/spiro'], - install_requires = ['picamera2==0.3.31', 'RPi.GPIO==0.7.1', 'Flask==2.2.5', 'waitress==2.1.2', 'numpy==1.24.2', 'Werkzeug==2.2.3'], + install_requires = ['picamera2==0.3.31', 'gpiozero>=2.0.1,<3', 'Flask==2.2.5', 'waitress==2.1.2', 'numpy==1.24.2', 'Werkzeug==2.2.3'], author = 'Kirby Kalbaugh', author_email = 'kkalbaug@purdue.edu', description = 'Control software for the SPIRO biological imaging system', diff --git a/spiro/__init__.py b/spiro/__init__.py index 7df9f7a..ffab215 100644 --- a/spiro/__init__.py +++ b/spiro/__init__.py @@ -1 +1,7 @@ +"""SPIRO package namespace. + +Only lightweight package metadata is exposed here so callers can query the +installed version without importing the camera or GPIO subsystems. +""" + from ._version import __version__ \ No newline at end of file diff --git a/spiro/_static_version.py b/spiro/_static_version.py index 5557f9b..830c019 100644 --- a/spiro/_static_version.py +++ b/spiro/_static_version.py @@ -1,4 +1,12 @@ # -*- coding: utf-8 -*- +"""Static version markers used by the bundled miniver helper. + +In a live git checkout this file usually keeps the ``__use_git__`` sentinel so +the runtime version can be derived from tags. When a source or wheel +distribution is built, setup rewrites the file with a fixed version string so +the installed package no longer depends on git metadata being present. +""" + # This file is part of 'miniver': https://github.com/jbweston/miniver # # This file will be overwritten by setup.py when a source or binary diff --git a/spiro/_version.py b/spiro/_version.py index eb807f4..3bf4c7e 100644 --- a/spiro/_version.py +++ b/spiro/_version.py @@ -1,4 +1,10 @@ # -*- coding: utf-8 -*- +"""Version resolution helpers vendored from miniver. + +SPIRO uses these utilities to produce a stable package version from git tags in +development and from frozen metadata in release artifacts. +""" + # This file is part of 'miniver': https://github.com/jbweston/miniver # from collections import namedtuple @@ -28,6 +34,11 @@ def get_version(version_file=STATIC_VERSION_FILE): + """Resolve the current package version as a string. + + The lookup order is: static version file, live git metadata, archive + placeholders, and finally an ``unknown`` fallback. + """ version_info = get_static_version_info(version_file) version = version_info["version"] if version == "__use_git__": @@ -42,6 +53,7 @@ def get_version(version_file=STATIC_VERSION_FILE): def get_static_version_info(version_file=STATIC_VERSION_FILE): + """Load the static version helper into an isolated namespace.""" version_info = {} with open(os.path.join(package_root, version_file), "rb") as f: exec(f.read(), {}, version_info) @@ -49,10 +61,12 @@ def get_static_version_info(version_file=STATIC_VERSION_FILE): def version_is_from_git(version_file=STATIC_VERSION_FILE): + """Return whether version resolution is delegated to git metadata.""" return get_static_version_info(version_file)["version"] == "__use_git__" def pep440_format(version_info): + """Convert a ``Version`` tuple into a PEP 440 compliant version string.""" release, dev, labels = version_info version_parts = [release] @@ -70,6 +84,7 @@ def pep440_format(version_info): def get_version_from_git(): + """Derive a version from ``git describe`` when running from a checkout.""" try: p = subprocess.Popen( ["git", "rev-parse", "--show-toplevel"], @@ -145,6 +160,7 @@ def get_version_from_git(): # pointing to, or its hash (with no version info) # if it is not tagged. def get_version_from_git_archive(version_info): + """Derive a version from ``git archive`` placeholders when available.""" try: refnames = version_info["refnames"] git_hash = version_info["git_hash"] @@ -176,6 +192,7 @@ def get_version_from_git_archive(version_info): def _write_version(fname): + """Write the resolved version into the generated static version file.""" # This could be a hard link, so try to delete it first. Is there any way # to do this atomically together with opening? try: @@ -190,12 +207,16 @@ def _write_version(fname): class _build_py(build_py_orig): + """Build hook that injects a fixed static version into build outputs.""" + def run(self): super().run() _write_version(os.path.join(self.build_lib, package_name, STATIC_VERSION_FILE)) class _sdist(sdist_orig): + """sdist hook that writes frozen version metadata into release archives.""" + def make_release_tree(self, base_dir, files): super().make_release_tree(base_dir, files) if _package_root_inside_src: diff --git a/spiro/camera.py b/spiro/camera.py index 8609ec4..8b257ba 100644 --- a/spiro/camera.py +++ b/spiro/camera.py @@ -1,16 +1,30 @@ +"""Camera wrapper used by the rest of the SPIRO application. + +The web UI and experiment runner both depend on a small, stable API for live +streaming, still capture, focus, exposure, and digital zoom. This wrapper hides +the Picamera2/libcamera details and keeps enough state to reapply view controls +after camera mode switches. +""" + import time from spiro.logger import log, debug from spiro.config import Config class NewCamera: + """Thin stateful adapter around Picamera2 for SPIRO's workflows.""" + def __init__(self): debug('Libcamera detected.') self.camera = Picamera2() self.type = 'libcamera' self.streaming = False self.stream_output = None + self.current_zoom = None + self.current_focus = None # self.preview_mode = None + # SPIRO uses a high-resolution still pipeline for saved images and a + # smaller video pipeline for the MJPEG live view. self.still_config = self.camera.create_still_configuration(main={"size": (4656, 3496)}, lores={"size": (320, 240)}, raw = None) self.video_config = self.camera.create_video_configuration(main={"size": (1640,1232)}) # original 1024,768 @@ -33,6 +47,7 @@ def __init__(self): "AfMode": controls.AfModeEnum.Manual, "LensPosition": (self.lens_limits[2] if self.lens_limits and len(self.lens_limits) > 2 else 0) }) + self.current_focus = (self.lens_limits[2] if self.lens_limits and len(self.lens_limits) > 2 else 0) except Exception: # be lenient if controls or enums are not available pass @@ -42,13 +57,49 @@ def __init__(self): except Exception: pass + def _apply_saved_view_controls(self): + """Reapply the last requested crop and focus controls to the camera. + + Picamera2 can lose these controls when switching between video and still + configurations, so the wrapper keeps its own copy instead of assuming + the camera will preserve them internally. + """ + controls_to_apply = {} + if self.current_zoom is not None: + try: + x, y, w, h = self.current_zoom + (resx, resy) = self.camera.camera_properties.get('PixelArraySize', (0, 0)) + controls_to_apply['ScalerCrop'] = (int(x * resx), int(y * resy), int(w * resx), int(h * resy)) + except Exception: + debug('Failed to prepare saved zoom', exc_info=True) + if self.current_focus is not None: + controls_to_apply['LensPosition'] = self.current_focus + + if controls_to_apply: + try: + self.camera.set_controls(controls_to_apply) + except Exception: + debug('Failed to restore saved view controls', exc_info=True) + + def _reapply_view_controls_after_mode_switch(self, settle_seconds=0.08): + """Reapply controls twice after a mode switch to work around racey writes.""" + # Picamera2 can ignore the first control write immediately after switch_mode. + # Apply controls twice with a short settle delay to make crop/focus stick. + self._apply_saved_view_controls() + time.sleep(settle_seconds) + self._apply_saved_view_controls() + def start_stream(self, output): + """Switch into video mode and start the MJPEG live stream.""" log('Starting stream.') try: + if self.streaming: + self.stop_stream() self.stream_output = output - self.streaming = True self.camera.switch_mode(self.video_config) + self._reapply_view_controls_after_mode_switch() self.camera.start_recording(MJPEGEncoder(), FileOutput(output)) + self.streaming = True except Exception: debug('Failed to start stream', exc_info=True) @@ -77,13 +128,19 @@ def start_stream(self, output): def stop_stream(self): - # intentionally a no-op for libcamera in this design - pass + """Stop the MJPEG stream if it is currently active.""" + if not self.streaming: + return + try: + self.camera.stop_recording() + except Exception: + debug('Failed to stop stream', exc_info=True) + finally: + self.streaming = False @property def zoom(self): - # not implemented for libcamera wrapper - return None + return self.current_zoom @zoom.setter def zoom(self, value): @@ -94,30 +151,42 @@ def zoom(self, value): raise ValueError('zoom setter expects a tuple (x, y, w, h)') try: + self.current_zoom = (x, y, w, h) (resx, resy) = self.camera.camera_properties.get('PixelArraySize', (0, 0)) self.camera.set_controls({"ScalerCrop": (int(x * resx), int(y * resy), int(w * resx), int(h * resy))}) except Exception: debug('Failed to set zoom', exc_info=True) def auto_exposure(self, value): + """Enable or disable the camera's automatic exposure loop.""" try: self.camera.set_controls({'AeEnable': value}) except Exception: debug('Failed to set auto_exposure', exc_info=True) def capture(self, obj, format='png'): + """Capture a full-resolution still image into the provided file-like object. + + Live recording is stopped before entering still mode because switching + libcamera pipelines while the MJPEG encoder is active can fail or leave + the encoder in a bad state. + """ stream = self.streaming + output = self.stream_output log('Capturing image.') try: + if stream: + self.stop_stream() self.camera.switch_mode(self.still_config) + self._reapply_view_controls_after_mode_switch() self.camera.capture_file(obj, format=format) log('Ok.') except Exception: debug('Capture failed', exc_info=True) if stream: - self.start_stream(self.stream_output) + self.start_stream(output) @property def shutter_speed(self): @@ -148,20 +217,25 @@ def iso(self, value): debug('Failed to set iso', exc_info=True) def close(self): + """Close the underlying camera device.""" try: self.camera.close() except Exception: pass def still_mode(self): + """Switch to the still-photo configuration and restore crop/focus.""" try: self.camera.switch_mode(self.still_config) + self._reapply_view_controls_after_mode_switch() except Exception: pass def video_mode(self): + """Switch to the live-view configuration and restore crop/focus.""" try: self.camera.switch_mode(self.video_config) + self._reapply_view_controls_after_mode_switch() except Exception: pass @@ -195,7 +269,9 @@ def awb_gains(self, gains): pass def focus(self, val): + """Set manual lens position and remember it for later mode switches.""" try: + self.current_focus = val self.camera.set_controls({'LensPosition': val}) except Exception: debug('Failed to set focus', exc_info=True) @@ -206,4 +282,5 @@ def focus(self, val): from picamera2.encoders import MJPEGEncoder from libcamera import controls +# The rest of the application imports a module-level camera singleton. cam = NewCamera() diff --git a/spiro/config.py b/spiro/config.py index 84c36e6..88658bf 100644 --- a/spiro/config.py +++ b/spiro/config.py @@ -1,6 +1,8 @@ -# config.py - -# handles getting/setting of persistent spiro config values -# +"""Persistent configuration storage for SPIRO. + +Settings are stored in ``~/.config/spiro/spiro.conf`` as JSON so both the web +UI and command-line entry points can share the same durable configuration. +""" import json import os @@ -9,6 +11,7 @@ from ._version import __version__ def log(msg): + """Write config-related warnings directly to stderr.""" sys.stderr.write(msg + '\n') sys.stderr.flush() @@ -31,17 +34,51 @@ def _resolve_version(): class Config(object): + """Load, cache, and persist SPIRO configuration values. + + ``defaults`` holds built-in values for a fresh install. ``config`` contains + only user overrides read from disk. + """ + defaults = { - 'calibration' : 8, # number of steps taken after positional sensor is lit - 'sensor': 4, # pin for mini microswitch positional sensor + 'calibration' : 8, # final offset applied after the second/final trigger at every position + 'motor_step_delay': 0.20, # seconds per SPIRO logical step; expanded into multiple DRV8825 pulses on M2 + # `motor_secondary_step_delay` controls the logical-step delay used during the + # secondary (fine) stop-finding phase of homing. The homing routine first + # performs a coarse search using `motor_step_delay` to move the stage until + # the positional sensor is approached. If `secondary_stop_steps` is enabled + # (non-zero), the controller will then back off and perform a finer-grained + # search to precisely locate the mechanical stop using smaller, slower + # steps. This setting is measured in seconds per logical step (same units as + # `motor_step_delay`). Increasing the value makes the fine search slower + # and more tolerant of sensor bounce or mechanical slop; decreasing it + # makes the fine search faster but reduces tolerance for missed triggers. + # Typical usage: set `motor_secondary_step_delay` slightly larger than + # `motor_step_delay` (for example 1.25×–1.5×) so the fine-pass is calmer + # and more reliable. The default value (0.12) is a practical compromise. + # Note: this parameter is only used when `secondary_stop_steps` is > 0. + 'motor_secondary_step_delay': 0.18, + 'motor_step_pulse_width': 0.001, # STEP high time sent to the DRV8825 input for each microstep pulse + # When using 1/8 microstepping on M2 we emit 4 microstep pulses per + # SPIRO "logical" step (legacy scale). If you change the HAT DIP + # microstepping, set this value to (microsteps_per_full_step / 2) + # to preserve the historical logical-step scale. Default = 4 for 1/8. + 'motor_logical_step_pulses': 4, + 'stop_stable_reads': 2, # consecutive sensor-active reads required before a stop is accepted + 'secondary_stop_settle_time': 0.03, # pause after clearing the coarse trigger before the fine-pass begins + 'position_count': 8, # number of capture positions around the stage + 'secondary_stop_steps': 25, # steps between the coarse and fine stop trigger; set to 0 to disable + 'secondary_home_steps': 25, # backwards-compatible alias for older configs + 'sensor': 12, # pin for mini microswitch positional sensor; BCM4 is used by the Waveshare M2 enable pin + 'sensor_pull_up': False, # SPIRO switch is wired active-high with pull-down by default + 'sensor_bounce_time': 0.025, # debounce time in seconds for the stop switch 'LED': 17, # pin for turning on/off led - 'PWMa': 8, # first pwm pin - 'PWMb': 14, # second pwm pin - 'coilpin_M11': 25, # ain2 pin - 'coilpin_M12': 24, # ain1 pin - 'coilpin_M21': 18, # bin1 pin - 'coilpin_M22': 15, # bin2 pin - 'stdby': 23, # stby pin + 'motor_driver': 'waveshare-stepper-hat-m2', # identifies the supported stage driver implementation + 'motor_dir_pin': 24, # Waveshare Stepper Motor HAT M2 DIR pin + 'motor_step_pin': 18, # Waveshare Stepper Motor HAT M2 STEP pin + 'motor_enable_pin': 4, # Waveshare Stepper Motor HAT M2 ENABLE pin + 'motor_direction_active_high': True, # default forward direction is inverted for the current M2 wiring + 'motor_enable_active_high': True, # Rev2.1 Waveshare HAT enables the driver when the GPIO is high 'focus': 250, # default focus distance 'password': '', # an empty password will trigger password initialization for web ui 'secret': '', # secret key for flask sessions @@ -50,6 +87,7 @@ class Config(object): 'nightshutter': 10, # night exposure time 'nightiso': 400, # night iso 'name': 'spiro', # the name of this spiro instance + 'timezone': 'America/New_York', # user display/save timezone; device clock remains UTC 'debug': True, # debug logging 'rotated_camera': True # rotated camera house } @@ -57,6 +95,7 @@ class Config(object): config = {} def __init__(self): + """Initialize file locations, runtime version, and cached config state.""" self.cfgdir = os.path.expanduser("~/.config/spiro") self.cfgfile = os.path.join(self.cfgdir, "spiro.conf") self.version = _resolve_version() @@ -69,6 +108,7 @@ def __init__(self): def read(self): + """Read the JSON config file from disk if it exists.""" os.makedirs(self.cfgdir, exist_ok=True) if os.path.exists(self.cfgfile): try: @@ -79,6 +119,7 @@ def read(self): def write(self): + """Persist the current config using a temporary file then atomic replace.""" try: with open(self.cfgfile + ".tmp", 'w') as f: json.dump(self.config, f, indent=4) @@ -87,6 +128,7 @@ def write(self): log("Failed to write config file: " + e.strerror) def get(self, key): + """Return a config value, reloading from disk if another process updated it.""" if os.path.exists(self.cfgfile): st = os.stat(self.cfgfile) mt = st.st_mtime @@ -98,11 +140,13 @@ def get(self, key): def set(self, key, value): + """Store a user override and immediately persist it to disk.""" self.config[key] = value self.write() def unset(self, key): + """Remove a persisted override so future reads fall back to defaults.""" if key in self.config: del self.config[key] self.write() diff --git a/spiro/experimenter.py b/spiro/experimenter.py index 9e1f786..5887990 100644 --- a/spiro/experimenter.py +++ b/spiro/experimenter.py @@ -1,6 +1,9 @@ -# experimenter.py - -# this file handles running the actual experiments -# +"""Experiment scheduling and image capture workflow for SPIRO. + +The Experimenter thread owns long-running capture loops, creates output +directories, rotates the imaging stage, and ensures still images use the same +framing rules as the interactive live view. +""" import os import time @@ -8,17 +11,20 @@ import numpy as np from PIL import Image from io import BytesIO -from datetime import date from statistics import mean from collections import deque from spiro.config import Config from spiro.logger import log, debug +from spiro.timeutils import dated_folder_prefix, filename_timestamp class Experimenter(threading.Thread): + """Background worker that executes imaging runs across all plate positions.""" + def __init__(self, hw=None, cam=None): self.hw = hw self.cam = cam self.cfg = Config() + self.position_count = self._get_position_count() self.delay = 60 self.duration = 7 self.dir = os.path.expanduser('~') @@ -31,15 +37,28 @@ def __init__(self, hw=None, cam=None): self.stop_experiment = False self.status_change = threading.Event() self.next_status = '' - self.last_captured = [''] * 4 - self.preview = [''] * 4 + self.last_captured = [''] * self.position_count + self.preview = [''] * self.position_count self.preview_lock = threading.Lock() self.nshots = 0 self.idlepos = 0 threading.Thread.__init__(self) + def _get_position_count(self): + """Return the configured number of capture positions around the stage.""" + return max(1, int(self.cfg.get('position_count') or 8)) + + + def _reset_position_buffers(self): + """Resize preview and last-capture buffers when position count changes.""" + self.position_count = self._get_position_count() + self.last_captured = [''] * self.position_count + self.preview = [''] * self.position_count + + def stop(self): + """Request that the currently running experiment stop at the next safe point.""" self.status = "Stopping" self.next_status = '' self.stop_experiment = True @@ -48,7 +67,7 @@ def stop(self): def getDefName(self): '''returns a default experiment name''' - today = date.today().strftime('%Y.%m.%d') + today = dated_folder_prefix(self.cfg.get('timezone')) return today + ' ' + self.cfg.get('name') @@ -69,12 +88,20 @@ def isDaytime(self): # XXX: clean this up self.cam.shutter_speed = 1000000 // self.cfg.get('dayshutter') time.sleep(1.0) - output = self.cam.camera.capture_array('lores') + try: + output = self.cam.camera.capture_array('lores') + except RuntimeError as e: + debug(f"Falling back to main stream for daytime estimation: {e}") + try: + output = self.cam.camera.capture_array('main') + except RuntimeError: + output = self.cam.camera.capture_array() debug("Daytime estimation mean value: " + str(output.mean())) return output.mean() > 60 def setWB(self): + """Let auto white balance settle, then freeze the chosen gains.""" debug("Determining white balance.") self.cam.awb_mode = "auto" time.sleep(2) @@ -83,67 +110,171 @@ def setWB(self): self.cam.awb_gains = g - def takePicture(self, name, plate_no, output = None): - filename = "" + def _apply_view_crop(self, im, view_state, capture_tag='still'): + """Apply the live-view zoom/pan framing to a full-resolution still image. + + libcamera can drop hardware crop state during a mode change, so SPIRO + applies one deterministic software crop here to keep preview and saved + stills aligned with the operator's live view. + """ + if not view_state: + debug(f"[{capture_tag}] crop skipped: no view_state") + return im + + zoom_state = view_state.get('zoom') or {} + try: + # Match live-view crop semantics from ZoomObject.apply(), which maps + # left/top as (y, x) for camera zoom control. + cx = float(zoom_state.get('y', 0.5)) + cy = float(zoom_state.get('x', 0.5)) + roi = float(zoom_state.get('roi', 1.0)) + except Exception: + debug(f"[{capture_tag}] crop skipped: invalid zoom state {zoom_state}") + return im + + requested = {'x': cx, 'y': cy, 'roi': roi} + roi = max(0.2, min(1.0, roi)) + cx = max(roi / 2.0, min(1.0 - roi / 2.0, cx)) + cy = max(roi / 2.0, min(1.0 - roi / 2.0, cy)) + + width, height = im.size + crop_w = max(1, int(round(width * roi))) + crop_h = max(1, int(round(height * roi))) + + left = int(round((cx - roi / 2.0) * width)) + top = int(round((cy - roi / 2.0) * height)) + left = max(0, min(width - crop_w, left)) + top = max(0, min(height - crop_h, top)) + + focus_value = view_state.get('focus') if isinstance(view_state, dict) else None + debug( + f"[{capture_tag}] crop calc " + f"req(x={requested['x']:.6f}, y={requested['y']:.6f}, roi={requested['roi']:.6f}) " + f"norm(x={cx:.6f}, y={cy:.6f}, roi={roi:.6f}) " + f"img={width}x{height} box=({left},{top})-({left + crop_w},{top + crop_h}) " + f"size={crop_w}x{crop_h} focus={focus_value}" + ) + + # Crop first, then resize back to the original output dimensions so the + # saved image keeps the expected resolution while matching live framing. + cropped = im.crop((left, top, left + crop_w, top + crop_h)) + if crop_w == width and crop_h == height: + debug(f"[{capture_tag}] crop result: full-frame (no resize)") + return im + + resampling = getattr(getattr(Image, 'Resampling', Image), 'LANCZOS', Image.LANCZOS) + return cropped.resize((width, height), resample=resampling) + + + def capture_processed_still(self, view_state=None, capture_tag='still'): + """Capture a still image with experiment settings and optional live framing. + + The method decides day vs night exposure, optionally restores the live + focus value, captures a full-resolution frame, applies the deterministic + crop, and finally rotates the image when the camera housing is mounted + sideways. + """ stream = BytesIO() prev_daytime = self.daytime self.daytime = self.isDaytime() - + + if view_state: + try: + focus_value = view_state.get('focus') + if focus_value is not None: + self.hw.focusCam(int(focus_value)) + except Exception as e: + debug(f"Failed to apply live view state to still capture: {e}") + if self.daytime: + # Day images use ambient light and the daytime exposure profile. time.sleep(0.5) self.cam.shutter_speed = 1000000 // self.cfg.get('dayshutter') self.cam.iso = self.cfg.get('dayiso') self.cam.color_effects = None - filename = os.path.join(self.dir, name + "-day.png") else: - # turn on led + # Night images turn on the LED and use the longer exposure profile. self.hw.LEDControl(True) time.sleep(1.0) self.cam.shutter_speed = 1000000 // self.cfg.get('nightshutter') self.cam.iso = self.cfg.get('nightiso') - filename = os.path.join(self.dir, name + "-night.png") - + if prev_daytime != self.daytime and self.daytime and self.cam.awb_mode != "off": - # if there is a daytime shift, AND it is daytime, AND white balance was not previously set, - # set the white balance to a fixed value. - # thus, white balance will only be fixed for the first occurence of daylight. self.setWB() - debug("Capturing %s." % filename) self.cam.exposure_mode = "off" if self.cam.type == 'legacy': self.cam.capture(stream, format='rgb') + if self.cam.resolution[0] == 3280: + raw_res = (3296, 2464) + elif self.cam.resolution[0] == 2592: + raw_res = (2592, 1952) + else: + debug('Camera has unsupported resolution ' + str(self.cam.resolution) + '! This may lead to crashes or corrupted images.') + raw_res = tuple(self.cam.resolution) + stream.seek(0) + im = Image.frombytes('RGB', raw_res, stream.read()).crop(box=(0,0)+self.cam.resolution) else: - # XXX - stream.write(self.cam.camera.capture_array('main')) + # Capture full-frame stills and apply one deterministic software crop + # below. This avoids still-mode crop drift across libcamera + # reconfiguration and makes preview/test logs reproducible. + original_zoom = getattr(self.cam, 'current_zoom', None) + try: + if hasattr(self.cam, 'current_zoom'): + self.cam.current_zoom = None + self.cam.capture(stream, format='png') + finally: + if hasattr(self.cam, 'current_zoom'): + self.cam.current_zoom = original_zoom + stream.seek(0) + im = Image.open(stream).convert('RGB') + + im = self._apply_view_crop(im, view_state, capture_tag=capture_tag) - # turn off LED immediately after capture if not self.daytime: self.hw.LEDControl(False) - # convert to PNG using PIL - # saving as 'RGB' using picamera adds a border which needs to be cropped away - # the raw capture size depends on the type of camera used - # we only support 5 MP (OV5647) and 8 MP (IMX219) cameras, for now at least - if self.cam.resolution[0] == 3280: - raw_res = (3296, 2464) - elif self.cam.resolution[0] == 2592: - raw_res = (2592, 1952) - else: - # unsupported resolution, try to make the best of it - debug('Camera has unsupported resolution ' + str(self.cam.resolution) + '! This may lead to crashes or corrupted images.') - raw_res = tuple(self.cam.resolution) - stream.seek(0) - im = Image.frombytes('RGB', raw_res, stream.read()).crop(box=(0,0)+self.cam.resolution) - - # if configured, rotate saved stills clockwise 90 degrees (do not affect camera config) try: if self.cfg.get('rotated_camera'): - im = im.transpose(Image.ROTATE_270) # clockwise 90° + im = im.transpose(Image.ROTATE_270) debug('Rotated still image 90° clockwise before saving.') except Exception as e: debug(f'Failed to rotate still image: {e}') + finally: + self.cam.color_effects = None + self.cam.shutter_speed = 0 + self.cam.exposure_mode = "auto" + + return im, self.daytime + + + def takePicture(self, name, plate_no, output = None): + """Capture, save, and thumbnail one plate image for the current cycle.""" + filename = "" + view_state = None + if hasattr(self, 'custom_settings'): + # custom_settings stores the camera tuple used by the live-view code. + # Convert it back into the normalized center/roi structure expected + # by the still-capture crop helper. + view_state = { + 'zoom': { + 'x': self.custom_settings['initial_zoom'][1] + self.custom_settings['initial_zoom'][2] / 2.0, + 'y': self.custom_settings['initial_zoom'][0] + self.custom_settings['initial_zoom'][2] / 2.0, + 'roi': self.custom_settings['initial_zoom'][2], + } + } + if 'initial_focus' in self.custom_settings: + view_state['focus'] = self.custom_settings['initial_focus'] + tag = f"experiment-plate{plate_no + 1}" + im, daytime = self.capture_processed_still(view_state=view_state, capture_tag=tag) + + if daytime: + filename = os.path.join(self.dir, name + "-day.png") + else: + filename = os.path.join(self.dir, name + "-night.png") + + debug("Capturing %s." % filename) im.save(filename) @@ -156,10 +287,6 @@ def takePicture(self, name, plate_no, output = None): im.close() self.last_captured[plate_no] = filename - self.cam.color_effects = None - self.cam.shutter_speed = 0 - # we leave the cam in auto exposure mode to improve daytime assessment performance - self.cam.exposure_mode = "auto" def run(self): @@ -189,15 +316,15 @@ def runExperiment(self): self.running = True ######################## if hasattr(self, 'custom_settings'): -# if 'initial_focus' in self.custom_settings: -# try: -# focus_val = self.custom_settings['initial_focus'] -# self.cam.focus(0) -# time.sleep(0.5) -# self.cam.focus(focus_val) -# self.cam.focus(focus_val) -# except Exception as e: -# debug(f"Failed to restore focus: {e}") + # Restore the operator-selected focus and zoom before the first + # capture so automated runs match the framing seen in the live UI. + if 'initial_focus' in self.custom_settings: + try: + focus_val = self.custom_settings['initial_focus'] + self.hw.focusCam(int(focus_val)) + debug(f"Restored focus to: {focus_val}") + except Exception as e: + debug(f"Failed to restore focus: {e}") if 'initial_zoom' in self.custom_settings: try: zoom_val = self.custom_settings['initial_zoom'] @@ -209,7 +336,7 @@ def runExperiment(self): self.status = "Initiating" self.starttime = time.time() self.endtime = time.time() + 60 * 60 * 24 * self.duration - self.last_captured = [''] * 4 + self._reset_position_buffers() self.delay = self.delay or 0.001 self.nshots = self.duration * 24 * 60 // self.delay self.cam.exposure_mode = "auto" @@ -220,7 +347,7 @@ def runExperiment(self): # make sure we don't write directly to home dir self.dir = os.path.join(os.path.expanduser('~'), self.getDefName()) - for i in range(4): + for i in range(self.position_count): platedir = "plate" + str(i + 1) os.makedirs(os.path.join(self.dir, platedir), exist_ok=True) @@ -231,39 +358,56 @@ def runExperiment(self): if nextloop > self.endtime: nextloop = self.endtime - for i in range(4): - # rotate stage to starting position + self.hw.motorOn(True) + for i in range(self.position_count): if i == 0: - self.hw.motorOn(True) self.status = "Finding start position" debug("Finding initial position.") - self.hw.findStart(calibration=self.cfg.get('calibration')) + if not self.hw.findStart(calibration=self.cfg.get('calibration')): + self.stop_experiment = True + break debug("Found initial position.") - if self.status != "Stopping": self.status = "Imaging" else: - # rotate cube 90 degrees - debug("Rotating stage.") - self.hw.halfStep(100, 0.03) + self.status = "Finding next position" + debug("Finding next capture position.") + if not self.hw.findNextStop(): + self.stop_experiment = True + break + + if self.status != "Stopping": + self.status = "Imaging" # wait for the cube to stabilize time.sleep(1.5) - now = time.strftime("%Y%m%d-%H%M%S", time.localtime()) + now = filename_timestamp(self.cfg.get('timezone')) name = os.path.join("plate" + str(i + 1), "plate" + str(i + 1) + "-" + now) self.takePicture(name, i) + if self.stop_experiment: + self.hw.motorOn(False) + break + self.nshots -= 1 self.hw.motorOn(False) if self.status != "Stopping": self.status = "Waiting" if self.idlepos > 0: - # alternate between resting positions during idle, stepping 45 degrees per image + # Spread resting time across positions so the stage does not + # always sit at the same stop between capture rounds. + # alternate between resting positions during idle using sensed stops self.hw.motorOn(True) - self.hw.halfStep(50 * self.idlepos, 0.03) + for i in range(self.idlepos): + if not self.hw.findNextStop(): + self.stop_experiment = True + break self.hw.motorOn(False) + if self.stop_experiment: + break + self.idlepos += 1 - if self.idlepos > 7: + if self.idlepos >= self.position_count: self.idlepos = 0 while time.time() < nextloop and not self.stop_experiment: diff --git a/spiro/failsafe.py b/spiro/failsafe.py index 5c64d23..c289964 100644 --- a/spiro/failsafe.py +++ b/spiro/failsafe.py @@ -1,6 +1,8 @@ -# failsafe.py - -# start debug web interface if main interface fails -# +"""Fallback web interface shown when the main UI fails to start. + +The failsafe app exposes the traceback and recent journal logs so a broken +deployment can still be diagnosed remotely from the browser. +""" import signal import traceback @@ -22,24 +24,28 @@ @app.route('/') def index(): + """Show the captured startup traceback in a lightweight diagnostic page.""" return render_template('failsafe_index.html', trace=''.join(traceback.format_tb(err.__traceback__)), version=cfg.version) @app.route('/log') def get_log(): + """Stream recent service logs for browser-based debugging.""" p = subprocess.Popen(['/bin/journalctl', '--user-unit=spiro', '-n', '1000'], stdout=subprocess.PIPE) return Response(stream_popen(p), mimetype='text/plain') @app.route('/shutdown') def shutdown(): + """Shut the device down from the failsafe UI.""" subprocess.run(['sudo', 'shutdown', '-h', 'now']) return render_template('shutdown.html') @app.route('/reboot') def reboot(): + """Reboot the device from the failsafe UI.""" global restarting restarting = True subprocess.Popen(['sudo', 'shutdown', '-r', 'now']) @@ -48,11 +54,13 @@ def reboot(): @app.route('/exit') def exit(): + """Ask the process manager to restart the main web UI process.""" signal.alarm(1) return render_template('restarting.html', refresh=10, message="Restarting web UI...") def start(e=None): + """Start the fallback UI and retain the triggering exception for display.""" global err err = e serve(app, listen="*:8080", threads=2, channel_timeout=20) diff --git a/spiro/hostapd.py b/spiro/hostapd.py index 3e9cc13..7ae65a9 100644 --- a/spiro/hostapd.py +++ b/spiro/hostapd.py @@ -1,7 +1,9 @@ -# hostapd.py - -# configure system as an AP directing all client queries to the web ui -# uses NetworkManager (nmcli) on Ubuntu Bookworm -# +"""Manage the optional SPIRO Wi-Fi hotspot using NetworkManager. + +This module configures a captive-portal-style access point that routes clients +to the local web UI without requiring the older standalone hostapd/dnsmasq +service stack. +""" import os import re @@ -197,6 +199,7 @@ def config_dnsmasq(ap_ip): def start_ap(): + """Create, configure, and activate the SPIRO hotspot profile.""" log("Setting up dependencies...") ok, err = install_reqs() if not ok: @@ -220,7 +223,8 @@ def start_ap(): _run_privileged(["nmcli", "connection", "modify", _CONNECTION_NAME, "connection.autoconnect", "no"]) - # Recreate profile each time with fallback subnets to avoid stale/bad state. + # Recreate the profile on each start so stale NetworkManager state or a bad + # subnet choice does not permanently wedge hotspot activation. activation_attempts = list(_AP_IP_CANDIDATES) started = False last_ssid = None @@ -286,6 +290,7 @@ def start_ap(): def stop_ap(): + """Deactivate the SPIRO hotspot without deleting the saved profile.""" log("Disabling hotspot...") _run_privileged_passthrough(["nmcli", "connection", "modify", _CONNECTION_NAME, "connection.autoconnect", "no"]) diff --git a/spiro/hwcontrol.py b/spiro/hwcontrol.py index f64f94f..52160dd 100644 --- a/spiro/hwcontrol.py +++ b/spiro/hwcontrol.py @@ -1,133 +1,664 @@ -# hwcontrol.py - -# various functions for controlling the spiro hardware -# +"""Low-level hardware control for the SPIRO platform. -import RPi.GPIO as gpio -import time +This revision targets the Waveshare Stepper Motor HAT on channel M2, which uses +the DRV8825 step/direction interface instead of directly switching the motor +coils from the Raspberry Pi. GPIO access is handled through gpiozero so the code +can use the lgpio backend recommended for Raspberry Pi OS Bookworm. + +For systems that need more torque, we recommend setting the M2 DIP switches to +1/8 microstepping (D3=ON, D4=ON, D5=OFF). In that mode SPIRO emits multiple +STEP pulses per logical software step; the default configuration in this +release maps one logical step to 4 microstep pulses (1/8 -> 8/2 = 4). +""" + +import copy import os +import time + +from gpiozero import Button, Device, DigitalOutputDevice, LED + +try: + from gpiozero.pins.lgpio import LGPIOFactory +except Exception: + LGPIOFactory = None + from spiro.camera import cam from spiro.config import Config from spiro.logger import log, debug + +if LGPIOFactory is not None: + try: + Device.pin_factory = LGPIOFactory() + except Exception: + # gpiozero will fall back to its default pin factory if lgpio is not + # available in the current environment. + pass + + class HWControl: + """Hardware abstraction layer for motor, LED, switch, and focus control.""" + def __init__(self): - gpio.setmode(gpio.BCM) self.cfg = Config() self.pins = { - 'LED' : self.cfg.get('LED'), - 'sensor' : self.cfg.get('sensor'), - 'PWMa' : self.cfg.get('PWMa'), - 'PWMb' : self.cfg.get('PWMb'), - 'coilpin_M11' : self.cfg.get('coilpin_M11'), - 'coilpin_M12' : self.cfg.get('coilpin_M12'), - 'coilpin_M21' : self.cfg.get('coilpin_M21'), - 'coilpin_M22' : self.cfg.get('coilpin_M22'), - 'stdby' : self.cfg.get('stdby') + 'LED': self.cfg.get('LED'), + 'sensor': self.cfg.get('sensor'), + 'motor_dir': self.cfg.get('motor_dir_pin'), + 'motor_step': self.cfg.get('motor_step_pin'), + 'motor_enable': self.cfg.get('motor_enable_pin'), } self.led = False - # defer actual GPIO setup into a separate robust initializer + self.motor = False + self.sensor_active = False + self.sensor_last_trip_time = 0.0 + self.sensor_trip_count = 0 + self.sensor_event_enabled = False + self.forward_step_count = 0 + self.reverse_step_count = 0 + self.last_stop_report = None + self.last_repeatability_test = None + self.seqNumb = 0 + + self.led_device = None + self.sensor_device = None + self.motor_dir_device = None + self.motor_step_device = None + self.motor_enable_device = None + + # GPIO setup remains explicit and idempotent because the application can + # call GPIOInit more than once during startup or restart paths. self.GPIOInit() + def _motion_setting(self, key, overrides=None, alias=None): + """Return a motion-tuning value, preferring temporary overrides.""" + if overrides and overrides.get(key) not in (None, ''): + return overrides.get(key) + if alias and overrides and overrides.get(alias) not in (None, ''): + return overrides.get(alias) + + value = self.cfg.get(key) + if value is None and alias: + value = self.cfg.get(alias) + return value + + def _step_delay(self, overrides=None): + """Return the configured coarse homing delay between logical steps.""" + value = self._motion_setting('motor_step_delay', overrides=overrides) + try: + return max(0.01, float(value or 0.05)) + except Exception: + return 0.05 + + def _step_pulse_width(self, overrides=None): + """Return the HIGH time of one pulse on the DRV8825 STEP input.""" + value = self._motion_setting('motor_step_pulse_width', overrides=overrides) + try: + return max(0.0001, float(value or 0.001)) + except Exception: + return 0.001 + + def _logical_step_pulses(self, overrides=None): + """Return how many STEP pulses make up one SPIRO logical step. + + The old TB6612-style driver advanced one legacy logical step per + electrical transition. With the Waveshare HAT set to 1/8 + microstepping (recommended for higher torque), matching that old + scale requires 4 pulses per logical step (microsteps_per_full_step/2). + """ + value = self._motion_setting('motor_logical_step_pulses', overrides=overrides) + try: + return max(1, int(value or 16)) + except Exception: + return 16 + + def _stop_stable_reads(self, overrides=None): + """Return how many consecutive active reads are required to accept a stop.""" + value = self._motion_setting('stop_stable_reads', overrides=overrides) + try: + return max(1, int(value or 2)) + except Exception: + return 2 + + def _secondary_settle_time(self, overrides=None): + """Return the dwell time between coarse trigger clear and fine search.""" + value = self._motion_setting('secondary_stop_settle_time', overrides=overrides) + try: + return max(0.0, float(value or 0.03)) + except Exception: + return 0.03 + + def _sensor_bounce_time(self): + """Return the debounce time used by gpiozero for the sensor input.""" + value = self.cfg.get('sensor_bounce_time') + try: + return max(0.0, float(value or 0.025)) + except Exception: + return 0.025 + + def _sensor_event(self, active): + """Track switch transitions for the UI and diagnostics.""" + self.sensor_active = bool(active) + if active: + self.sensor_last_trip_time = time.time() + self.sensor_trip_count += 1 + def _valid_pin(self, pin): + """Return whether a config value looks like a usable BCM pin number.""" return pin is not None and pin != '' and isinstance(pin, int) + def _close_devices(self): + """Release any gpiozero devices currently owned by the controller.""" + for attr in ('sensor_device', 'led_device', 'motor_dir_device', 'motor_step_device', 'motor_enable_device'): + device = getattr(self, attr, None) + if device is None: + continue + try: + device.close() + except Exception: + pass + setattr(self, attr, None) + self.sensor_event_enabled = False + def GPIOInit(self): - gpio.setwarnings(False) + """Configure GPIO devices for the Waveshare HAT, LED, and stop switch.""" + self._close_devices() - # Only setup pins that are valid (protect against missing config values) if self._valid_pin(self.pins.get('LED')): - gpio.setup(self.pins['LED'], gpio.OUT) + self.led_device = LED(self.pins['LED'], initial_value=False) + if self._valid_pin(self.pins.get('sensor')): - gpio.setup(self.pins['sensor'], gpio.IN, pull_up_down=gpio.PUD_DOWN) - if self._valid_pin(self.pins.get('PWMa')): - gpio.setup(self.pins['PWMa'], gpio.OUT) - gpio.output(self.pins['PWMa'], True) - if self._valid_pin(self.pins.get('PWMb')): - gpio.setup(self.pins['PWMb'], gpio.OUT) - gpio.output(self.pins['PWMb'], True) - if self._valid_pin(self.pins.get('coilpin_M11')): - gpio.setup(self.pins['coilpin_M11'], gpio.OUT) - if self._valid_pin(self.pins.get('coilpin_M12')): - gpio.setup(self.pins['coilpin_M12'], gpio.OUT) - if self._valid_pin(self.pins.get('coilpin_M21')): - gpio.setup(self.pins['coilpin_M21'], gpio.OUT) - if self._valid_pin(self.pins.get('coilpin_M22')): - gpio.setup(self.pins['coilpin_M22'], gpio.OUT) - if self._valid_pin(self.pins.get('stdby')): - gpio.setup(self.pins['stdby'], gpio.OUT) - - # Initialize default outputs only if pins are valid - if self._valid_pin(self.pins.get('PWMa')): - gpio.output(self.pins['PWMa'], True) - if self._valid_pin(self.pins.get('PWMb')): - gpio.output(self.pins['PWMb'], True) - - # Ensure LED and motor are in known state (will be no-op if pin invalid) + try: + sensor_bounce = self._sensor_bounce_time() + self.sensor_device = Button( + self.pins['sensor'], + pull_up=bool(self.cfg.get('sensor_pull_up')), + bounce_time=(sensor_bounce if sensor_bounce > 0 else None), + ) + self.sensor_device.when_pressed = lambda: self._sensor_event(True) + self.sensor_device.when_released = lambda: self._sensor_event(False) + self.sensor_event_enabled = True + except Exception as e: + self.sensor_device = None + self.sensor_event_enabled = False + log('Warning: sensor event registration unavailable; using polling fallback. ' + str(e)) + + if self._valid_pin(self.pins.get('motor_dir')): + self.motor_dir_device = DigitalOutputDevice( + self.pins['motor_dir'], + active_high=bool(self.cfg.get('motor_direction_active_high')), + initial_value=False, + ) + if self._valid_pin(self.pins.get('motor_step')): + self.motor_step_device = DigitalOutputDevice(self.pins['motor_step'], initial_value=False) + if self._valid_pin(self.pins.get('motor_enable')): + self.motor_enable_device = DigitalOutputDevice( + self.pins['motor_enable'], + active_high=bool(self.cfg.get('motor_enable_active_high')), + initial_value=False, + ) + self.LEDControl(False) self.motorOn(False) + self.sensor_active = self._sensor_active() def cleanup(self): - gpio.cleanup() + """Release all GPIO resources held by the process.""" + self._close_devices() + + def _sensor_active(self): + """Return the current raw state of the positional switch.""" + if self.sensor_device is None: + return False + return bool(self.sensor_device.is_pressed) + + def _secondary_stop_steps(self, overrides=None): + """Return the configured number of fine-search steps after coarse homing.""" + value = self._motion_setting('secondary_stop_steps', overrides=overrides) + if value is None: + value = self._motion_setting('secondary_home_steps', overrides=overrides) + return max(0, int(value or 0)) + + def _secondary_step_delay(self, overrides=None): + """Return the delay used during the fine stop-finding pass.""" + value = self._motion_setting('motor_secondary_step_delay', overrides=overrides) + if value is None: + return max(self._step_delay(overrides) * 2.5, self._step_delay(overrides) + 0.04) + try: + return max(0.02, float(value)) + except Exception: + return max(self._step_delay(overrides) * 2.5, self._step_delay(overrides) + 0.04) + + def _begin_stop_report(self, label, calibration, timeout, ignore_initial_retrigger_steps, + first_stop_delay, second_stop_delay, secondary_stop_steps, + stable_reads, settle_time): + """Create a diagnostics record for a stop-finding attempt.""" + return { + 'label': label, + 'started_at': time.time(), + 'timeout_seconds': float(timeout), + 'calibration': int(calibration or 0), + 'ignore_initial_retrigger_steps': max(0, int(ignore_initial_retrigger_steps or 0)), + 'primary_step_delay': first_stop_delay, + 'secondary_step_delay': second_stop_delay, + 'secondary_stop_steps': secondary_stop_steps, + 'stop_stable_reads': stable_reads, + 'secondary_stop_settle_time': settle_time, + 'logical_step_pulses': self._logical_step_pulses(), + 'initial_sensor_active': self._sensor_active(), + 'initial_clear_steps': 0, + 'coarse_steps_to_trigger': 0, + 'ignored_trigger_count': 0, + 'ignored_trigger_clear_steps': 0, + 'secondary_clear_steps': 0, + 'secondary_approach_steps': 0, + 'secondary_search_steps_to_trigger': 0, + 'final_adjustment_direction': 'none', + 'final_adjustment_steps': 0, + 'sequence_index_before': self.seqNumb, + '_forward_before': self.forward_step_count, + '_reverse_before': self.reverse_step_count, + } + + def _finalize_stop_report(self, report, success, failure_reason=''): + """Complete and store a stop-finding diagnostics record.""" + forward_before = report.pop('_forward_before', self.forward_step_count) + reverse_before = report.pop('_reverse_before', self.reverse_step_count) + report['completed_at'] = time.time() + report['duration_seconds'] = round(report['completed_at'] - report['started_at'], 4) + report['forward_steps_moved'] = self.forward_step_count - forward_before + report['reverse_steps_moved'] = self.reverse_step_count - reverse_before + report['net_steps_moved'] = report['forward_steps_moved'] - report['reverse_steps_moved'] + report['sequence_index_after'] = self.seqNumb + report['sensor_active_end'] = self._sensor_active() + report['success'] = bool(success) + report['failure_reason'] = failure_reason or '' + self.last_stop_report = report + return bool(success) + + def _clear_sensor(self, deadline, delay, return_steps=False): + """Step forward until the positional switch is no longer active.""" + steps_taken = 0 + while self._sensor_active() and time.time() < deadline: + self.halfStep(1, delay) + steps_taken += 1 + result = not self._sensor_active() + if return_steps: + return result, steps_taken + return result + + def _step_until_sensor(self, expected_state, deadline, delay, stable_reads=1, return_steps=False): + """Step until the sensor reaches the requested state for enough reads.""" + matched = 0 + steps_taken = 0 + while self._valid_pin(self.pins.get('sensor')) and time.time() < deadline: + if self._sensor_active() == expected_state: + matched += 1 + if matched >= max(1, stable_reads): + if return_steps: + return True, steps_taken + return True + time.sleep(0.005) + continue + matched = 0 + self.halfStep(1, delay) + steps_taken += 1 + result = self._sensor_active() == expected_state + if return_steps: + return result, steps_taken + return result + + def _find_stop(self, calibration=0, timeout=60, label='stop position', + ignore_initial_retrigger_steps=0, overrides=None): + """Find a sensed stop using coarse motion and an optional fine pass.""" + first_stop_delay = self._step_delay(overrides=overrides) + second_stop_delay = self._secondary_step_delay(overrides=overrides) + deadline = time.time() + timeout + secondary_stop_steps = self._secondary_stop_steps(overrides=overrides) + stable_reads = self._stop_stable_reads(overrides=overrides) + settle_time = self._secondary_settle_time(overrides=overrides) + report = self._begin_stop_report( + label=label, + calibration=calibration, + timeout=timeout, + ignore_initial_retrigger_steps=ignore_initial_retrigger_steps, + first_stop_delay=first_stop_delay, + second_stop_delay=second_stop_delay, + secondary_stop_steps=secondary_stop_steps, + stable_reads=stable_reads, + settle_time=settle_time, + ) + + auto_enabled = False + if not self.motor: + # Stop-finding frequently advances one logical step at a time. + # Holding the driver enabled across the whole search avoids a hard + # enable/disable lurch on every step. + self.motorOn(True) + auto_enabled = True + time.sleep(0.005) + + try: + if not self._valid_pin(self.pins.get('sensor')) or self.sensor_device is None: + return self._finalize_stop_report(report, False, 'sensor-unavailable') + + cleared, initial_clear_steps = self._clear_sensor(deadline, first_stop_delay, return_steps=True) + report['initial_clear_steps'] = initial_clear_steps + if not cleared: + log(f"Timed out while clearing {label}! Images will be misaligned.") + return self._finalize_stop_report(report, False, 'timeout-clearing-sensor') - def findStart(self, calibration=None): - """rotates the imaging stage until the positional switch is activated""" - calibration = calibration or self.cfg.get('calibration') - timeout = 60 - starttime = time.time() + while True: + found_stop, steps_to_stop = self._step_until_sensor( + True, + deadline, + first_stop_delay, + stable_reads=stable_reads, + return_steps=True, + ) + report['coarse_steps_to_trigger'] += steps_to_stop + if not found_stop: + log(f"Timed out while finding {label}! Images will be misaligned.") + return self._finalize_stop_report(report, False, 'timeout-finding-coarse-trigger') + if steps_to_stop >= max(0, int(ignore_initial_retrigger_steps or 0)): + break + report['ignored_trigger_count'] += 1 + debug( + f"Ignoring early {label} trigger after {steps_to_stop} steps; " + f"waiting for next stop beyond {ignore_initial_retrigger_steps} steps." + ) + cleared, clear_steps = self._clear_sensor(deadline, first_stop_delay, return_steps=True) + report['ignored_trigger_clear_steps'] += clear_steps + if not cleared: + log(f"Timed out while clearing {label}! Images will be misaligned.") + return self._finalize_stop_report(report, False, 'timeout-clearing-ignored-trigger') - # make sure that switch is not depressed when starting - if self._valid_pin(self.pins.get('sensor')) and gpio.input(self.pins['sensor']): - while gpio.input(self.pins['sensor']) and time.time() < starttime + timeout: - self.halfStep(1, 0.03) + if secondary_stop_steps > 0: + # The first trigger announces that a final trigger is approaching. + # Clear the first trigger, wait briefly for the switch to settle, + # then approach the second trigger at a slower rate. + time.sleep(settle_time) + cleared, secondary_clear_steps = self._clear_sensor(deadline, first_stop_delay, return_steps=True) + report['secondary_clear_steps'] = secondary_clear_steps + if not cleared: + log(f"Timed out while clearing {label}! Images will be misaligned.") + return self._finalize_stop_report(report, False, 'timeout-clearing-secondary-trigger') - while self._valid_pin(self.pins.get('sensor')) and not gpio.input(self.pins['sensor']) and time.time() < starttime + timeout: - self.halfStep(1, 0.03) + for _ in range(secondary_stop_steps): + if time.time() >= deadline: + log(f"Timed out while finding {label}! Images will be misaligned.") + return self._finalize_stop_report(report, False, 'timeout-during-secondary-approach') + if self._sensor_active(): + break + self.halfStep(1, second_stop_delay) + report['secondary_approach_steps'] += 1 - if time.time() < starttime + timeout: - self.halfStep(calibration, 0.03) + found_secondary, secondary_search_steps = self._step_until_sensor( + True, + deadline, + second_stop_delay, + stable_reads=stable_reads, + return_steps=True, + ) + report['secondary_search_steps_to_trigger'] = secondary_search_steps + if not found_secondary: + log(f"Timed out while finding {label}! Images will be misaligned.") + return self._finalize_stop_report(report, False, 'timeout-finding-secondary-trigger') + + if calibration: + if calibration < 0: + report['final_adjustment_direction'] = 'reverse' + report['final_adjustment_steps'] = abs(calibration) + self.halfStepReverse(abs(calibration), first_stop_delay) + else: + report['final_adjustment_direction'] = 'forward' + report['final_adjustment_steps'] = int(calibration) + self.halfStep(calibration, first_stop_delay) + return self._finalize_stop_report(report, True) + finally: + if auto_enabled: + self.motorOn(False) + + def findStart(self, calibration=None, overrides=None): + """Find the start position and then apply the shared final offset.""" + if calibration is None: + calibration = self.cfg.get('calibration') + return self._find_stop(calibration=calibration, label='start position', overrides=overrides) + + def findNextStop(self, calibration=None, overrides=None): + """Find the next position and apply the same final offset used for start.""" + calibration = int((self.cfg.get('calibration') if calibration is None else calibration) or 0) + ignore_initial_retrigger_steps = 0 + if calibration < 0: + ignore_initial_retrigger_steps = abs(calibration) + 2 + return self._find_stop( + calibration=calibration, + label='next stop position', + ignore_initial_retrigger_steps=ignore_initial_retrigger_steps, + overrides=overrides, + ) + + def get_step_counters(self): + """Return calibration-oriented motion counters for the current session.""" + return { + 'forward_steps': self.forward_step_count, + 'reverse_steps': self.reverse_step_count, + 'net_steps': self.forward_step_count - self.reverse_step_count, + 'sequence_index': self.seqNumb, + } + + def reset_calibration_counters(self): + """Clear session counters and the most recent calibration reports.""" + self.forward_step_count = 0 + self.reverse_step_count = 0 + self.sensor_trip_count = 0 + self.sensor_last_trip_time = 0.0 + self.last_stop_report = None + self.last_repeatability_test = None + + def get_last_stop_report(self): + """Return the most recent stop-finding diagnostics report.""" + return copy.deepcopy(self.last_stop_report) + + def get_last_repeatability_test(self): + """Return the most recent repeatability test summary.""" + return copy.deepcopy(self.last_repeatability_test) + + def jog(self, steps, delay=None): + """Jog the stage forward or reverse by a signed number of logical steps.""" + steps = int(steps or 0) + if delay is None: + delay = self._step_delay() else: - log("Timed out while finding start position! Images will be misaligned.") - - # sets the motor pins as element in sequence - def setStepper(self, M_seq, i): - # write only to valid pins, protecting against None configs - if self._valid_pin(self.pins.get('coilpin_M11')): - gpio.output(self.pins['coilpin_M11'], M_seq[i][0]) - if self._valid_pin(self.pins.get('coilpin_M12')): - gpio.output(self.pins['coilpin_M12'], M_seq[i][1]) - if self._valid_pin(self.pins.get('coilpin_M21')): - gpio.output(self.pins['coilpin_M21'], M_seq[i][2]) - if self._valid_pin(self.pins.get('coilpin_M22')): - gpio.output(self.pins['coilpin_M22'], M_seq[i][3]) - - # steps the stepper motor using half steps, "delay" is time between coil change - # 400 steps is 360 degrees - def halfStep(self, steps, delay, keep_motor_on=False): - time.sleep(0.005) # time for motor to activate - for i in range(0, steps): - self.setStepper(self.halfstep_seq, self.seqNumb) - self.seqNumb += 1 - if (self.seqNumb == 8): - self.seqNumb = 0 + delay = max(0.01, float(delay)) + + if steps > 0: + self.halfStep(steps, delay) + elif steps < 0: + self.halfStepReverse(abs(steps), delay) + + return { + 'requested_steps': steps, + 'delay': delay, + 'sensor_active': self._sensor_active(), + 'step_counters': self.get_step_counters(), + } + + def _summarize_report_field(self, reports, field): + """Return min/avg/max summary stats for a numeric report field.""" + values = [report.get(field) for report in reports if isinstance(report.get(field), (int, float))] + if not values: + return None + return { + 'min': min(values), + 'avg': round(sum(values) / len(values), 2), + 'max': max(values), + } + + def run_repeatability_test(self, mode='next', cycles=5, calibration=None, overrides=None): + """Run repeated homing cycles and summarize how consistent they are.""" + mode = 'start' if mode == 'start' else 'next' + cycles = max(1, min(25, int(cycles or 1))) + calibration = int((self.cfg.get('calibration') if calibration is None else calibration) or 0) + + reports = [] + for cycle in range(cycles): + if mode == 'start': + success = self.findStart(calibration=calibration, overrides=overrides) + else: + success = self.findNextStop(calibration=calibration, overrides=overrides) + + report = self.get_last_stop_report() or {} + report['cycle'] = cycle + 1 + reports.append(report) + + if not success: + break + + successful_reports = [report for report in reports if report.get('success')] + summary = { + 'mode': mode, + 'cycles_requested': cycles, + 'cycles_completed': len(reports), + 'success_count': len(successful_reports), + 'failure_count': len(reports) - len(successful_reports), + 'travel_steps': self._summarize_report_field(successful_reports, 'net_steps_moved'), + 'coarse_steps_to_trigger': self._summarize_report_field(successful_reports, 'coarse_steps_to_trigger'), + 'secondary_search_steps_to_trigger': self._summarize_report_field(successful_reports, 'secondary_search_steps_to_trigger'), + 'reports': reports, + } + self.last_repeatability_test = copy.deepcopy(summary) + return copy.deepcopy(summary) + + def _set_direction(self, forward): + """Drive the M2 DIR pin to the requested logical direction.""" + if self.motor_dir_device is None: + return + if forward: + self.motor_dir_device.on() + else: + self.motor_dir_device.off() + + def _pulse_step(self, delay): + """Emit one microstep pulse on the DRV8825 STEP line.""" + if self.motor_step_device is None: time.sleep(delay) + return + + # Choose a conservative pulse-high time that is both bounded by the + # configured STEP high time and by a fraction of the total period so + # we never compress pulses when the requested period is very small. + # This helps avoid driving the DRV8825 with back-to-back pulses that + # the motor/driver can't cleanly handle. + period = float(delay) + max_pw = self._step_pulse_width() + pulse_width = min(max_pw, max(0.00005, period * 0.4)) + + # Use perf_counter for higher-resolution timing. We favour sleeping for + # the majority of longer waits and fall back to a short busy-wait for + # the final microseconds to improve accuracy. + start = time.perf_counter() + self.motor_step_device.on() + # Sleep for the bulk of the high time where it's beneficial + if pulse_width > 0.002: + time.sleep(pulse_width - 0.001) + while (time.perf_counter() - start) < pulse_width: + pass + else: + while (time.perf_counter() - start) < pulse_width: + pass + self.motor_step_device.off() + + # Wait until the full period has elapsed + end = start + period + remaining = end - time.perf_counter() + if remaining > 0.001: + time.sleep(max(0.0, remaining - 0.0005)) + while time.perf_counter() < end: + pass + + def halfStep(self, steps, delay, keep_motor_on=False): + """Move forward by a number of logical steps. + + The legacy method name is preserved because the rest of SPIRO already + uses it, but on the Waveshare HAT each call emits multiple STEP pulses on + M2 to match the old calibration scale while the HAT remains at its + factory-default microstepping setting. + """ + auto_enabled = False + if not self.motor: + self.motorOn(True) + auto_enabled = True + time.sleep(0.005) + + self._set_direction(True) + pulses_per_step = self._logical_step_pulses() + pulse_delay = max(self._step_pulse_width(), float(delay) / pulses_per_step) + for _ in range(0, max(0, int(steps))): + for _ in range(pulses_per_step): + self._pulse_step(pulse_delay) + self.seqNumb = (self.seqNumb + 1) % 8 + self.forward_step_count += 1 + + if auto_enabled and not keep_motor_on: + self.motorOn(False) + + def halfStepReverse(self, steps, delay): + """Move in reverse by a number of logical steps.""" + auto_enabled = False + if not self.motor: + self.motorOn(True) + auto_enabled = True + time.sleep(0.005) + + self._set_direction(False) + pulses_per_step = self._logical_step_pulses() + pulse_delay = max(self._step_pulse_width(), float(delay) / pulses_per_step) + for _ in range(0, max(0, int(steps))): + for _ in range(pulses_per_step): + self._pulse_step(pulse_delay) + self.seqNumb = (self.seqNumb - 1) % 8 + self.reverse_step_count += 1 + + if auto_enabled: + self.motorOn(False) + + def get_sensor_status(self): + """Return the live switch state along with simple trip diagnostics.""" + active = self._sensor_active() + if active and not self.sensor_active: + self.sensor_last_trip_time = time.time() + self.sensor_trip_count += 1 + self.sensor_active = active + return { + 'active': active, + 'trip_count': self.sensor_trip_count, + 'last_trip_time': self.sensor_last_trip_time, + } - # sets motor standby status def motorOn(self, value): - if self._valid_pin(self.pins.get('stdby')): - gpio.output(self.pins['stdby'], value) + """Enable or disable the Waveshare M2 driver output stage.""" + if self.motor_enable_device is not None: + if value: + self.motor_enable_device.on() + else: + self.motor_enable_device.off() + if not value and self.motor_step_device is not None: + self.motor_step_device.off() + self.motor = bool(value) - # turns on and off led def LEDControl(self, value): - if self._valid_pin(self.pins.get('LED')): - gpio.output(self.pins['LED'], value) - self.led = value + """Turn the illumination LED on or off.""" + if self.led_device is not None: + if value: + self.led_device.on() + else: + self.led_device.off() + self.led = bool(value) - # focuses the ArduCam motorized focus camera - # code is from ArduCam GitHub repo def focusCam(self, val): + """Map the UI focus value into the active camera backend's focus control.""" if getattr(cam, 'type', None) == 'legacy': - # focuses the ArduCam motorized focus camera - # code is from ArduCam GitHub repo value = (val << 4) & 0x3ff0 data1 = (value >> 8) & 0x3f data2 = value & 0xf0 @@ -136,26 +667,13 @@ def focusCam(self, val): if os.path.exists('/dev/i2c-1'): os.system("i2cset -y 1 0x0c %d %d" % (data1, data2)) elif getattr(cam, 'type', None) == 'libcamera': - # adapt to libcamera's allowed focusing range - # guard against missing lens_limits if not getattr(cam, 'lens_limits', None): return try: - (lens_far, lens_close) = cam.lens_limits[:2] - # protect against zero-division and invalid ranges + (_, lens_close) = cam.lens_limits[:2] if lens_close == 0: return val_mapped = (val - 10) / (990 / lens_close) cam.focus(val_mapped) except Exception: - debug('Failed mapping focus value', exc_info=True) - - # my copy of the pinout - pins = {} - - # state of stepper motor sequence - seqNumb = 0 - - # sequence for one coil rotation of stepper motor using half step - halfstep_seq = [(1,0,0,0), (1,0,1,0), (0,0,1,0), (0,1,1,0), - (0,1,0,0), (0,1,0,1), (0,0,0,1), (1,0,0,1)] \ No newline at end of file + debug('Failed mapping focus value', exc_info=True) \ No newline at end of file diff --git a/spiro/logger.py b/spiro/logger.py index 5b1691a..7aa358c 100644 --- a/spiro/logger.py +++ b/spiro/logger.py @@ -1,6 +1,9 @@ -# logger.py - -# simple functions for logging -# +"""Minimal logging helpers used throughout the SPIRO codebase. + +The project logs to stderr because the primary deployment target is a systemd +user service, where stderr is captured by journald and can be viewed from the +web UI or with journalctl. +""" import sys from spiro.config import Config @@ -8,10 +11,12 @@ cfg = Config() def log(msg): + """Write an unconditional log line to stderr.""" sys.stderr.write(msg + '\n') sys.stderr.flush() def debug(msg): + """Write a debug log line when the persistent debug flag is enabled.""" if cfg.get('debug'): sys.stderr.write(msg + '\n') sys.stderr.flush() diff --git a/spiro/spiro.py b/spiro/spiro.py index a32074a..9fad982 100644 --- a/spiro/spiro.py +++ b/spiro/spiro.py @@ -1,13 +1,13 @@ #!/usr/bin/env python3 -# -# spiro.py - -# primary entry point for the spiro control software -# +"""Primary command-line entry point for SPIRO. + +Running without flags starts the web UI and hardware controllers. Command-line +flags perform one-shot maintenance tasks such as resetting configuration, +toggling debug logging, or managing the optional hotspot service. +""" import os import textwrap -import RPi.GPIO as gpio -# from picamera import PiCamera from spiro.config import Config from spiro.hwcontrol import HWControl from spiro.logger import log, debug @@ -57,6 +57,7 @@ def installService(): + """Install a user-level systemd service that launches SPIRO on login.""" try: os.makedirs(os.path.expanduser('~/.config/systemd/user'), exist_ok=True) except OSError as e: @@ -82,6 +83,7 @@ def installService(): def terminate(sig, frame): + """Handle shutdown signals and tear down the camera, web UI, and GPIO safely.""" global shutdown if sig == signal.SIGALRM: # force shutdown @@ -129,6 +131,7 @@ def terminate(sig, frame): # start here. def main(): + """Execute requested CLI actions or launch the main web UI.""" global cam if options.reset: print("Clearing all configuration values.") @@ -164,11 +167,11 @@ def main(): options.show_version]): sys.exit() - # no options given, go ahead and start web ui + # No one-shot CLI options were requested, so continue into the long-running + # hardware-backed web application mode. try: os.chdir(os.path.expanduser('~')) from spiro.camera import cam - gpio.setmode(gpio.BCM) hw.GPIOInit() # cam = initCam() log('Starting web UI.') @@ -176,4 +179,6 @@ def main(): except Exception as e: global failed failed = True + # If startup fails after process launch, fall back to the diagnostic UI + # so users can still inspect logs and the captured traceback remotely. failsafe.start(e) \ No newline at end of file diff --git a/spiro/static/rotate.png b/spiro/static/rotate.png new file mode 100644 index 0000000..39d074e Binary files /dev/null and b/spiro/static/rotate.png differ diff --git a/spiro/static/rotate.svg b/spiro/static/rotate.svg new file mode 100644 index 0000000..2a3b2a9 --- /dev/null +++ b/spiro/static/rotate.svg @@ -0,0 +1,73 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/spiro/static/spiro.css b/spiro/static/spiro.css index 989f1fe..a673d88 100644 --- a/spiro/static/spiro.css +++ b/spiro/static/spiro.css @@ -93,15 +93,218 @@ div.tool { font-weight: bold; color: grey; } +.sensor-tool { + gap: 4px; +} +.sensor-label { + font-size: 10pt; + color: #cfcfcf; +} +.sensor-indicator { + width: 16px; + height: 16px; + border-radius: 50%; + display: inline-block; + background-color: #6a6a6a; + border: 1px solid #2f2f2f; + box-shadow: 0 0 4px rgba(0, 0, 0, 0.6) inset; +} +.sensor-indicator.active { + background-color: #3fd768; + box-shadow: 0 0 10px rgba(63, 215, 104, 0.95); +} +.sensor-indicator.trip-flash { + background-color: #ffe26f; + box-shadow: 0 0 14px rgba(255, 226, 111, 1); +} .pure-button { font-family: 'Saira Condensed', sans-serif; font-weight: bold; font-size: 14pt; } +.motoroff-btn { + width: 60px; + text-align: center; + font-size: 10pt; + line-height: 1.1; + padding: 0.35em 0.25em; +} +.calibration-status-row { + display: flex; + align-items: center; + gap: 12px; + margin-top: 12px; + flex-wrap: wrap; +} +.calibration-settings-form { + max-width: 1040px; +} +.calibration-settings-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(220px, 1fr)); + gap: 14px 18px; +} +.calibration-field { + display: flex; + flex-direction: column; + gap: 6px; +} +.calibration-field input, +.calibration-field select { + max-width: 100%; +} +.calibration-action-row { + display: flex; + flex-wrap: wrap; + gap: 10px; + margin-top: 16px; +} +.calibration-note { + margin-top: 10px; + color: #cfcfcf; + font-size: 10.5pt; +} +.calibration-action-status { + margin-top: 14px; + padding: 10px 12px; + border-radius: 12px; + background-color: rgba(255, 255, 255, 0.08); + color: #e8e8e8; + font-family: 'Saira Condensed', sans-serif; + font-size: 13pt; +} +.calibration-action-status.error { + background-color: rgba(220, 20, 17, 0.2); + color: #ffd5d4; +} +.calibration-motor-status { + font-family: 'Saira Condensed', sans-serif; + font-size: 12pt; + color: #d6d6d6; +} +.calibration-workspace { + display: flex; + gap: 18px; + align-items: flex-start; + margin-top: 18px; + flex-wrap: wrap; +} +.calibration-live-panel { + flex: 1 1 520px; + min-width: 320px; +} +.calibration-dashboard { + flex: 1 1 360px; + display: grid; + grid-template-columns: repeat(auto-fit, minmax(260px, 1fr)); + gap: 14px; + min-width: 320px; +} +.calibration-card { + padding: 14px; + border-radius: 18px; + background-color: rgba(0, 0, 0, 0.35); + box-shadow: 0 0 2px rgba(0, 0, 0, 0.4); +} +.calibration-card-wide { + grid-column: 1 / -1; +} +.calibration-card-title { + font-family: 'Saira Condensed', sans-serif; + font-size: 18pt; + color: white; + margin-bottom: 12px; +} +.calibration-stat-list { + display: grid; + gap: 8px; +} +.calibration-stat-row { + display: flex; + align-items: baseline; + justify-content: space-between; + gap: 12px; + font-family: 'Saira Condensed', sans-serif; + color: #d8d8d8; +} +.calibration-stat-row strong { + color: white; + text-align: right; +} +.calibration-card-actions { + display: flex; + flex-wrap: wrap; + gap: 10px; + margin-top: 14px; +} +.calibration-control-grid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(130px, 1fr)); + gap: 12px; +} +.calibration-inline-status { + font-family: 'Saira Condensed', sans-serif; + color: #f0f0f0; +} +.calibration-sensor-tool { + margin-top: 0; + margin-bottom: 10px; + align-items: center; + flex-direction: row; + gap: 8px; +} +.calibration-summary-text { + margin-top: 12px; + color: #d6d6d6; + font-size: 11pt; + line-height: 1.35; +} +.calibration-report-block { + margin-top: 18px; + padding: 14px; + border-radius: 18px; + background-color: rgba(0, 0, 0, 0.35); +} +.calibration-table-wrap { + overflow-x: auto; +} +.calibration-report-table { + width: 100%; + background-color: rgba(0, 0, 0, 0.2); +} +.calibration-report-table th, +.calibration-report-table td { + white-space: nowrap; +} +.calibration-report-row-failed td { + color: #ffb6b4; +} .pure-form label { font-family: 'Saira Condensed', sans-serif; font-size: 14pt; } +.settings-form { + max-width: 760px; +} +.settings-form .pure-control-group input { + width: 100%; + max-width: 360px; +} +.settings-form .pure-control-group label { + width: 180px; +} +.settings-actions { + display: flex; + gap: 20px; + align-items: center; + flex-wrap: wrap; +} +.settings-help { + max-width: 560px; +} +.settings-help .pure-form-message-inline { + padding-left: 0; +} div.expinfo { color: lightgrey; } @@ -136,6 +339,39 @@ img.liveview { border-radius: 20px; box-shadow: 0px 0px 2px black; } +.still-preview-panel { + margin-top: 18px; + margin-right: 10px; + padding: 14px; + border-radius: 20px; + background-color: rgba(0, 0, 0, 0.35); +} +.still-preview-header { + display: flex; + align-items: center; + justify-content: space-between; + gap: 12px; + flex-wrap: wrap; +} +.still-preview-title { + font-family: 'Saira Condensed', sans-serif; + font-size: 20pt; + font-weight: bold; + color: white; +} +.still-preview-status { + margin-top: 8px; + margin-bottom: 12px; + color: #d2d2d2; + font-size: 11pt; +} +.still-preview-image { + width: 100%; + max-height: 55vh; + object-fit: contain; + border-radius: 12px; + background-color: rgba(255, 255, 255, 0.05); +} div.toolbar { width: 70px; border-radius: 20px; @@ -146,6 +382,7 @@ div.vcenter { margin-bottom: auto; } img.calibrate { + width: 100%; max-height: 80vh; } div.logo { @@ -217,7 +454,7 @@ legend.label { color: white; } a.passchange { - margin-top: 20px; + margin-top: 0; } ul.flashes { margin-left: 0px; @@ -242,26 +479,59 @@ div.pan { position: relative; } input.zoom { - transform:rotate(270deg); - width: 70px; - top: 55px; - left: 0px; - position: absolute; + width: 100%; + position: static; + transform: none; background-color: transparent; + margin-top: 10px; } div.zoom { - height: 90px; + width: 120px; position: relative; } div.focus { + width: 120px; position: relative; } +div.live-save-tool { + min-width: 110px; +} +div.live-control-group { + display: flex; + align-items: center; + gap: 6px; +} +input#zoomnumber { + width: 72px; + margin-top: 0; +} +.focus #focusnumber { + width: 72px; + margin-top: 0; +} +.live-adjust-btn { + min-width: 28px; + padding: 0.25em 0.45em; + font-size: 12pt; + line-height: 1; +} +.live-save-status { + margin-top: 8px; + font-size: 10pt; + color: #cfcfcf; + max-width: 110px; +} +.live-save-status.saved { + color: hsl(99, 75%, 64%); +} input.focusslider { position: relative; - left: -1px; - transform: rotate(90deg); - height: 90px; + left: 0; + transform: none; + width: 100%; + height: auto; background-color: transparent; + margin-top: 10px; } div.pan-left { position: absolute; @@ -365,6 +635,16 @@ input { iframe { color: white; } +@media (max-width: 960px) { + .calibration-workspace { + flex-direction: column; + } + .calibration-live-panel, + .calibration-dashboard { + min-width: 0; + width: 100%; + } +} input[type=range] { -webkit-appearance: none; width: 100%; diff --git a/spiro/static/spiro.js b/spiro/static/spiro.js index d58fbdf..1aa3170 100644 --- a/spiro/static/spiro.js +++ b/spiro/static/spiro.js @@ -14,15 +14,150 @@ function toggleLED(value) { } function updateFocusNumber(value) { - focusnumber = document.getElementById('focusnumber'); + var focusnumber = document.getElementById('focusnumber'); + var focusslider = document.getElementById('focusslider'); focusnumber.value = value; + if (focusslider) { + focusslider.value = value; + } bgGet('/focus/' + value); + markLiveSettingsDirty(); } function updateFocusSlider(value) { - focusslider = document.getElementById('focusslider'); + var focusslider = document.getElementById('focusslider'); + var focusnumber = document.getElementById('focusnumber'); focusslider.value = value; + if (focusnumber) { + focusnumber.value = value; + } bgGet('/focus/' + value); + markLiveSettingsDirty(); +} + +function updateZoomNumber(value) { + var zoomnumber = document.getElementById('zoomnumber'); + var zoomslider = document.getElementById('zoom'); + zoomnumber.value = value; + if (zoomslider) { + zoomslider.value = value; + } + bgGet('/zoom/' + value); + markLiveSettingsDirty(); +} + +function updateZoomSlider(value) { + var zoomslider = document.getElementById('zoom'); + var zoomnumber = document.getElementById('zoomnumber'); + zoomslider.value = value; + if (zoomnumber) { + zoomnumber.value = value; + } + bgGet('/zoom/' + value); + markLiveSettingsDirty(); +} + +function markLiveSettingsDirty() { + var status = document.getElementById('live-save-status'); + if (!status) { + return; + } + status.textContent = 'Unsaved view changes'; + status.classList.remove('saved'); +} + +function nudgeFocus(delta) { + var focusnumber = document.getElementById('focusnumber'); + var nextValue = Math.min(1000, Math.max(10, parseInt(focusnumber.value || '250', 10) + delta)); + updateFocusSlider(nextValue); + return false; +} + +function nudgeZoom(delta) { + var zoomnumber = document.getElementById('zoomnumber'); + var current = parseFloat(zoomnumber.value || '100'); + var nextValue = Math.min(100, Math.max(20, current + delta)); + updateZoomSlider(nextValue.toFixed(1)); + return false; +} + +function saveLiveViewSettings() { + var focusnumber = document.getElementById('focusnumber'); + var zoomnumber = document.getElementById('zoomnumber'); + var status = document.getElementById('live-save-status'); + var params = new URLSearchParams({ + focus: focusnumber.value, + zoom: zoomnumber.value + }); + + fetch('/live/settings/save?' + params.toString(), { cache: 'no-store' }) + .then(function (r) { return r.json(); }) + .then(function (data) { + if (!status) { + return; + } + status.textContent = 'Saved focus ' + data.focus + ' / zoom ' + data.zoom + '%'; + status.classList.add('saved'); + window.setTimeout(function () { + status.classList.remove('saved'); + }, 1500); + }) + .catch(function () { + if (status) { + status.textContent = 'Save failed'; + status.classList.remove('saved'); + } + }); + + return false; +} + +function captureStillPreview() { + var status = document.getElementById('preview-still-status'); + var image = document.getElementById('live-preview-still'); + var focusnumber = document.getElementById('focusnumber'); + var focusslider = document.getElementById('focusslider'); + var zoomnumber = document.getElementById('zoomnumber'); + var zoomslider = document.getElementById('zoom'); + + if (status) { + status.textContent = 'Capturing still preview...'; + } + + fetch('/live-preview-still/capture', { cache: 'no-store' }) + .then(function (r) { return r.json(); }) + .then(function (data) { + if (image) { + image.src = '/live-preview-still.png?t=' + data.timestamp; + } + if (focusnumber && data.focus !== null && data.focus !== undefined) { + focusnumber.value = data.focus; + } + if (focusslider && data.focus !== null && data.focus !== undefined) { + focusslider.value = data.focus; + } + if (zoomnumber && data.zoom !== null && data.zoom !== undefined) { + zoomnumber.value = data.zoom; + } + if (zoomslider && data.zoom !== null && data.zoom !== undefined) { + zoomslider.value = data.zoom; + } + if (status) { + status.textContent = 'Captured ' + data.mode + ' still preview' + (data.rotated ? ' with 90 degree clockwise rotation.' : '.'); + } + var liveStatus = document.getElementById('live-save-status'); + if (liveStatus && data.focus !== null && data.focus !== undefined && data.zoom !== null && data.zoom !== undefined) { + liveStatus.textContent = 'Saved focus ' + data.focus + ' / zoom ' + data.zoom + '%'; + liveStatus.classList.add('saved'); + } + }) + .catch(function () { + if (status) { + status.textContent = 'Still preview capture failed.'; + } + }); + + return false; } function updateShutter(value) { @@ -35,11 +170,12 @@ function updateShutter(value) { function calcDiskSpace() { disk = document.getElementById('disk'); + positionCount = parseFloat(disk.dataset.positionCount || 8); delaydom = document.getElementById('delay'); delay = parseFloat(delaydom.value); duration = document.getElementById('duration'); duration = parseFloat(duration.value); - req = 4 * 8 * duration * 24 * 60 / delay / 1024; + req = positionCount * 8 * duration * 24 * 60 / delay / 1024; disk.innerHTML = req.toFixed(1) + " GB"; avail = document.getElementById('diskavail').innerHTML; avail = parseFloat(avail.split(" ")[0]); @@ -58,7 +194,387 @@ function calcDiskSpace() { } } +function setCalibrationActionStatus(message, isError) { + var status = document.getElementById('calibration-action-status'); + if (!status) { + return; + } + + status.textContent = message || 'Ready for calibration testing.'; + status.classList.toggle('error', !!isError); +} + +function collectCalibrationSettings() { + var params = new URLSearchParams(); + var fieldIds = [ + ['calibration', 'calibration'], + ['motor-step-delay', 'motor_step_delay'], + ['motor-secondary-step-delay', 'motor_secondary_step_delay'], + ['secondary-stop-steps', 'secondary_stop_steps'], + ['stop-stable-reads', 'stop_stable_reads'], + ['secondary-stop-settle-time', 'secondary_stop_settle_time'] + ]; + + fieldIds.forEach(function (entry) { + var element = document.getElementById(entry[0]); + if (element) { + params.set(entry[1], element.value); + } + }); + + return params; +} + +function postCalibrationRequest(url, params) { + return fetch(url, { + method: 'POST', + cache: 'no-store', + headers: { + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' + }, + body: params.toString() + }).then(function (response) { + return response.json().then(function (data) { + if (!response.ok) { + throw data; + } + return data; + }); + }); +} + +function updateSensorIndicatorDisplay(sensor) { + var indicator = document.getElementById('sensor-indicator'); + var stateText = document.getElementById('sensor-state-text'); + var tripCount = document.getElementById('sensor-trip-count'); + var lastTrip = document.getElementById('sensor-last-trip'); + + if (indicator) { + indicator.classList.toggle('active', !!sensor.active); + } + if (stateText) { + stateText.textContent = sensor.active ? 'Active' : 'Inactive'; + } + if (tripCount) { + tripCount.textContent = sensor.trip_count; + } + if (lastTrip) { + lastTrip.textContent = sensor.last_trip_text || 'Never'; + } +} + +function updateCalibrationStepCounters(steps) { + var mappings = { + 'step-forward-count': steps.forward_steps, + 'step-reverse-count': steps.reverse_steps, + 'step-net-count': steps.net_steps, + 'step-sequence-index': steps.sequence_index + }; + + Object.keys(mappings).forEach(function (id) { + var element = document.getElementById(id); + if (element) { + element.textContent = mappings[id]; + } + }); +} + +function formatAdjustmentText(report) { + if (!report || !report.final_adjustment_steps) { + return '—'; + } + return report.final_adjustment_direction + ' ' + report.final_adjustment_steps; +} + +function renderLastStopReport(report) { + var values = { + 'last-stop-label': report ? report.label : 'No stop run yet', + 'last-stop-result': report ? (report.success ? 'Success' : 'Failed') : '—', + 'last-stop-completed': report && report.completed_at_text ? report.completed_at_text : '—', + 'last-stop-coarse-steps': report ? report.coarse_steps_to_trigger : '—', + 'last-stop-secondary-clear': report ? report.secondary_clear_steps : '—', + 'last-stop-secondary-approach': report ? report.secondary_approach_steps : '—', + 'last-stop-secondary-search': report ? report.secondary_search_steps_to_trigger : '—', + 'last-stop-adjustment': formatAdjustmentText(report), + 'last-stop-net-steps': report ? report.net_steps_moved : '—', + 'last-stop-reason': report && report.failure_reason ? report.failure_reason : '—', + 'last-stop-duration': report && report.duration_seconds !== undefined ? report.duration_seconds + ' s' : '—' + }; + + Object.keys(values).forEach(function (id) { + var element = document.getElementById(id); + if (element) { + element.textContent = values[id]; + } + }); +} + +function formatSummaryRange(summary) { + if (!summary) { + return '—'; + } + return summary.min + ' / ' + summary.avg + ' / ' + summary.max; +} + +function renderRepeatabilityResults(summary) { + var summaryElement = document.getElementById('repeatability-summary'); + var body = document.getElementById('repeatability-results-body'); + + if (!summary || !summary.reports || !summary.reports.length) { + if (summaryElement) { + summaryElement.textContent = 'No repeatability test run yet.'; + } + if (body) { + body.innerHTML = ''; + var emptyRow = document.createElement('tr'); + var emptyCell = document.createElement('td'); + emptyCell.colSpan = 8; + emptyCell.textContent = 'No repeatability test run yet.'; + emptyRow.appendChild(emptyCell); + body.appendChild(emptyRow); + } + return; + } + + if (summaryElement) { + var modeLabel = summary.mode === 'start' ? 'Start position' : 'Next position'; + summaryElement.textContent = 'Mode: ' + modeLabel + '. Success ' + summary.success_count + '/' + summary.cycles_requested + '. Net steps min/avg/max: ' + formatSummaryRange(summary.travel_steps) + '. Coarse trigger min/avg/max: ' + formatSummaryRange(summary.coarse_steps_to_trigger) + '.'; + } + + if (!body) { + return; + } + + body.innerHTML = ''; + summary.reports.forEach(function (report) { + var row = document.createElement('tr'); + if (!report.success) { + row.classList.add('calibration-report-row-failed'); + } + + [ + report.cycle, + report.success ? 'Success' : 'Failed', + report.net_steps_moved, + report.coarse_steps_to_trigger, + report.secondary_search_steps_to_trigger, + formatAdjustmentText(report), + report.duration_seconds, + report.failure_reason || '—' + ].forEach(function (value) { + var cell = document.createElement('td'); + cell.textContent = value === undefined || value === null || value === '' ? '—' : value; + row.appendChild(cell); + }); + + body.appendChild(row); + }); +} + function tryCalibration() { - calib = document.getElementById('calibration'); - bgGet('/findstart/' + calib.value); + return tryCalibrationStart(); +} + +function toggleMotor() { + var btn = document.getElementById('motorbtn'); + if (btn.dataset.state === 'on') { + bgGet('/motor/off'); + btn.dataset.state = 'off'; + btn.textContent = 'Motor Off'; + btn.classList.remove('red'); + btn.title = 'Enable stepper motors'; + } else { + bgGet('/motor/on'); + btn.dataset.state = 'on'; + btn.textContent = 'Motor On'; + btn.classList.add('red'); + btn.title = 'Disable stepper motors'; + } +} + +function applyCalibrationMotorStatus(data) { + var btn = document.getElementById('calibration-motorbtn'); + var status = document.getElementById('calibration-motor-status'); + var state = document.getElementById('calibration-motor-state'); + var holdTimer = document.getElementById('calibration-hold-timer'); + if (!btn || !status || !data) { + return; + } + + var isOn = !!data.active; + btn.dataset.state = isOn ? 'on' : 'off'; + btn.textContent = isOn ? 'Motor On' : 'Motor Off'; + btn.classList.toggle('red', isOn); + if (state) { + state.textContent = isOn ? 'Holding' : 'Released'; + } + + if (data.calibration_hold_active) { + status.textContent = 'Holding position for ' + data.calibration_hold_remaining + 's'; + if (holdTimer) { + holdTimer.textContent = data.calibration_hold_remaining + 's'; + } + } else if (isOn) { + status.textContent = 'Motor holding position'; + if (holdTimer) { + holdTimer.textContent = 'Manual hold'; + } + } else { + status.textContent = 'Motor released'; + if (holdTimer) { + holdTimer.textContent = 'Off'; + } + } +} + +function renderCalibrationSnapshot(data) { + if (!data) { + return; + } + + if (data.motor) { + applyCalibrationMotorStatus(data.motor); + } + if (data.sensor) { + updateSensorIndicatorDisplay(data.sensor); + } + if (data.steps) { + updateCalibrationStepCounters(data.steps); + } + renderLastStopReport(data.last_stop_report || null); + renderRepeatabilityResults(data.last_repeatability_test || null); + + if (data.message) { + setCalibrationActionStatus(data.message, data.success === false); + } +} + +function runCalibrationAction(mode) { + var params = collectCalibrationSettings(); + setCalibrationActionStatus('Running ' + mode + ' stop search...', false); + postCalibrationRequest('/calibrate/run/' + mode, params) + .then(renderCalibrationSnapshot) + .catch(function (error) { + setCalibrationActionStatus(error && error.error ? error.error : 'Calibration action failed.', true); + }); + return false; +} + +function tryCalibrationStart() { + return runCalibrationAction('start'); } + +function tryCalibrationNext() { + return runCalibrationAction('next'); +} + +function jogCalibration(direction) { + var params = new URLSearchParams(); + var steps = document.getElementById('calibration-jog-steps'); + var delay = document.getElementById('calibration-jog-delay'); + params.set('steps', steps ? steps.value : '25'); + params.set('delay', delay ? delay.value : '0.08'); + + setCalibrationActionStatus('Jogging ' + direction + '...', false); + postCalibrationRequest('/calibrate/jog/' + direction, params) + .then(renderCalibrationSnapshot) + .catch(function (error) { + setCalibrationActionStatus(error && error.error ? error.error : 'Jog request failed.', true); + }); + return false; +} + +function runCalibrationRepeatabilityTest() { + var params = collectCalibrationSettings(); + var mode = document.getElementById('repeatability-mode'); + var cycles = document.getElementById('repeatability-cycles'); + params.set('mode', mode ? mode.value : 'next'); + params.set('cycles', cycles ? cycles.value : '5'); + + setCalibrationActionStatus('Running repeatability test...', false); + postCalibrationRequest('/calibrate/test/repeat', params) + .then(renderCalibrationSnapshot) + .catch(function (error) { + setCalibrationActionStatus(error && error.error ? error.error : 'Repeatability test failed.', true); + }); + return false; +} + +function resetCalibrationCounters() { + setCalibrationActionStatus('Resetting calibration counters...', false); + postCalibrationRequest('/calibrate/counters/reset', new URLSearchParams()) + .then(renderCalibrationSnapshot) + .catch(function (error) { + setCalibrationActionStatus(error && error.error ? error.error : 'Unable to reset counters.', true); + }); + return false; +} + +function toggleCalibrationMotor() { + var btn = document.getElementById('calibration-motorbtn'); + if (!btn) { + return false; + } + + var url = btn.dataset.state === 'on' ? '/calibrate/motor/off' : '/calibrate/motor/on'; + fetch(url, { cache: 'no-store' }) + .then(function (r) { return r.json(); }) + .then(applyCalibrationMotorStatus) + .catch(function () { /* leave last known state */ }); + return false; +} + +function initCalibrationMotorStatus() { + var btn = document.getElementById('calibration-motorbtn'); + if (!btn) { + return; + } + + function pollCalibrationStatus() { + fetch('/calibrate/status', { cache: 'no-store' }) + .then(function (r) { return r.json(); }) + .then(renderCalibrationSnapshot) + .catch(function () { /* keep polling */ }); + } + + pollCalibrationStatus(); + window.setInterval(pollCalibrationStatus, 1000); +} + +function initSensorIndicator() { + var indicator = document.getElementById('sensor-indicator'); + if (!indicator) { + return; + } + + var lastTripCount = null; + + function applyStatus(data) { + if (data.active) { + indicator.classList.add('active'); + } else { + indicator.classList.remove('active'); + } + + if (lastTripCount !== null && data.trip_count > lastTripCount) { + indicator.classList.add('trip-flash'); + window.setTimeout(function () { + indicator.classList.remove('trip-flash'); + }, 250); + } + lastTripCount = data.trip_count; + } + + function pollSensor() { + fetch('/sensor-status', { cache: 'no-store' }) + .then(function (r) { return r.json(); }) + .then(applyStatus) + .catch(function () { /* keep polling */ }); + } + + pollSensor(); + window.setInterval(pollSensor, 100); +} + +document.addEventListener('DOMContentLoaded', initSensorIndicator); +document.addEventListener('DOMContentLoaded', initCalibrationMotorStatus); diff --git a/spiro/templates/calibrate.html b/spiro/templates/calibrate.html index 505af84..3d4957b 100644 --- a/spiro/templates/calibrate.html +++ b/spiro/templates/calibrate.html @@ -1,5 +1,5 @@ {% extends "layout.html" %} -{% block title %}Main view{% endblock %} +{% block title %}Motor calibration{% endblock %} {% block content %}
diff --git a/spiro/templates/settings.html b/spiro/templates/settings.html index 8857342..880cfe4 100644 --- a/spiro/templates/settings.html +++ b/spiro/templates/settings.html @@ -26,19 +26,64 @@
-
+
Settings -
+
- -
-
+ +
+
+ + + +{% for value, label in timezone_choices %} + +{% endfor %} + +
+ +
+Used for saved filenames and displayed dates. Device stays on UTC. Default is EST via America/New_York. +
+Time +
+
+Preferred timezone: +
+
+{{ timezone_name }} +
+
+
+
+Current local time: +
+
+{{ current_time }} +
+
+
+
+Device UTC time: +
+
+{{ current_utc_time }} +
+
+
+
+Time sync: +
+ +
System info
@@ -108,4 +153,48 @@
+ {% endblock %} diff --git a/spiro/timeutils.py b/spiro/timeutils.py new file mode 100644 index 0000000..ab2f55f --- /dev/null +++ b/spiro/timeutils.py @@ -0,0 +1,83 @@ +"""Timezone-aware helpers for UI display, filenames, and folder naming. + +SPIRO keeps the device clock in UTC, then converts timestamps into the user's +preferred timezone only for presentation and naming. +""" + +from datetime import datetime, timezone +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError + + +DEFAULT_TIMEZONE = 'America/New_York' +TIMEZONE_ALIASES = { + 'EST': 'America/New_York', + 'EDT': 'America/New_York', + 'ET': 'America/New_York', + 'CST': 'America/Chicago', + 'CDT': 'America/Chicago', + 'CT': 'America/Chicago', + 'MST': 'America/Denver', + 'MDT': 'America/Denver', + 'MT': 'America/Denver', + 'PST': 'America/Los_Angeles', + 'PDT': 'America/Los_Angeles', + 'PT': 'America/Los_Angeles', + 'UTC': 'UTC', + 'GMT': 'UTC', +} +COMMON_TIMEZONES = [ + ('America/New_York', 'Eastern (EST/EDT)'), + ('America/Chicago', 'Central (CST/CDT)'), + ('America/Denver', 'Mountain (MST/MDT)'), + ('America/Los_Angeles', 'Pacific (PST/PDT)'), + ('UTC', 'UTC'), +] + + +def normalize_timezone_name(name): + """Normalize friendly abbreviations and empty values to canonical names.""" + if not name: + return DEFAULT_TIMEZONE + normalized = str(name).strip() + if not normalized: + return DEFAULT_TIMEZONE + return TIMEZONE_ALIASES.get(normalized.upper(), normalized) + + +def validate_timezone_name(name): + """Validate a timezone name and return the canonical ZoneInfo key.""" + normalized = normalize_timezone_name(name) + try: + return ZoneInfo(normalized).key + except ZoneInfoNotFoundError as exc: + raise ValueError(f'Unknown timezone: {name}') from exc + + +def get_timezone(name=None): + """Return a ZoneInfo object for the provided or default timezone.""" + return ZoneInfo(validate_timezone_name(name or DEFAULT_TIMEZONE)) + + +def now_in_timezone(name=None): + """Return the current time converted from UTC into the requested timezone.""" + return datetime.now(timezone.utc).astimezone(get_timezone(name)) + + +def format_timestamp(timestamp, name=None, fmt='%Y-%m-%d %H:%M:%S %Z'): + """Format a POSIX timestamp for display in the selected timezone.""" + return datetime.fromtimestamp(timestamp, timezone.utc).astimezone(get_timezone(name)).strftime(fmt) + + +def format_now(name=None, fmt='%Y-%m-%d %H:%M:%S %Z'): + """Format the current time for display in the selected timezone.""" + return now_in_timezone(name).strftime(fmt) + + +def filename_timestamp(name=None): + """Return a filesystem-friendly timestamp for captured image names.""" + return now_in_timezone(name).strftime('%Y%m%d-%H%M%S') + + +def dated_folder_prefix(name=None): + """Return the date prefix used for default experiment folder names.""" + return now_in_timezone(name).strftime('%Y.%m.%d') \ No newline at end of file diff --git a/spiro/webui.py b/spiro/webui.py index 6745427..2cbba33 100644 --- a/spiro/webui.py +++ b/spiro/webui.py @@ -1,30 +1,293 @@ -# webui.py - -# main web ui controller for spiro -# +"""Main Flask web interface for SPIRO. + +This module coordinates camera live view, still-preview capture, experiment +control, calibration helpers, and file browsing. It also bridges between the +browser's normalized zoom/focus state and the hardware abstractions used by the +camera and motion-control layers. +""" import io import os import re +import subprocess import time import shutil import signal import hashlib -import subprocess -from threading import Thread, Lock, Condition +from threading import Thread, Lock, Condition, Timer from waitress import serve -from flask import Flask, render_template, Response, request, redirect, url_for, session, flash, abort +from flask import Flask, render_template, Response, request, redirect, url_for, session, flash, abort, jsonify import spiro.hostapd as hostapd from spiro.config import Config from spiro.logger import log, debug from spiro.experimenter import Experimenter +from spiro.timeutils import COMMON_TIMEZONES, format_timestamp, now_in_timezone, validate_timezone_name app = Flask(__name__) app.jinja_env.trim_blocks = True app.jinja_env.lstrip_blocks = True +CALIBRATION_MOTOR_HOLD_SECONDS = 5 * 60 +CALIBRATION_REPEATABILITY_MAX_CYCLES = 25 + + +def _cancel_calibration_motor_hold(): + """Cancel the temporary calibration hold timer and clear its deadline.""" + global calibration_motor_hold_timer, calibration_motor_hold_until + if calibration_motor_hold_timer is not None: + calibration_motor_hold_timer.cancel() + calibration_motor_hold_timer = None + calibration_motor_hold_until = 0.0 + + +def _motor_off_from_calibration_hold(): + """Timer callback that releases the motor after a calibration hold expires.""" + global calibration_motor_hold_timer, calibration_motor_hold_until + lock.acquire() + try: + hw.motorOn(False) + calibration_motor_hold_timer = None + calibration_motor_hold_until = 0.0 + finally: + lock.release() + + +def _enable_calibration_motor_hold(duration=CALIBRATION_MOTOR_HOLD_SECONDS): + """Keep the motor driver energized during calibration adjustments. + + Calibration often requires several small manual retries, so this helper + prevents the motor from being de-energized between requests for a limited + window. + """ + global calibration_motor_hold_timer, calibration_motor_hold_until + duration = max(1, int(duration or CALIBRATION_MOTOR_HOLD_SECONDS)) + lock.acquire() + try: + hw.motorOn(True) + _cancel_calibration_motor_hold() + calibration_motor_hold_until = time.time() + duration + calibration_motor_hold_timer = Timer(duration, _motor_off_from_calibration_hold) + calibration_motor_hold_timer.daemon = True + calibration_motor_hold_timer.start() + finally: + lock.release() + + +def _disable_calibration_motor_hold(): + """Immediately end calibration motor hold mode and release the motor.""" + lock.acquire() + try: + _cancel_calibration_motor_hold() + hw.motorOn(False) + finally: + lock.release() + + +def _get_motor_status(): + """Return current motor and calibration-hold status for the UI.""" + remaining = 0 + if calibration_motor_hold_until: + remaining = max(0, int(calibration_motor_hold_until - time.time())) + return { + 'active': bool(hw and hw.motor), + 'calibration_hold_active': remaining > 0, + 'calibration_hold_remaining': remaining, + } + + +def _format_status_timestamp(timestamp_value): + """Format a timestamp for the calibration UI using the configured timezone.""" + if not timestamp_value: + return 'Never' + return format_timestamp(timestamp_value, cfg.get('timezone')) + + +def _get_calibration_settings(): + """Return the current persisted stop-calibration settings.""" + return { + 'calibration': int(cfg.get('calibration') or 0), + 'motor_step_delay': float(cfg.get('motor_step_delay') or 0.08), + 'motor_secondary_step_delay': float(cfg.get('motor_secondary_step_delay') or 0.12), + 'secondary_stop_steps': int(cfg.get('secondary_stop_steps') or cfg.get('secondary_home_steps') or 0), + 'stop_stable_reads': int(cfg.get('stop_stable_reads') or 2), + 'secondary_stop_settle_time': float(cfg.get('secondary_stop_settle_time') or 0.03), + } + + +def _parse_int_setting(source, key, default, minimum, maximum): + """Parse a bounded integer setting from a request payload.""" + raw_value = source.get(key, default) + if raw_value in (None, ''): + raw_value = default + try: + value = int(raw_value) + except (TypeError, ValueError) as exc: + raise ValueError(f'Invalid value for {key}.') from exc + return max(minimum, min(maximum, value)) + + +def _parse_float_setting(source, key, default, minimum, maximum): + """Parse a bounded float setting from a request payload.""" + raw_value = source.get(key, default) + if raw_value in (None, ''): + raw_value = default + try: + value = float(raw_value) + except (TypeError, ValueError) as exc: + raise ValueError(f'Invalid value for {key}.') from exc + return max(minimum, min(maximum, value)) + + +def _parse_calibration_settings(source=None): + """Parse calibration tuning settings from a form or query payload.""" + source = source or request.values + persisted = _get_calibration_settings() + return { + 'calibration': _parse_int_setting(source, 'calibration', persisted['calibration'], -40, 399), + 'motor_step_delay': _parse_float_setting(source, 'motor_step_delay', persisted['motor_step_delay'], 0.01, 2.0), + 'motor_secondary_step_delay': _parse_float_setting( + source, + 'motor_secondary_step_delay', + persisted['motor_secondary_step_delay'], + 0.02, + 2.0, + ), + 'secondary_stop_steps': _parse_int_setting(source, 'secondary_stop_steps', persisted['secondary_stop_steps'], 0, 400), + 'stop_stable_reads': _parse_int_setting(source, 'stop_stable_reads', persisted['stop_stable_reads'], 1, 10), + 'secondary_stop_settle_time': _parse_float_setting( + source, + 'secondary_stop_settle_time', + persisted['secondary_stop_settle_time'], + 0.0, + 2.0, + ), + } + + +def _save_calibration_settings(settings): + """Persist the current calibration tuning settings.""" + cfg.set('calibration', settings['calibration']) + cfg.set('motor_step_delay', settings['motor_step_delay']) + cfg.set('motor_secondary_step_delay', settings['motor_secondary_step_delay']) + cfg.set('secondary_stop_steps', settings['secondary_stop_steps']) + cfg.set('secondary_home_steps', settings['secondary_stop_steps']) + cfg.set('stop_stable_reads', settings['stop_stable_reads']) + cfg.set('secondary_stop_settle_time', settings['secondary_stop_settle_time']) + + +def _serialize_calibration_report(report): + """Format a stop-finding diagnostics report for JSON/UI consumption.""" + if not report: + return None + report = dict(report) + for key in ('started_at', 'completed_at'): + if report.get(key): + report[key + '_text'] = _format_status_timestamp(report[key]) + for key in ('primary_step_delay', 'secondary_step_delay', 'secondary_stop_settle_time', 'duration_seconds'): + if isinstance(report.get(key), (int, float)): + report[key] = round(report[key], 4) + return report + + +def _serialize_repeatability_test(summary): + """Format a repeatability-test result for JSON/UI consumption.""" + if not summary: + return None + summary = dict(summary) + summary['reports'] = [_serialize_calibration_report(report) for report in summary.get('reports', [])] + return summary + + +def _get_calibration_snapshot(message=''): + """Return a single payload containing the live calibration dashboard state.""" + sensor = hw.get_sensor_status() + return { + 'message': message, + 'motor': _get_motor_status(), + 'sensor': { + **sensor, + 'last_trip_text': _format_status_timestamp(sensor.get('last_trip_time')), + }, + 'steps': hw.get_step_counters(), + 'settings': _get_calibration_settings(), + 'last_stop_report': _serialize_calibration_report(hw.get_last_stop_report()), + 'last_repeatability_test': _serialize_repeatability_test(hw.get_last_repeatability_test()), + } + + +def _json_error(message, status=400): + """Return a consistent JSON error response for calibration actions.""" + return jsonify({'success': False, 'error': message}), status + + +def _run_calibration_action(action, hold_motor=True): + """Execute a calibration action under the motion lock and return UI state.""" + if hold_motor: + _enable_calibration_motor_hold() + + lock.acquire() + try: + success, message = action() + time.sleep(0.15) + finally: + lock.release() + + payload = _get_calibration_snapshot(message=message) + payload['success'] = bool(success) + return jsonify(payload) + + +def _get_saved_zoom_state(): + """Read persisted zoom data and normalize legacy tuple formats.""" + saved_zoom = cfg.get('zoom') + if isinstance(saved_zoom, dict): + return { + 'x': saved_zoom.get('x', 0.5), + 'y': saved_zoom.get('y', 0.5), + 'roi': saved_zoom.get('roi', 1.0), + } + if isinstance(saved_zoom, (list, tuple)) and len(saved_zoom) == 4: + top, left, height, width = saved_zoom + del width + return { + 'x': left + height / 2.0, + 'y': top + height / 2.0, + 'roi': height, + } + return {'x': 0.5, 'y': 0.5, 'roi': 1.0} + + +def _get_current_live_view_state(): + """Snapshot the live-view zoom and focus so still capture can reuse them.""" + return { + 'zoom': { + 'x': zoomer.x, + 'y': zoomer.y, + 'roi': zoomer.roi, + }, + 'focus': current_focus_value, + } + + +def _apply_live_view_state(view_state): + """Restore zoom and focus after a still capture temporarily changed modes.""" + zoom_state = (view_state or {}).get('zoom') or {'x': 0.5, 'y': 0.5, 'roi': 1.0} + zoomer.set(x=zoom_state['x'], y=zoom_state['y'], roi=zoom_state['roi'], persist=False) + + focus_value = (view_state or {}).get('focus') + if focus_value is not None: + hw.focusCam(int(focus_value)) + + return { + 'focus': int(focus_value) if focus_value is not None else None, + 'zoom': round(zoom_state['roi'] * 100.0, 1), + } + class Rotator(Thread): + """Background worker that jogs the stage a requested number of logical steps.""" + def __init__(self, value): Thread.__init__(self) self.value = value @@ -34,7 +297,22 @@ def run(self): try: hw.motorOn(True) time.sleep(0.5) - hw.halfStep(self.value, 0.03) + hw.halfStep(self.value, hw._step_delay()) + time.sleep(0.5) + finally: + hw.motorOn(False) + lock.release() + + +class StopFinder(Thread): + """Background worker that advances the stage to the next sensed stop.""" + + def run(self): + lock.acquire() + try: + hw.motorOn(True) + time.sleep(0.5) + hw.findNextStop() time.sleep(0.5) finally: hw.motorOn(False) @@ -57,16 +335,17 @@ def run(self): # self.condition.notify_all() # self.buffer.seek(0) # return self.buffer.write(buf) -import io -from threading import Condition class StreamingOutput(io.BufferedIOBase): + """Buffer object that breaks an MJPEG byte stream into individual frames.""" + def __init__(self): self.frame = None self.buffer = io.BytesIO() self.condition = Condition() def write(self, buf): + """Store JPEG data and publish completed frames to waiting clients.""" if buf.startswith(b'\xff\xd8'): self.buffer.truncate() with self.condition: @@ -89,6 +368,8 @@ def write(self, buf): class ZoomObject(object): + """Track and apply normalized live-view pan and zoom state.""" + def __init__(self): self.roi = 1 self.x = 0.5 @@ -107,7 +388,8 @@ def set(self, x=None, y=None, roi=None, persist=True): def apply(self, persist=True): '''checks and applies zoom/pan values on camera object''' - # constrain roi and pan to valid ranges + # Clamp the requested region so the crop window always remains fully + # inside the sensor bounds before sending it to the camera backend. self.roi = max(min(self.roi, 1.0), 0.2) limits = (self.roi / 2.0, 1 - self.roi / 2.0) self.x = max(min(self.x, limits[1]), limits[0]) @@ -165,6 +447,14 @@ def checkPass(pwd): return False +def get_plate_dirs(check_dir): + plate_dirs = [] + for entry in os.scandir(check_dir): + if entry.is_dir() and re.fullmatch(r'plate\d+', entry.name): + plate_dirs.append((int(entry.name[5:]), entry.name)) + return [name for _, name in sorted(plate_dirs)] + + @app.route('/index.html') @app.route('/') def index(): @@ -173,7 +463,24 @@ def index(): if restarting: return render_template('restarting.html', refresh='60; url=/', message="Rebooting system...") # pass saved zoom settings to template so UI reflects persisted state - return render_template('index.html', live=livestream, focus=cfg.get('focus'), led=hw.led, name=cfg.get('name'), zoom=cfg.get('zoom')) + sensor_status = hw.get_sensor_status() + return render_template( + 'index.html', + live=livestream, + focus=current_focus_value, + led=hw.led, + motor=hw.motor, + name=cfg.get('name'), + zoom={'x': zoomer.x, 'y': zoomer.y, 'roi': zoomer.roi}, + sensor_active=sensor_status['active'], + saved_focus=cfg.get('focus'), + saved_zoom_percent=round(((cfg.get('zoom') or {}).get('roi', 1.0)) * 100.0, 1), + ) + + +@app.route('/motor-status') +def motor_status(): + return jsonify(_get_motor_status()) @app.route('/empty') @@ -234,10 +541,13 @@ def newpass(): @not_while_running -@app.route('/zoom/') +@app.route('/zoom/') def zoom(value): - roi_val = float(value / 100) - zoomer.set(roi=roi_val) + try: + roi_val = float(value) / 100.0 + except ValueError: + abort(404) + zoomer.set(roi=roi_val, persist=False) try: log(f"Zoom request: roi={roi_val:.4f} -> zoom now x={zoomer.x:.4f}, y={zoomer.y:.4f}, roi={zoomer.roi:.4f}") except Exception: @@ -250,9 +560,9 @@ def zoom(value): def pan(dir, value): delta = float(value) if dir == 'x': - zoomer.set(y = zoomer.y + delta) + zoomer.set(y=zoomer.y + delta, persist=False) elif dir == 'y': - zoomer.set(x = zoomer.x + delta) + zoomer.set(x=zoomer.x + delta, persist=False) try: log(f"Pan request: dir={dir}, delta={delta:.4f} -> zoom now x={zoomer.x:.4f}, y={zoomer.y:.4f}, roi={zoomer.roi:.4f}") except Exception: @@ -274,6 +584,7 @@ def switch_live(value): def setLive(val): + """Enable or disable the MJPEG live stream and camera preview mode.""" global livestream prev = livestream if val == 'on' and livestream != True: @@ -308,21 +619,184 @@ def rotate(value): return redirect(url_for('index')) +@not_while_running +@app.route('/nextposition') +def nextposition(): + stop_finder = StopFinder() + stop_finder.start() + return redirect(url_for('index')) + + +@not_while_running +@app.route('/motor/off') +def motor_off(): + _disable_calibration_motor_hold() + return redirect(url_for('index')) + + +@not_while_running +@app.route('/motor/on') +def motor_on(): + lock.acquire() + try: + _cancel_calibration_motor_hold() + hw.motorOn(True) + finally: + lock.release() + return redirect(url_for('index')) + + +@not_while_running +@app.route('/calibrate/motor/on') +def calibrate_motor_on(): + _enable_calibration_motor_hold() + return jsonify(_get_motor_status()) + + +@not_while_running +@app.route('/calibrate/motor/off') +def calibrate_motor_off(): + _disable_calibration_motor_hold() + return jsonify(_get_motor_status()) + + @not_while_running @app.route('/findstart') -@app.route('/findstart/') +@app.route('/findstart/') def findstart(value=None): - hw.motorOn(True) - if not value: - hw.findStart() - elif value > 0 and value < 400: - hw.findStart(calibration=value) - time.sleep(0.5) - hw.motorOn(False) + lock.acquire() + try: + _cancel_calibration_motor_hold() + hw.motorOn(True) + if value is None: + hw.findStart() + else: + try: + calibration = int(value) + except ValueError: + abort(404) + # Allow slight reverse adjustment while keeping users within a safe range. + calibration = max(-40, min(calibration, 399)) + hw.findStart(calibration=calibration) + time.sleep(0.5) + hw.motorOn(False) + finally: + lock.release() return redirect(url_for('index')) +@not_while_running +@app.route('/calibrate/try/') +def calibrate_try(value): + try: + calibration = int(value) + except ValueError: + abort(404) + calibration = max(-40, min(calibration, 399)) + + _enable_calibration_motor_hold() + lock.acquire() + try: + hw.findStart(calibration=calibration) + time.sleep(0.5) + finally: + lock.release() + payload = _get_calibration_snapshot(message='Start-position test completed.') + payload['success'] = bool((payload.get('last_stop_report') or {}).get('success')) + return jsonify(payload) + + +@not_while_running +@app.route('/calibrate/status') +def calibrate_status(): + return jsonify(_get_calibration_snapshot()) + + +@not_while_running +@app.route('/calibrate/run/', methods=['POST']) +def calibrate_run(mode): + if mode not in ('start', 'next'): + abort(404) + + try: + settings = _parse_calibration_settings(request.form) + except ValueError as exc: + return _json_error(str(exc)) + + def action(): + if mode == 'start': + success = hw.findStart(calibration=settings['calibration'], overrides=settings) + return success, 'Start-position search completed.' if success else 'Start-position search failed.' + success = hw.findNextStop(calibration=settings['calibration'], overrides=settings) + return success, 'Next-position search completed.' if success else 'Next-position search failed.' + + return _run_calibration_action(action) + + +@not_while_running +@app.route('/calibrate/jog/', methods=['POST']) +def calibrate_jog(direction): + if direction not in ('forward', 'reverse'): + abort(404) + + try: + steps = _parse_int_setting(request.form, 'steps', 25, 1, 2000) + delay = _parse_float_setting(request.form, 'delay', cfg.get('motor_step_delay') or 0.08, 0.01, 2.0) + except ValueError as exc: + return _json_error(str(exc)) + + signed_steps = steps if direction == 'forward' else -steps + + def action(): + hw.jog(signed_steps, delay=delay) + return True, f'Jogged {direction} {steps} steps at {delay:.3f}s per logical step.' + + return _run_calibration_action(action) + + +@not_while_running +@app.route('/calibrate/test/repeat', methods=['POST']) +def calibrate_repeatability_test(): + try: + settings = _parse_calibration_settings(request.form) + mode = request.form.get('mode', 'next') + cycles = _parse_int_setting(request.form, 'cycles', 5, 1, CALIBRATION_REPEATABILITY_MAX_CYCLES) + except ValueError as exc: + return _json_error(str(exc)) + + if mode not in ('start', 'next'): + return _json_error('Invalid repeatability-test mode.') + + def action(): + summary = hw.run_repeatability_test( + mode=mode, + cycles=cycles, + calibration=settings['calibration'], + overrides=settings, + ) + all_passed = summary['success_count'] == cycles and summary['failure_count'] == 0 + message = f"Repeatability test finished: {summary['success_count']}/{cycles} successful runs." + return all_passed, message + + return _run_calibration_action(action) + + +@not_while_running +@app.route('/calibrate/counters/reset', methods=['POST']) +def calibrate_reset_counters(): + lock.acquire() + try: + hw.reset_calibration_counters() + finally: + lock.release() + + payload = _get_calibration_snapshot(message='Calibration counters and reports reset.') + payload['success'] = True + return jsonify(payload) + + def liveGen(): + """Yield multipart MJPEG frames for the browser live-view endpoint.""" while True: with liveoutput.condition: got_frame = liveoutput.condition.wait(timeout=0.1) @@ -339,6 +813,12 @@ def liveStream(): return Response(liveGen(), mimetype='multipart/x-mixed-replace; boundary=frame') +@not_while_running +@app.route('/sensor-status') +def sensor_status(): + return jsonify(hw.get_sensor_status()) + + @app.route('/nightstill.png') def nightStill(): if nightstill.seek(0, io.SEEK_END) == 0: @@ -357,7 +837,7 @@ def dayStill(): @app.route('/lastcapture/.png') def lastCapture(num): - if num < 0 or num > 3: + if num < 0 or num >= experimenter.position_count: return redirect(url_for('static', filename='empty.png')) else: if experimenter.last_captured[num] == '': @@ -373,7 +853,7 @@ def lastCapture(num): @app.route('/preview/.jpg') def preview(num): - if num < 0 or num > 3: + if num < 0 or num >= experimenter.position_count: return redirect(url_for('static', filename='empty.png')) elif experimenter.preview[num] == '': return redirect(url_for('static', filename='empty.png')) @@ -387,13 +867,70 @@ def preview(num): def takePicture(obj): + """Capture a PNG still into an in-memory buffer for exposure previews.""" obj.truncate() obj.seek(0) camera.capture(obj, format="png") obj.seek(0) +@app.route('/live-preview-still.png') +def live_preview_still_image(): + """Serve the most recently captured still-preview image.""" + livepreviewstill_lock.acquire() + try: + if livepreviewstill.seek(0, io.SEEK_END) == 0: + return redirect(url_for('static', filename='empty.png')) + livepreviewstill.seek(0) + return Response(livepreviewstill.read(), mimetype="image/png") + finally: + livepreviewstill_lock.release() + + +@not_while_running +@app.route('/live-preview-still/capture') +def capture_live_preview_still(): + """Capture a high-resolution preview still using experiment settings. + + The route snapshots the current live-view framing, asks the experimenter to + capture a processed still with that framing, stores the PNG in memory for + the browser preview pane, and then reapplies the live controls because the + still capture path may reset them during mode switches. + """ + lock.acquire() + try: + view_state = _get_current_live_view_state() + debug( + "[preview] live state snapshot " + f"zoom(x={view_state['zoom']['x']:.6f}, y={view_state['zoom']['y']:.6f}, roi={view_state['zoom']['roi']:.6f}) " + f"focus={view_state.get('focus')}" + ) + image, daytime = experimenter.capture_processed_still(view_state=view_state, capture_tag='preview') + livepreviewstill_lock.acquire() + try: + livepreviewstill.seek(0) + livepreviewstill.truncate() + image.save(livepreviewstill, format='PNG') + livepreviewstill.seek(0) + finally: + livepreviewstill_lock.release() + image.close() + restored_view = _apply_live_view_state(view_state) + finally: + lock.release() + + return jsonify({ + 'captured': True, + 'mode': 'day' if daytime else 'night', + 'rotated': bool(cfg.get('rotated_camera')), + 'timestamp': int(time.time() * 1000), + 'focus': restored_view['focus'], + 'zoom': restored_view['zoom'], + }) + + def grabExposure(time): + """Capture a day or night exposure sample for the settings page.""" global dayshutter, nightshutter if time in ['day', 'night']: if time == 'day': @@ -412,12 +949,43 @@ def grabExposure(time): @not_while_running @app.route('/focus/') def focus(value): + global current_focus_value value = min(1000, max(10, value)) hw.focusCam(value) - cfg.set('focus', value) + current_focus_value = value return redirect(url_for('index')) +@not_while_running +@app.route('/live/settings/save') +def save_live_settings(): + """Persist the current live-view focus and zoom as defaults for future runs.""" + global current_focus_value + try: + focus_value = int(request.args.get('focus', cfg.get('focus') or 250)) + except (TypeError, ValueError): + abort(400) + + try: + zoom_value = float(request.args.get('zoom', zoomer.roi * 100.0)) / 100.0 + except (TypeError, ValueError): + abort(400) + + focus_value = min(1000, max(10, focus_value)) + zoom_value = min(1.0, max(0.2, zoom_value)) + + hw.focusCam(focus_value) + current_focus_value = focus_value + cfg.set('focus', focus_value) + zoomer.set(roi=zoom_value, persist=True) + + return jsonify({ + 'saved': True, + 'focus': focus_value, + 'zoom': round(zoom_value * 100.0, 1), + }) + + @app.route('/experiment', methods=['GET', 'POST']) def experiment(): if request.method == 'POST': @@ -433,10 +1001,8 @@ def experiment(): else: experimenter.dir = os.path.expanduser('~') setLive('off') # zoomer.set(roi=1.0) - current_zoom_tuple = ( zoomer.y - zoomer.roi/2.0, zoomer.x - zoomer.roi/2.0, zoomer.roi, zoomer.roi) -# current_focus = cfg.get("focus") -# experimenter.custom_settings = {'initial_zoom': current_zoom_tuple, 'initial_focus':current_focus} - experimenter.custom_settings = {'initial_zoom': current_zoom_tuple} + current_zoom_tuple = (zoomer.y - zoomer.roi/2.0, zoomer.x - zoomer.roi/2.0, zoomer.roi, zoomer.roi) + experimenter.custom_settings = {'initial_zoom': current_zoom_tuple, 'initial_focus': current_focus_value} log("Starting new experiment.") experimenter.next_status = 'run' experimenter.status_change.set() @@ -452,16 +1018,19 @@ def experiment(): df = shutil.disk_usage(os.path.expanduser('~')) diskspace = round(df.free / 1024 ** 3, 1) - diskreq = round(experimenter.nshots * 4 * 8 / 1024, 1) + diskreq = round(experimenter.nshots * experimenter.position_count * 8 / 1024, 1) + timezone_name = cfg.get('timezone') return render_template('experiment.html', running=experimenter.running, directory=experimenter.dir, - starttime=time.ctime(experimenter.starttime), delay=experimenter.delay, - endtime=time.ctime(experimenter.endtime), diskspace=diskspace, duration=experimenter.duration, - status=experimenter.status, nshots=experimenter.nshots + 1, diskreq=diskreq, name=cfg.get('name'), + starttime=format_timestamp(experimenter.starttime, timezone_name), delay=experimenter.delay, + endtime=format_timestamp(experimenter.endtime, timezone_name), diskspace=diskspace, duration=experimenter.duration, + status=experimenter.status, nshots=experimenter.nshots + 1, diskreq=diskreq, + position_count=experimenter.position_count, name=cfg.get('name'), defname=experimenter.getDefName()) @not_while_running def exposureMode(time): + """Apply the requested exposure profile to the live camera.""" if time == 'day': camera.iso = cfg.get('dayiso') camera.shutter_speed = 1000000 // cfg.get('dayshutter') @@ -539,15 +1108,33 @@ def exposure(time): @app.route('/calibrate', methods=['GET', 'POST']) def calibrate(): if request.method == 'POST': - value = request.form.get('calibration') - if value: - value = int(value) - value = max(0, min(value, 399)) - cfg.set('calibration', value) - flash("New value for start position: " + str(value)) + try: + settings = _parse_calibration_settings(request.form) + except ValueError as exc: + flash(str(exc)) + else: + _save_calibration_settings(settings) + _disable_calibration_motor_hold() + flash('Calibration settings saved.') exposureMode('auto') setLive('on') - return render_template('calibrate.html', calibration=cfg.get('calibration'), name=cfg.get('name')) + calibration_status = _get_calibration_snapshot() + settings = calibration_status['settings'] + motor_state = calibration_status['motor'] + return render_template( + 'calibrate.html', + calibration=settings['calibration'], + motor_step_delay=settings['motor_step_delay'], + motor_secondary_step_delay=settings['motor_secondary_step_delay'], + secondary_stop_steps=settings['secondary_stop_steps'], + stop_stable_reads=settings['stop_stable_reads'], + secondary_stop_settle_time=settings['secondary_stop_settle_time'], + calibration_status=calibration_status, + name=cfg.get('name'), + motor=motor_state['active'], + calibration_hold_active=motor_state['calibration_hold_active'], + calibration_hold_remaining=motor_state['calibration_hold_remaining'], + ) @not_while_running @@ -580,10 +1167,56 @@ def settings(): if request.method == 'POST': if request.form.get('name'): cfg.set('name', request.form.get('name')) + timezone_name = request.form.get('timezone') + if timezone_name: + try: + cfg.set('timezone', validate_timezone_name(timezone_name)) + flash('Timezone preference saved. Device time remains UTC.') + except ValueError as exc: + flash(str(exc)) ssid, passwd = hostapd.get_ssid() + timezone_name = cfg.get('timezone') + current_time = now_in_timezone(timezone_name) + current_utc_time = now_in_timezone('UTC') + timezone_choices = list(COMMON_TIMEZONES) + if not any(choice[0] == timezone_name for choice in timezone_choices): + timezone_choices.append((timezone_name, timezone_name)) return render_template('settings.html', name=cfg.get('name'), running=experimenter.running, version=cfg.version, debug=cfg.get('debug'), ip_addr=get_external_ip(), hotspot_ready=hostapd.is_ready(), - hotspot_enabled=hostapd.is_enabled(), ssid=ssid, passwd=passwd, rotation=cfg.get('rotated_camera')) + hotspot_enabled=hostapd.is_enabled(), ssid=ssid, passwd=passwd, rotation=cfg.get('rotated_camera'), + timezone_name=timezone_name, timezone_choices=timezone_choices, + current_time=current_time.strftime('%Y-%m-%d %H:%M:%S %Z'), + current_time_utc_iso=current_utc_time.isoformat(), + current_utc_time=current_utc_time.strftime('%Y-%m-%d %H:%M:%S %Z')) + + +@app.route('/sync-time') +def sync_time(): + commands = [ + ['sudo', 'timedatectl', 'set-timezone', 'UTC'], + ['sudo', 'chronyc', '-a', 'makestep'], + ['sudo', 'ntpdate', '-u', 'pool.ntp.org'], + ['sudo', 'sntp', '-sS', 'pool.ntp.org'], + ] + errors = [] + + timezone_result = subprocess.run(commands[0], capture_output=True, text=True) + if timezone_result.returncode != 0: + errors.append(timezone_result.stderr.strip() or timezone_result.stdout.strip()) + + for command in commands[1:]: + result = subprocess.run(command, capture_output=True, text=True) + if result.returncode == 0: + flash('System time synchronized with a public NTP server. Device timezone remains UTC.') + return redirect(url_for('settings')) + errors.append(result.stderr.strip() or result.stdout.strip()) + + details = '; '.join(error for error in errors if error) + if details: + flash('Time sync failed: ' + details) + else: + flash('Time sync failed.') + return redirect(url_for('settings')) @not_while_running @@ -605,7 +1238,7 @@ def file_browser(): for entry in os.scandir(dir): if entry.is_dir() and os.path.dirname(entry.path) == dir and not entry.name.startswith('.') \ - and all(os.path.exists(os.path.join(entry.path, f'plate{i+1}')) for i in range(4)): + and get_plate_dirs(entry.path): du = subprocess.check_output(['/usr/bin/du','-s', entry.path]).split()[0].decode('utf-8') dirs.append((entry.name, round(int(du)/1024**2, 1))) return render_template('filemanager.html', dirs=sorted(dirs), diskspace=diskspace, name=cfg.get('name'), running=experimenter.running) @@ -651,7 +1284,7 @@ def dir_browser(dir): abort(404) files = {} - for plate in ['plate' + str(i+1) for i in range(4)]: + for plate in get_plate_dirs(dir): plate_files = [] for entry in os.scandir(os.path.join(dir, plate)): if entry.is_file() and os.path.dirname(entry.path).startswith(dir) and not entry.name.startswith('.') and \ @@ -727,7 +1360,7 @@ def set_rotated_camera(value): def get_external_ip(): - """returns the IPv4 address of eth0""" + """Return the first IPv4 address found on the primary wired or Wi-Fi NICs.""" for iface in ['eth0', 'wlan0']: p = subprocess.Popen(['/sbin/ip', '-4', '-o', 'a', 'show', iface], stdout=subprocess.PIPE, text=True) data = p.stdout.read() @@ -755,9 +1388,14 @@ def set_hotspot(value): #liveoutput = StillOutput() nightstill = io.BytesIO() daystill = io.BytesIO() +livepreviewstill = io.BytesIO() zoomer = ZoomObject() cfg = Config() +current_focus_value = int(cfg.get('focus') or 250) lock = Lock() +calibration_motor_hold_timer = None +calibration_motor_hold_until = 0.0 +livepreviewstill_lock = Lock() experimenter = None nightshutter = None @@ -769,7 +1407,13 @@ def set_hotspot(value): livestream = False def start(cam, myhw): - global camera, hw, experimenter + """Start the web UI, initialize saved state, and serve requests. + + Startup restores the user's last saved zoom and focus after live view begins + because some camera backends ignore crop/focus writes until streaming has + started and settled. + """ + global camera, hw, experimenter, current_focus_value camera = cam hw = myhw experimenter = Experimenter(hw=hw, cam=cam) @@ -784,7 +1428,9 @@ def start(cam, myhw): camera.rotation = 90 setLive('on') - # restore persisted zoom/pan after stream starts (retry to ensure camera accepts it) + # Restore persisted zoom/pan after streaming starts. Retries are used + # because libcamera sometimes ignores the first crop write during early + # initialization. saved_zoom = cfg.get('zoom') if saved_zoom: try: @@ -814,12 +1460,14 @@ def start(cam, myhw): except Exception as e: debug(f"Failed to restore saved zoom: {e}") - # restore persisted focus (retry while camera finishes initializing) + # Restore persisted focus with retries for the same reason: the lens may + # not accept manual writes until the camera has fully initialized. saved_focus = cfg.get('focus') if saved_focus is not None: log(f"Restoring saved focus: {saved_focus}") for attempt in range(1, 6): hw.focusCam(saved_focus) + current_focus_value = int(saved_focus) time.sleep(1) if getattr(camera, 'type', None) != 'libcamera' or getattr(camera, 'lens_limits', None): log(f"Focus applied (attempt {attempt}).") @@ -833,6 +1481,7 @@ def start(cam, myhw): stop() def stop(): + """Request experiment shutdown and wake the worker thread if needed.""" experimenter.stop() experimenter.quit = True experimenter.status_change.set()