Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 313 additions & 0 deletions api/db_local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
"""
db_local.py — Local SQLite persistence for the metadata DB model.
Zero infrastructure: stores data in a single local .db file.
Used as the backing store for equipment, experiment_types, etc.
during local development (before a real hosted DB is provisioned).
Schema mirrors the metadata DB model from Equipment_Experiment_Metadata_DB_Model.pdf:
- equipment (Stage 0 — registered machines)
- equipment_parameters (hardware limits per machine)
- experiment_types (reusable recipe templates)
- experiment_definitions (specific experiment plans)
"""

import json
import logging
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

# DB file lives next to this module — committed to .gitignore
DB_PATH = Path(__file__).parent / "local_data" / "metadata.db"


def _ensure_dir():
DB_PATH.parent.mkdir(parents=True, exist_ok=True)


@contextmanager
def _conn():
"""Context manager: yields an open SQLite connection with WAL mode."""
_ensure_dir()
con = sqlite3.connect(DB_PATH, check_same_thread=False)
con.row_factory = sqlite3.Row
con.execute("PRAGMA journal_mode=WAL")
con.execute("PRAGMA foreign_keys=ON")
try:
yield con
con.commit()
except Exception:
con.rollback()
raise
finally:
con.close()


# ─── Schema creation (called once on startup) ────────────────────────────────

def init_db():
"""Create tables if they do not already exist."""
with _conn() as con:
con.executescript("""
CREATE TABLE IF NOT EXISTS equipment (
id TEXT PRIMARY KEY, -- e.g. "etcher_01"
name TEXT NOT NULL,
equipment_type TEXT DEFAULT '',
manufacturer TEXT DEFAULT '',
model TEXT DEFAULT '',
location TEXT DEFAULT '',
serial_number TEXT DEFAULT '',
description TEXT DEFAULT '',
source_type TEXT DEFAULT 'postgresql',
connection_host TEXT DEFAULT '',
connection_port TEXT DEFAULT '5432',
connection_db TEXT DEFAULT '',
owner_id TEXT DEFAULT '',
owner_org TEXT DEFAULT '',
registered_at TEXT NOT NULL,
extra_json TEXT DEFAULT '{}' -- reserved for future fields
);
CREATE TABLE IF NOT EXISTS equipment_parameters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
equipment_id TEXT NOT NULL REFERENCES equipment(id) ON DELETE CASCADE,
name TEXT NOT NULL,
unit TEXT DEFAULT '',
min_value REAL,
max_value REAL,
description TEXT DEFAULT ''
);
CREATE TABLE IF NOT EXISTS experiment_types (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT DEFAULT '',
scientific_objective TEXT DEFAULT '',
owner_id TEXT DEFAULT '',
owner_org TEXT DEFAULT '',
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS experiment_type_parameters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type_id TEXT NOT NULL REFERENCES experiment_types(id) ON DELETE CASCADE,
name TEXT NOT NULL,
unit TEXT DEFAULT '',
is_variable INTEGER DEFAULT 1,
default_value REAL,
min_value REAL,
max_value REAL
);
CREATE TABLE IF NOT EXISTS experiment_definitions (
id TEXT PRIMARY KEY,
equipment_id TEXT REFERENCES equipment(id),
type_id TEXT REFERENCES experiment_types(id),
name TEXT NOT NULL,
planned_date TEXT DEFAULT '',
owner_id TEXT DEFAULT '',
owner_org TEXT DEFAULT '',
created_at TEXT NOT NULL,
planned_params_json TEXT DEFAULT '[]'
);
""")
logger.info(f"Local SQLite DB ready: {DB_PATH}")


# ─── Equipment ───────────────────────────────────────────────────────────────

def insert_equipment(payload: dict) -> str:
"""Insert a new equipment record. Returns the equipment id."""
eq_id = payload.get("domain_id") or payload["name"].lower().replace(" ", "_")
now = datetime.now(timezone.utc).isoformat()

with _conn() as con:
con.execute("""
INSERT OR REPLACE INTO equipment
(id, name, equipment_type, manufacturer, model, location,
serial_number, description, source_type, connection_host,
connection_port, connection_db, owner_id, owner_org, registered_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
eq_id,
payload.get("name", ""),
payload.get("equipment_type", ""),
payload.get("manufacturer", ""),
payload.get("model", ""),
payload.get("location", ""),
payload.get("serial_number", ""),
payload.get("description", ""),
payload.get("source_type", "postgresql"),
payload.get("connection_host", ""),
payload.get("connection_port", "5432"),
payload.get("connection_database", ""),
payload.get("owner_id", ""),
payload.get("owner_org", ""),
now,
))

# Insert hardware parameters
params = payload.get("parameters", [])
for p in params:
if not p.get("name"):
continue
con.execute("""
INSERT INTO equipment_parameters
(equipment_id, name, unit, min_value, max_value, description)
VALUES (?,?,?,?,?,?)
""", (
eq_id,
p.get("name", ""),
p.get("unit", ""),
_to_float(p.get("min_value")),
_to_float(p.get("max_value")),
p.get("description", ""),
))

return eq_id


def list_equipment(owner_org: str = "", role: str = "") -> list[dict]:
"""
Return all registered equipment.
Admins see everything; others see only their own org.
"""
with _conn() as con:
if role == "admin" or not owner_org:
rows = con.execute("SELECT * FROM equipment ORDER BY registered_at DESC").fetchall()
else:
rows = con.execute(
"SELECT * FROM equipment WHERE owner_org = ? ORDER BY registered_at DESC",
(owner_org,)
).fetchall()

result = []
for row in rows:
d = dict(row)
# Fetch parameters for this equipment
params = con.execute(
"SELECT * FROM equipment_parameters WHERE equipment_id = ?",
(row["id"],)
).fetchall()
d["parameters"] = [dict(p) for p in params]
result.append(d)
return result


def get_equipment(eq_id: str) -> dict | None:
"""Return a single equipment record by id."""
with _conn() as con:
row = con.execute("SELECT * FROM equipment WHERE id = ?", (eq_id,)).fetchone()
if row is None:
return None
d = dict(row)
params = con.execute(
"SELECT * FROM equipment_parameters WHERE equipment_id = ?", (eq_id,)
).fetchall()
d["parameters"] = [dict(p) for p in params]
return d


# ─── Experiment Types ─────────────────────────────────────────────────────────

def insert_experiment_type(payload: dict) -> str:
"""Insert or replace an experiment type. Returns the type id."""
type_id = payload.get("id") or payload["type_name"].lower().replace(" ", "_")
now = datetime.now(timezone.utc).isoformat()

with _conn() as con:
con.execute("""
INSERT OR REPLACE INTO experiment_types
(id, name, description, scientific_objective, owner_id, owner_org, created_at)
VALUES (?,?,?,?,?,?,?)
""", (
type_id,
payload.get("type_name", ""),
payload.get("type_description", ""),
payload.get("scientific_objective", ""),
payload.get("owner_id", ""),
payload.get("owner_org", ""),
now,
))

for p in payload.get("type_parameters", []):
if not p.get("name"):
continue
con.execute("""
INSERT INTO experiment_type_parameters
(type_id, name, unit, is_variable, default_value, min_value, max_value)
VALUES (?,?,?,?,?,?,?)
""", (
type_id,
p.get("name", ""),
p.get("unit", ""),
1 if p.get("is_variable") else 0,
_to_float(p.get("default_value")),
_to_float(p.get("min_value")),
_to_float(p.get("max_value")),
))

return type_id


def list_experiment_types(owner_org: str = "", role: str = "") -> list[dict]:
with _conn() as con:
if role == "admin" or not owner_org:
rows = con.execute("SELECT * FROM experiment_types ORDER BY created_at DESC").fetchall()
else:
rows = con.execute(
"SELECT * FROM experiment_types WHERE owner_org = ? ORDER BY created_at DESC",
(owner_org,)
).fetchall()

result = []
for row in rows:
d = dict(row)
params = con.execute(
"SELECT * FROM experiment_type_parameters WHERE type_id = ?", (row["id"],)
).fetchall()
d["parameters"] = [dict(p) for p in params]
result.append(d)
return result


# ─── Experiment Definitions ───────────────────────────────────────────────────

def insert_experiment_definition(payload: dict) -> str:
"""Insert an experiment definition. Returns its id."""
import uuid
def_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()

with _conn() as con:
con.execute("""
INSERT INTO experiment_definitions
(id, equipment_id, type_id, name, planned_date,
owner_id, owner_org, created_at, planned_params_json)
VALUES (?,?,?,?,?,?,?,?,?)
""", (
def_id,
payload.get("selected_equipment", ""),
payload.get("type_id", ""),
payload.get("experiment_name", ""),
payload.get("planned_date", ""),
payload.get("owner_id", ""),
payload.get("owner_org", ""),
now,
json.dumps(payload.get("planned_parameters", [])),
))

return def_id


# ─── Utility ─────────────────────────────────────────────────────────────────

def _to_float(v) -> float | None:
try:
return float(v) if v not in (None, "", "None") else None
except (TypeError, ValueError):
return None
Loading