From b0ba2c9ccd800c6c5c9aa0d143ecec27f2f5d4f8 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 13:56:56 -0500 Subject: [PATCH 01/23] Make data upload work for all registered equipment Generalize the Upload section beyond the canonical etcher: - Add generic equipment_runs table (JSONB inputs/outputs) with RLS mirroring etcher_runs, plus runtime ensure migration and SQL scripts. - Route Process: etcher -> sync_runs_pg (unchanged); any other equipment -> sync_equipment_runs_pg with a generic CSV->rows builder driven by the equipment config. - Surface generic runs in /runs list and detail via a disjoint run_id offset so the existing Data Catalog renders them with no frontend rewrite. - Template generator now includes registered outputs/parameters, not just features/targets. - Fix the previously dead min/max validation (read features id/min, parameters min_value, targets.constraints) and make out-of-range a non-blocking warning. - Upload page wording is equipment-generic and shows range warnings on success. Co-authored-by: Cursor --- api/data_loader_pg.py | 278 +++++++++++++++++- api/main.py | 2 + api/routers/upload.py | 443 +++++++++++++++++++++++------ api/scripts/add_equipment_runs.sql | 61 ++++ geddes/k8s/postgres/schema_v2.sql | 48 ++++ web/app/data/upload/page.tsx | 17 +- 6 files changed, 751 insertions(+), 98 deletions(-) create mode 100644 api/scripts/add_equipment_runs.sql diff --git a/api/data_loader_pg.py b/api/data_loader_pg.py index 09aaa1f..551a265 100644 --- a/api/data_loader_pg.py +++ b/api/data_loader_pg.py @@ -17,6 +17,14 @@ logger = logging.getLogger(__name__) +# Generic equipment_runs rows are surfaced through the same /runs endpoints as +# etcher_runs. To keep run_id values unambiguous across the two physical tables +# we expose generic rows with their primary key shifted into a disjoint numeric +# space. etcher idruns are 32-bit INTEGERs (and manual etcher uploads use the +# 1.0e9–1.9e9 band), so an offset well above that range cannot collide. +EQUIPMENT_RUN_ID_OFFSET = 5_000_000_000 + + def is_mock_db_enabled() -> bool: return os.getenv("USE_MOCK_DATABASE", "false").lower() == "true" @@ -267,6 +275,214 @@ def ensure_etcher_run_file_refs_pg() -> None: conn.close() +def ensure_equipment_runs_pg() -> None: + """Create the generic equipment_runs table, RLS policy, and grants. + + Idempotent — safe to call on every startup. Mirrors the project-membership + visibility model used by ``etcher_runs`` so generic runs respect the same + open/shared/private project access rules. + """ + conn = get_pg_superuser_connection() + try: + with conn.cursor() as cur: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS equipment_runs ( + id BIGSERIAL PRIMARY KEY, + equipment_id VARCHAR(255) NOT NULL, + project_id VARCHAR(50) REFERENCES projects(id) ON DELETE SET NULL, + lotname VARCHAR(255), + run_date TIMESTAMP WITH TIME ZONE, + is_outlier BOOLEAN DEFAULT false, + is_calibration_recipe BOOLEAN DEFAULT false, + outlier_type TEXT DEFAULT '', + inputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + outputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + raw_payload_json JSONB NOT NULL DEFAULT '{}'::jsonb, + upload_id UUID, + upload_filename VARCHAR(255) DEFAULT '', + source VARCHAR(100) DEFAULT 'data_upload', + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_equipment_runs_equipment_id ON equipment_runs(equipment_id)" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_equipment_runs_project_id ON equipment_runs(project_id)" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_equipment_runs_upload_id ON equipment_runs(upload_id)" + ) + cur.execute("ALTER TABLE equipment_runs ENABLE ROW LEVEL SECURITY") + cur.execute("GRANT SELECT ON equipment_runs TO api_client") + cur.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM pg_policies + WHERE schemaname = 'public' + AND tablename = 'equipment_runs' + AND policyname = 'equipment_run_visibility_policy' + ) THEN + CREATE POLICY equipment_run_visibility_policy ON equipment_runs + FOR SELECT + USING ( + EXISTS ( + SELECT 1 + FROM projects p + WHERE p.id = equipment_runs.project_id + AND ( + p.access_mode = 'open' + OR EXISTS ( + SELECT 1 + FROM project_members pm + WHERE pm.project_id = p.id + AND pm.nanohub_user_id = current_setting('app.current_user', true) + ) + ) + ) + ); + END IF; + END + $$; + """ + ) + conn.commit() + finally: + conn.close() + + +def _equipment_runs_base_select(*, include_raw_payload: bool = False) -> str: + """SELECT that shapes generic equipment_runs into the same record format + used by the etcher /runs endpoints (so the catalog can render both).""" + raw_payload_select = ",\n r.raw_payload_json" if include_raw_payload else "" + return f""" + SELECT + (r.id + {EQUIPMENT_RUN_ID_OFFSET}) AS run_id, + r.lotname AS lot_name, + r.run_date, + r.created_at, + r.is_outlier, + r.is_calibration_recipe AS is_calibration, + r.outlier_type, + r.inputs_json, + r.outputs_json, + r.equipment_id AS run_equipment_id, + r.upload_filename, + r.source, + r.project_id, + ''::text AS execution_request_id, + p.name AS project_name, + COALESCE(p.equipment_id, r.equipment_id) AS equipment_id, + p.equipment_name AS equipment_name, + p.pi_name AS pi_name, + p.access_mode AS project_access + {raw_payload_select} + FROM equipment_runs r + LEFT JOIN projects p ON p.id = r.project_id +""" + + +def _shape_equipment_run_record(row) -> Dict[str, Any]: + """Shape a raw equipment_runs row into the API response format used by the + v2 endpoints, mirroring _shape_run_record's contract.""" + rec = dict(row) + for ts_field in ("run_date", "created_at"): + if rec.get(ts_field): + rec[ts_field] = ( + rec[ts_field].isoformat() + if hasattr(rec[ts_field], "isoformat") + else str(rec[ts_field]) + ) + # Expose the validated input set-points under the same "features" key the + # etcher rows use, and surface measured outputs alongside. + rec["features"] = rec.pop("inputs_json", {}) or {} + rec["outputs"] = rec.pop("outputs_json", {}) or {} + # The catalog renders etch-rate badges only when these are numeric; generic + # equipment has no canonical etch-rate, so leave them absent. + rec.setdefault("avg_etch_rate", None) + rec.setdefault("range_etch_rate", None) + rec.setdefault("range_nm", None) + rec["equipment_name"] = rec.get("equipment_name") or rec.get("run_equipment_id") or "" + rec.pop("run_equipment_id", None) + raw_payload = rec.pop("raw_payload_json", None) + if raw_payload is not None: + rec["raw_payload"] = raw_payload + rec["file_refs"] = [] + return rec + + +def sync_equipment_runs_pg( + *, + equipment_id: str, + rows: List[Dict[str, Any]], + upload_id: Optional[str] = None, + upload_filename: str = "", +) -> int: + """Insert validated rows for an arbitrary registered equipment into the + generic equipment_runs table. + + Each row dict is expected to contain: project_id, lotname, run_date, + inputs (dict), outputs (dict), raw (dict), and optional is_outlier / + is_calibration_recipe / outlier_type flags. + + When ``upload_id`` is provided, any previously ingested rows for that upload + are deleted first so re-processing the same upload is idempotent (it + replaces rather than duplicates). + """ + from psycopg2.extras import execute_values + + if not rows: + return 0 + + conn = get_pg_superuser_connection() + try: + insert_rows = [] + for row in rows: + insert_rows.append(( + equipment_id, + row.get("project_id"), + row.get("lotname"), + row.get("run_date") or None, + bool(row.get("is_outlier", False)), + bool(row.get("is_calibration_recipe", False)), + str(row.get("outlier_type", "") or ""), + Json(_json_safe_payload(row.get("inputs", {}) or {})), + Json(_json_safe_payload(row.get("outputs", {}) or {})), + Json(_json_safe_payload(row.get("raw", {}) or {})), + upload_id, + upload_filename, + str(row.get("source", "data_upload") or "data_upload"), + )) + + with conn.cursor() as cur: + if upload_id is not None: + cur.execute( + "DELETE FROM equipment_runs WHERE upload_id = %s", + (upload_id,), + ) + execute_values( + cur, + """ + INSERT INTO equipment_runs ( + equipment_id, project_id, lotname, run_date, + is_outlier, is_calibration_recipe, outlier_type, + inputs_json, outputs_json, raw_payload_json, + upload_id, upload_filename, source + ) VALUES %s + """, + insert_rows, + ) + conn.commit() + return len(insert_rows) + finally: + conn.close() + + def _first_text(row: Dict[str, Any], *keys: str) -> str: for key in keys: value = row.get(key) @@ -408,28 +624,48 @@ def get_runs_list_pg( else: conn = get_pg_connection(nanohub_user_id) - query = _run_base_select(include_raw_payload=include_raw_payload) + # Build the shared WHERE clause for both physical run tables. The + # column names line up (is_outlier, is_calibration_recipe, project_id, + # run_date) so the same predicates apply to etcher_runs and the generic + # equipment_runs table. conditions: list[str] = [] - params: list[Any] = [] - + base_params: list[Any] = [] if not include_outliers: conditions.append("r.is_outlier = false AND r.is_calibration_recipe = false") if project_id: conditions.append("r.project_id = %s") - params.append(project_id) + base_params.append(project_id) + where_clause = (" WHERE " + " AND ".join(conditions)) if conditions else "" - if conditions: - query += " WHERE " + " AND ".join(conditions) + # Over-fetch (limit + offset) from each source so a merged, globally + # date-sorted page is still correct after slicing in Python. + page_cap = max(0, limit) + max(0, offset) - query += " ORDER BY r.run_date DESC NULLS LAST LIMIT %s OFFSET %s" - params.extend([limit, offset]) + etcher_query = ( + _run_base_select(include_raw_payload=include_raw_payload) + + where_clause + + " ORDER BY r.run_date DESC NULLS LAST LIMIT %s" + ) + generic_query = ( + _equipment_runs_base_select(include_raw_payload=include_raw_payload) + + where_clause + + " ORDER BY r.run_date DESC NULLS LAST LIMIT %s" + ) with conn.cursor(cursor_factory=RealDictCursor) as cur: - cur.execute(query, tuple(params)) - rows = cur.fetchall() + cur.execute(etcher_query, tuple(base_params + [page_cap])) + etcher_rows = cur.fetchall() + cur.execute(generic_query, tuple(base_params + [page_cap])) + generic_rows = cur.fetchall() - records = [_shape_run_record(row) for row in rows] + records = [_shape_run_record(row) for row in etcher_rows] _attach_run_file_refs(conn, records) + records.extend(_shape_equipment_run_record(row) for row in generic_rows) + + # Merge the two sources into a single date-descending page. Rows without + # a run_date sort last, matching the per-source "NULLS LAST" ordering. + records.sort(key=lambda rec: (rec.get("run_date") or ""), reverse=True) + records = records[offset:offset + limit] if limit else records[offset:] conn.commit() conn.close() @@ -467,6 +703,26 @@ def get_run_detail_pg( else: conn = get_pg_connection(nanohub_user_id) + # run_id values at/above the offset belong to the generic + # equipment_runs table (see EQUIPMENT_RUN_ID_OFFSET); everything below + # is a canonical etcher_runs.idruns. + if run_id >= EQUIPMENT_RUN_ID_OFFSET: + query = ( + _equipment_runs_base_select(include_raw_payload=True) + + " WHERE r.id = %s LIMIT 1" + ) + with conn.cursor(cursor_factory=RealDictCursor) as cur: + cur.execute(query, (run_id - EQUIPMENT_RUN_ID_OFFSET,)) + row = cur.fetchone() + if not row: + conn.commit() + conn.close() + return None + record = _shape_equipment_run_record(row) + conn.commit() + conn.close() + return record + query = _run_base_select(include_raw_payload=True) + " WHERE r.idruns = %s LIMIT 1" with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(query, (run_id,)) diff --git a/api/main.py b/api/main.py index 0578adb..7b54652 100644 --- a/api/main.py +++ b/api/main.py @@ -141,6 +141,7 @@ def _run_startup_migrations() -> None: """ from data_loader_pg import get_pg_superuser_connection from data_loader_pg import ensure_etcher_run_file_refs_pg + from data_loader_pg import ensure_equipment_runs_pg from metadata_pg import ( ensure_experiment_proposal_generation_columns_pg, ensure_experiment_type_reuse_columns_pg, @@ -165,6 +166,7 @@ def _run_startup_migrations() -> None: lock_conn.commit() ensure_etcher_run_file_refs_pg() + ensure_equipment_runs_pg() ensure_experiment_proposal_generation_columns_pg() ensure_experiment_type_reuse_columns_pg() ensure_experiment_type_versioning_columns_pg() diff --git a/api/routers/upload.py b/api/routers/upload.py index 189ceb9..225bc66 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -16,7 +16,12 @@ from pydantic import BaseModel from domain_configs import derive_expected_columns, load_domain_config -from data_loader_pg import get_pg_connection, get_projects_list_pg, sync_runs_pg +from data_loader_pg import ( + get_pg_connection, + get_projects_list_pg, + sync_equipment_runs_pg, + sync_runs_pg, +) from metadata_pg import ( approved_equipment_available_to_user, create_upload_record_pg, @@ -98,6 +103,82 @@ def _normalize_column_type(value: Any) -> str: return "" +def _coerce_optional_float(value: Any) -> float | None: + if value is None: + return None + text = str(value).strip() + if text == "" or text.lower() == "none": + return None + try: + return float(text) + except (TypeError, ValueError): + return None + + +def _extract_expected_bounds( + config_json: dict[str, Any], +) -> dict[str, tuple[float | None, float | None]]: + """Collect per-column min/max limits from an equipment config. + + Reads the actual shapes used by both PG-registered and legacy JSON configs: + - features: keyed by ``id`` (or ``name``), limits in ``min``/``max`` or + ``min_value``/``max_value`` + - parameters: keyed by ``name`` (or ``id``), limits in ``min_value``/ + ``max_value`` or ``min``/``max`` + - targets: ``targets.primary``/``targets.secondary`` keyed by ``id``, + limits inside a ``constraints`` object + - legacy flat ``primary_target``/``secondary_target`` objects + """ + bounds: dict[str, tuple[float | None, float | None]] = {} + + def consider(name: Any, min_raw: Any, max_raw: Any) -> None: + if not name: + return + min_v = _coerce_optional_float(min_raw) + max_v = _coerce_optional_float(max_raw) + if min_v is not None or max_v is not None: + bounds[str(name)] = (min_v, max_v) + + if not isinstance(config_json, dict): + return bounds + + for feature in config_json.get("features", []): + if isinstance(feature, dict): + consider( + feature.get("id") or feature.get("name"), + feature.get("min", feature.get("min_value")), + feature.get("max", feature.get("max_value")), + ) + + for parameter in config_json.get("parameters", []): + if isinstance(parameter, dict): + consider( + parameter.get("name") or parameter.get("id"), + parameter.get("min_value", parameter.get("min")), + parameter.get("max_value", parameter.get("max")), + ) + + targets = config_json.get("targets", {}) + target_objs: list[Any] = [] + if isinstance(targets, dict): + target_objs.extend([targets.get("primary"), targets.get("secondary")]) + target_objs.extend( + [config_json.get("primary_target"), config_json.get("secondary_target")] + ) + for target in target_objs: + if not isinstance(target, dict): + continue + constraints = target.get("constraints") + constraints = constraints if isinstance(constraints, dict) else {} + consider( + target.get("id") or target.get("name"), + constraints.get("min", target.get("min_value")), + constraints.get("max", target.get("max_value")), + ) + + return bounds + + def _upload_value(row: dict[str, Any], canonical: str) -> Any: for key in UPLOAD_TO_SYNC_ALIASES.get(canonical, (canonical,)): if key in row and str(row.get(key) or "").strip() != "": @@ -330,6 +411,21 @@ def _template_columns_and_value_types( if unit: unit_by_name.setdefault(column_name, unit) + # Registered input parameters and output measurements also carry units that + # should drive the template's value-type hints. + for collection in (config.get("parameters", []), config.get("outputs", [])): + for item in collection: + if not isinstance(item, dict): + continue + name = item.get("name") or item.get("id") + if not name: + continue + unit = str(item.get("unit") or "").strip() + if not unit: + unit = _qudt_unit_label(item.get("qudt_unit")) + if unit: + unit_by_name.setdefault(str(name), unit) + targets = config.get("targets", {}) if isinstance(targets, dict): for target in (targets.get("primary"), targets.get("secondary")): @@ -361,7 +457,7 @@ def infer_value_type(name: str, *, role: str = "") -> str: if unit: return unit - if role in {"feature", "target"}: + if role in {"feature", "target", "output", "parameter"}: return "float" return type_by_name.get(name, "") or "string" @@ -375,9 +471,38 @@ def add(name: Any, *, role: str = "") -> None: columns.append(column_name) value_types.append(infer_value_type(column_name, role=role)) + def add_parameters() -> None: + for parameter in config.get("parameters", []): + if isinstance(parameter, dict): + add(parameter.get("name") or parameter.get("id"), role="parameter") + else: + add(parameter, role="parameter") + + def add_outputs() -> None: + """Append the equipment's measured outputs so users have explicit + columns to report results (etch rate, uniformity, …) alongside the + input set-points. Covers both the optimization targets and the richer + ``outputs`` list captured at registration.""" + targets = config.get("targets", {}) + if isinstance(targets, dict): + for target in (targets.get("primary"), targets.get("secondary")): + if isinstance(target, dict): + add(target.get("id") or target.get("name"), role="target") + else: + add(target, role="target") + for output in config.get("outputs", []): + if isinstance(output, dict): + add(output.get("name") or output.get("id"), role="output") + else: + add(output, role="output") + + # When the equipment declares an explicit ingestion column list we honor it + # as the canonical input schema, but still append any registered outputs + # that aren't already present so results have somewhere to go. if explicit_columns: for column in explicit_columns: add(column["name"]) + add_outputs() return columns, value_types add(lot_col, role="lot") @@ -389,13 +514,8 @@ def add(name: Any, *, role: str = "") -> None: else: add(feature, role="feature") - targets = config.get("targets", {}) - if isinstance(targets, dict): - for target in (targets.get("primary"), targets.get("secondary")): - if isinstance(target, dict): - add(target.get("id") or target.get("name"), role="target") - else: - add(target, role="target") + add_parameters() + add_outputs() if not columns: derived_columns = derive_expected_columns(config) @@ -459,29 +579,11 @@ async def upload_file( ) expected_columns = derive_expected_columns(legacy_config) - # Extract numerical boundary limits for validation - expected_bounds = {} + # Extract numerical boundary limits for validation. These come from the + # equipment's registered features/parameters/targets and are used to flag + # (non-blocking) out-of-range values that usually indicate a unit mismatch. config_json = equipment.get("config_json", {}) if equipment else legacy_config - for f in config_json.get("features", []): - if isinstance(f, dict) and f.get("name"): - try: - min_v = float(f.get("min_value")) if f.get("min_value") else None - max_v = float(f.get("max_value")) if f.get("max_value") else None - if min_v is not None or max_v is not None: - expected_bounds[f["name"]] = (min_v, max_v) - except (ValueError, TypeError): - pass - - for t_key in ["primary_target", "secondary_target"]: - t = config_json.get(t_key) - if isinstance(t, dict) and t.get("name"): - try: - min_v = float(t.get("min_value")) if t.get("min_value") else None - max_v = float(t.get("max_value")) if t.get("max_value") else None - if min_v is not None or max_v is not None: - expected_bounds[t["name"]] = (min_v, max_v) - except (ValueError, TypeError): - pass + expected_bounds = _extract_expected_bounds(config_json) # Check extension ext = Path(file.filename).suffix.lower() @@ -501,8 +603,14 @@ async def upload_file( if size_bytes > 50 * 1024 * 1024: # 50 MB limit raise HTTPException(status_code=400, detail="File exceeds 50 MB limit") - # Parse CSV to validate basic structure - errors = [] + # Parse CSV to validate basic structure. + # + # Hard errors (missing required columns, malformed rows) block processing. + # Out-of-range values are recorded as non-blocking warnings: they usually + # mean a unit mismatch worth flagging, but they should not stop a researcher + # from ingesting otherwise well-formed data. + errors: list[str] = [] + warnings: list[str] = [] row_count = 0 columns = [] @@ -530,22 +638,24 @@ async def upload_file( errors.append("... (more errors truncated)") break continue - + for col_idx, col_name in enumerate(columns): if col_name in expected_bounds: min_v, max_v = expected_bounds[col_name] if col_idx >= len(row): continue val_str = row[col_idx].strip() if not val_str: continue - try: - val = float(val_str) + val = _coerce_optional_float(val_str) + if val is None: + continue + if len(warnings) <= 10: if min_v is not None and val < min_v: - errors.append(f"Row {i} Validation Warning: '{col_name}' value {val} is below minimum ({min_v}). Check units.") + warnings.append(f"Row {i} Validation Warning: '{col_name}' value {val} is below minimum ({min_v}). Check units.") if max_v is not None and val > max_v: - errors.append(f"Row {i} Validation Warning: '{col_name}' value {val} exceeds maximum ({max_v}). Check units.") - except Exception: - pass - + warnings.append(f"Row {i} Validation Warning: '{col_name}' value {val} exceeds maximum ({max_v}). Check units.") + if len(warnings) > 10: + warnings.append("... (more warnings truncated)") + if len(errors) > 10 and not errors[-1].startswith("..."): errors.append("... (more errors truncated)") break @@ -570,13 +680,17 @@ async def upload_file( with open(dest, "wb") as f: f.write(contents) + # Only hard errors drive failure; warnings are surfaced but stored + # alongside so the upload still reaches a processable "success" state. status = "error" if errors else "success" + stored_issues = errors + warnings if ext == ".csv": - message = ( - f"File '{file.filename}' was stored after CSV header and row-shape validation." - if not errors - else f"File '{file.filename}' was stored, but CSV structural validation found {len(errors)} issue(s)." - ) + if errors: + message = f"File '{file.filename}' was stored, but CSV structural validation found {len(errors)} issue(s)." + elif warnings: + message = f"File '{file.filename}' was stored after validation with {len(warnings)} range warning(s) to review." + else: + message = f"File '{file.filename}' was stored after CSV header and row-shape validation." else: message = ( f"Excel file '{file.filename}' was stored. Detailed spreadsheet parsing and automatic FAIR/AI ingestion are not performed by this endpoint." @@ -590,7 +704,7 @@ async def upload_file( size_bytes=size_bytes, row_count=row_count, columns=columns, - errors=errors, + errors=stored_issues, status=status, user=user, ) @@ -613,7 +727,7 @@ async def upload_file( "size_bytes": size_bytes, "row_count": row_count, "columns": columns, - "errors": errors if errors else None, + "errors": stored_issues if stored_issues else None, "message": message, "processing_status": "stored_only", } @@ -627,6 +741,157 @@ def list_uploads( return list_uploads_pg(user) +def _load_equipment_config(equipment_id: str) -> dict[str, Any]: + """Return the config_json for a registered equipment, falling back to a + legacy JSON domain config. Returns {} if neither is found.""" + equipment = get_equipment_pg(equipment_id) + if equipment: + config = equipment.get("config_json", {}) + return config if isinstance(config, dict) else {} + legacy = load_domain_config(equipment_id) + return legacy if isinstance(legacy, dict) else {} + + +def _config_input_output_names( + config: dict[str, Any], +) -> tuple[list[str], list[str]]: + """Split an equipment config's columns into input set-points and measured + outputs. Outputs are authoritative (targets + registered outputs); every + other declared column that isn't the lot/timestamp key is treated as input. + """ + data_config = config.get("data", {}) + if not isinstance(data_config, dict): + data_config = {} + lot_col = data_config.get("lotname_column") or "LOTNAME" + ts_col = data_config.get("timestamp_column") or "run_date" + + output_names: list[str] = [] + seen_out: set[str] = set() + + def add_out(name: Any) -> None: + if not name: + return + text = str(name) + if text not in seen_out: + seen_out.add(text) + output_names.append(text) + + targets = config.get("targets", {}) + if isinstance(targets, dict): + for target in (targets.get("primary"), targets.get("secondary")): + if isinstance(target, dict): + add_out(target.get("id") or target.get("name")) + for output in config.get("outputs", []): + if isinstance(output, dict): + add_out(output.get("name") or output.get("id")) + else: + add_out(output) + + input_names: list[str] = [] + seen_in: set[str] = set() + + def add_in(name: Any) -> None: + if not name: + return + text = str(name) + if text in seen_out or text in {lot_col, ts_col} or text in seen_in: + return + seen_in.add(text) + input_names.append(text) + + for feature in config.get("features", []): + if isinstance(feature, dict): + add_in(feature.get("id") or feature.get("name")) + else: + add_in(feature) + for parameter in config.get("parameters", []): + if isinstance(parameter, dict): + add_in(parameter.get("name") or parameter.get("id")) + else: + add_in(parameter) + for column in config.get("columns", []): + if isinstance(column, dict): + add_in(column.get("name")) + + return input_names, output_names + + +def _coerce_cell(value: Any) -> Any: + """Numeric where possible, otherwise the trimmed string (or None).""" + if value is None: + return None + text = str(value).strip() + if text == "": + return None + parsed = _coerce_optional_float(text) + return parsed if parsed is not None else text + + +def _csv_upload_to_equipment_rows( + *, + config: dict[str, Any], + upload_record: dict[str, Any], + project_id: str, +) -> tuple[list[dict[str, Any]], list[str]]: + """Generic CSV → equipment_runs rows builder for any registered equipment. + + Splits each row into the equipment's declared inputs and outputs and keeps + the full original row as the raw payload. Enforces the equipment's required + columns (lot/timestamp/features/targets) before ingestion. + """ + path = Path(upload_record["storage_path"]) + if path.suffix.lower() != ".csv": + return [], ["Only CSV uploads can be processed into the data catalog today."] + if not path.exists(): + return [], ["Stored upload file is missing from disk."] + + data_config = config.get("data", {}) + if not isinstance(data_config, dict): + data_config = {} + lot_col = data_config.get("lotname_column") or "LOTNAME" + ts_col = data_config.get("timestamp_column") or "run_date" + + input_names, output_names = _config_input_output_names(config) + required_columns = derive_expected_columns(config) + + errors: list[str] = [] + rows: list[dict[str, Any]] = [] + with path.open("r", encoding="utf-8-sig", newline="") as f: + reader = csv.DictReader(f) + headers = set(reader.fieldnames or []) + missing = [column for column in required_columns if column not in headers] + if missing: + errors.append( + "Missing required columns: " + ", ".join(sorted(missing)) + ) + return [], errors + + for raw_row in reader: + inputs = { + name: _coerce_cell(raw_row.get(name)) + for name in input_names + if name in raw_row + } + outputs = { + name: _coerce_cell(raw_row.get(name)) + for name in output_names + if name in raw_row + } + rows.append( + { + "project_id": project_id, + "lotname": str(raw_row.get(lot_col) or "").strip(), + "run_date": str(raw_row.get(ts_col) or "").strip() or None, + "inputs": inputs, + "outputs": outputs, + "raw": dict(raw_row), + "source": "data_upload", + } + ) + + return rows, errors + + @router.post("/uploads/{upload_id}/process") def process_upload( upload_id: str, @@ -634,11 +899,13 @@ def process_upload( user: PlatformUser = Depends(get_platform_user), ): """ - Convert a stored CSV upload into canonical etcher_runs rows. + Convert a stored CSV upload into canonical run records. - This intentionally reuses sync_runs_pg(), the same ingestion sink used by - Azure, so upsert behavior, project linkage, raw payload storage, and - proposal reconciliation stay consistent across ingestion sources. + The canonical etcher routes rows through sync_runs_pg() — the same ingestion + sink used by Azure — so upsert behavior, project linkage, raw payload + storage, and proposal reconciliation stay consistent. Every other registered + equipment is ingested into the generic equipment_runs table so manually + uploaded data is captured and surfaced in the catalog regardless of type. """ project_id = payload.project_id.strip() if not project_id: @@ -646,57 +913,67 @@ def process_upload( _assert_project_visible(project_id, user) upload_record = _load_upload_record(upload_id, user) + equipment_id = upload_record["equipment_id"] _update_upload_record(upload_id=upload_id, status="processing", errors=[]) - rows, errors = _csv_upload_to_sync_rows( - upload_id=upload_id, - upload_record=upload_record, - project_id=project_id, - ) - if errors: + def _fail(errors: list[str], message: str, row_count: int) -> dict[str, Any]: _update_upload_record( upload_id=upload_id, status="failed", errors=errors[:50], - row_count=len(rows), + row_count=row_count, ) return { "status": "failed", "upload_id": upload_id, - "equipment_id": upload_record["equipment_id"], + "equipment_id": equipment_id, "project_id": project_id, "inserted": 0, - "row_count": len(rows), + "row_count": row_count, "errors": errors[:50], - "message": "Upload was not processed because validation failed.", + "message": message, } - if not rows: - errors = ["No data rows were found in the CSV upload."] - _update_upload_record( + + is_canonical_etcher = equipment_id == "etcher" + + if is_canonical_etcher: + rows, errors = _csv_upload_to_sync_rows( upload_id=upload_id, - status="failed", - errors=errors, - row_count=0, + upload_record=upload_record, + project_id=project_id, + ) + else: + config = _load_equipment_config(equipment_id) + rows, errors = _csv_upload_to_equipment_rows( + config=config, + upload_record=upload_record, + project_id=project_id, + ) + + if errors: + return _fail(errors, "Upload was not processed because validation failed.", len(rows)) + if not rows: + return _fail( + ["No data rows were found in the CSV upload."], + "Upload was not processed because it contained no rows.", + 0, ) - return { - "status": "failed", - "upload_id": upload_id, - "equipment_id": upload_record["equipment_id"], - "project_id": project_id, - "inserted": 0, - "row_count": 0, - "errors": errors, - "message": "Upload was not processed because it contained no rows.", - } try: - inserted = sync_runs_pg(rows) + if is_canonical_etcher: + inserted = sync_runs_pg(rows) + else: + inserted = sync_equipment_runs_pg( + equipment_id=equipment_id, + rows=rows, + upload_id=upload_id, + upload_filename=upload_record["filename"], + ) except Exception as exc: - errors = [f"Database sync failed: {exc}"] _update_upload_record( upload_id=upload_id, status="failed", - errors=errors, + errors=[f"Database sync failed: {exc}"], row_count=len(rows), ) raise HTTPException(status_code=500, detail="Upload processing sync failed") from exc @@ -710,12 +987,12 @@ def process_upload( return { "status": "processed", "upload_id": upload_id, - "equipment_id": upload_record["equipment_id"], + "equipment_id": equipment_id, "project_id": project_id, "inserted": inserted, "row_count": len(rows), "errors": [], - "message": f"Processed {inserted} row(s) into etcher_runs.", + "message": f"Processed {inserted} row(s) into the data catalog.", } diff --git a/api/scripts/add_equipment_runs.sql b/api/scripts/add_equipment_runs.sql new file mode 100644 index 0000000..50c5430 --- /dev/null +++ b/api/scripts/add_equipment_runs.sql @@ -0,0 +1,61 @@ +-- Generic per-equipment runs table for manual uploads of ANY registered +-- equipment (not just the canonical etcher). Inputs/outputs are stored as +-- JSONB so no bespoke table is required per equipment type. +-- Safe to run multiple times. + +CREATE TABLE IF NOT EXISTS equipment_runs ( + id BIGSERIAL PRIMARY KEY, + equipment_id VARCHAR(255) NOT NULL, + project_id VARCHAR(50) REFERENCES projects(id) ON DELETE SET NULL, + lotname VARCHAR(255), + run_date TIMESTAMP WITH TIME ZONE, + is_outlier BOOLEAN DEFAULT false, + is_calibration_recipe BOOLEAN DEFAULT false, + outlier_type TEXT DEFAULT '', + inputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + outputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + raw_payload_json JSONB NOT NULL DEFAULT '{}'::jsonb, + upload_id UUID, + upload_filename VARCHAR(255) DEFAULT '', + source VARCHAR(100) DEFAULT 'data_upload', + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_equipment_runs_equipment_id ON equipment_runs(equipment_id); +CREATE INDEX IF NOT EXISTS idx_equipment_runs_project_id ON equipment_runs(project_id); +CREATE INDEX IF NOT EXISTS idx_equipment_runs_upload_id ON equipment_runs(upload_id); + +ALTER TABLE equipment_runs ENABLE ROW LEVEL SECURITY; + +GRANT SELECT ON equipment_runs TO api_client; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM pg_policies + WHERE schemaname = 'public' + AND tablename = 'equipment_runs' + AND policyname = 'equipment_run_visibility_policy' + ) THEN + CREATE POLICY equipment_run_visibility_policy ON equipment_runs + FOR SELECT + USING ( + EXISTS ( + SELECT 1 + FROM projects p + WHERE p.id = equipment_runs.project_id + AND ( + p.access_mode = 'open' + OR EXISTS ( + SELECT 1 + FROM project_members pm + WHERE pm.project_id = p.id + AND pm.nanohub_user_id = current_setting('app.current_user', true) + ) + ) + ) + ); + END IF; +END +$$; diff --git a/geddes/k8s/postgres/schema_v2.sql b/geddes/k8s/postgres/schema_v2.sql index 56c95d8..621a21a 100644 --- a/geddes/k8s/postgres/schema_v2.sql +++ b/geddes/k8s/postgres/schema_v2.sql @@ -90,6 +90,54 @@ CREATE POLICY run_visibility_policy ON etcher_runs ) ); +-- Generic per-equipment runs table. Unlike etcher_runs (which has typed +-- columns for the canonical etcher schema), this table stores the validated +-- inputs/outputs for ANY registered equipment as JSONB, so manual uploads for +-- newly registered equipment can be ingested without a bespoke table. +CREATE TABLE IF NOT EXISTS equipment_runs ( + id BIGSERIAL PRIMARY KEY, + equipment_id VARCHAR(255) NOT NULL, + project_id VARCHAR(50) REFERENCES projects(id) ON DELETE SET NULL, + lotname VARCHAR(255), + run_date TIMESTAMP WITH TIME ZONE, + is_outlier BOOLEAN DEFAULT false, + is_calibration_recipe BOOLEAN DEFAULT false, + outlier_type TEXT DEFAULT '', + inputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + outputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + raw_payload_json JSONB NOT NULL DEFAULT '{}'::jsonb, + upload_id UUID, + upload_filename VARCHAR(255) DEFAULT '', + source VARCHAR(100) DEFAULT 'data_upload', + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_equipment_runs_equipment_id ON equipment_runs(equipment_id); +CREATE INDEX IF NOT EXISTS idx_equipment_runs_project_id ON equipment_runs(project_id); +CREATE INDEX IF NOT EXISTS idx_equipment_runs_upload_id ON equipment_runs(upload_id); + +ALTER TABLE equipment_runs ENABLE ROW LEVEL SECURITY; +GRANT SELECT ON equipment_runs TO api_client; + +CREATE POLICY equipment_run_visibility_policy ON equipment_runs + FOR SELECT + USING ( + EXISTS ( + SELECT 1 + FROM projects p + WHERE p.id = equipment_runs.project_id + AND ( + p.access_mode = 'open' + OR EXISTS ( + SELECT 1 + FROM project_members pm + WHERE pm.project_id = p.id + AND pm.nanohub_user_id = current_setting('app.current_user', true) + ) + ) + ) + ); + CREATE TABLE IF NOT EXISTS users ( id VARCHAR(255) PRIMARY KEY, -- NanoHUB Subject ID or Email name VARCHAR(255) NOT NULL, diff --git a/web/app/data/upload/page.tsx b/web/app/data/upload/page.tsx index 571e582..c29325c 100644 --- a/web/app/data/upload/page.tsx +++ b/web/app/data/upload/page.tsx @@ -59,7 +59,7 @@ function mapPersistedUpload(upload: PersistedUpload): UploadedFile { errors: upload.errors.length > 0 ? upload.errors : undefined, message: upload.status === "processed" - ? "Processed into etcher_runs." + ? "Processed into the data catalog." : upload.status === "success" ? "Stored upload record. Choose a project and process it to add rows to the catalog." : "Stored upload record with structural validation issues.", @@ -398,7 +398,7 @@ export default function DataUploadPage() {

)} {file.status === "processing" && ( -

Processing rows into etcher_runs...

+

Processing rows into the data catalog...

)} {(file.status === "error" || file.status === "failed") && file.errors && (
@@ -409,6 +409,15 @@ export default function DataUploadPage() { ))}
)} + {(file.status === "success" || file.status === "processed") && file.errors && ( +
+ {file.errors.map((warning, j) => ( +

+ • {warning} +

+ ))} +
+ )} {file.id && file.status === "success" && ( @@ -450,13 +459,13 @@ export default function DataUploadPage() {

2. Validation

- This screen structurally validates CSV headers, row width, and strictly enforces physical boundaries (min/max values) defined during registration. + This screen structurally validates CSV headers and row width, and flags out-of-range values (min/max defined during registration) as warnings to review.

3. Processing

- CSV uploads can be processed into etcher_runs for the selected project. Excel files are stored for provenance but are not ingested automatically. + CSV uploads can be processed into the data catalog for the selected project. Excel files are stored for provenance but are not ingested automatically.

From 1e2cdcfad55812ee9c2692c9a4518a73ac2df379 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:05:28 -0500 Subject: [PATCH 02/23] Address Codex review: hint-row skip, identity, outputs, metrics - Skip the template value-type descriptor row during ingestion (both etcher and generic builders) so a filled-in template processes cleanly. - Attribute generic runs to their own equipment_id and resolve the display name via equipment_metadata instead of the project's equipment. - Surface generic measured outputs in the catalog run-detail page and render generic input set-points (strings or numbers); add outputs to V2Run. - Count generic equipment_runs in equipment inventory run metrics. Co-authored-by: Cursor --- api/data_loader_pg.py | 9 ++- api/metadata_pg.py | 41 ++++++++++--- api/routers/upload.py | 40 +++++++++++++ web/app/data/catalog/[id]/page.tsx | 93 +++++++++++++++++++----------- web/lib/api-client.ts | 1 + 5 files changed, 138 insertions(+), 46 deletions(-) diff --git a/api/data_loader_pg.py b/api/data_loader_pg.py index 551a265..a7718d2 100644 --- a/api/data_loader_pg.py +++ b/api/data_loader_pg.py @@ -371,19 +371,19 @@ def _equipment_runs_base_select(*, include_raw_payload: bool = False) -> str: r.outlier_type, r.inputs_json, r.outputs_json, - r.equipment_id AS run_equipment_id, r.upload_filename, r.source, r.project_id, ''::text AS execution_request_id, p.name AS project_name, - COALESCE(p.equipment_id, r.equipment_id) AS equipment_id, - p.equipment_name AS equipment_name, + r.equipment_id AS equipment_id, + COALESCE(em.equipment_name, p.equipment_name, r.equipment_id) AS equipment_name, p.pi_name AS pi_name, p.access_mode AS project_access {raw_payload_select} FROM equipment_runs r LEFT JOIN projects p ON p.id = r.project_id + LEFT JOIN equipment_metadata em ON em.domain_id = r.equipment_id """ @@ -407,8 +407,7 @@ def _shape_equipment_run_record(row) -> Dict[str, Any]: rec.setdefault("avg_etch_rate", None) rec.setdefault("range_etch_rate", None) rec.setdefault("range_nm", None) - rec["equipment_name"] = rec.get("equipment_name") or rec.get("run_equipment_id") or "" - rec.pop("run_equipment_id", None) + rec["equipment_name"] = rec.get("equipment_name") or rec.get("equipment_id") or "" raw_payload = rec.pop("raw_payload_json", None) if raw_payload is not None: rec["raw_payload"] = raw_payload diff --git a/api/metadata_pg.py b/api/metadata_pg.py index 871f077..2a06abc 100644 --- a/api/metadata_pg.py +++ b/api/metadata_pg.py @@ -1079,18 +1079,43 @@ def _serialize_equipment( def _fetch_equipment_run_metrics(conn) -> dict[str, dict[str, Any]]: + """Aggregate run counts and latest-data timestamps per equipment. + + Combines the canonical etcher_runs (attributed via the project's equipment) + with the generic equipment_runs table (attributed via the run's own + equipment_id) so manually uploaded data for any registered equipment is + reflected in the inventory metrics. + """ with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute( """ SELECT - p.equipment_id, - COUNT(r.idruns)::int AS total_runs, - MAX(COALESCE(r.run_date, r.created_at)) AS last_data_at - FROM projects p - JOIN etcher_runs r ON r.project_id = p.id - WHERE p.equipment_id IS NOT NULL - AND p.equipment_id <> '' - GROUP BY p.equipment_id + equipment_id, + SUM(total_runs)::int AS total_runs, + MAX(last_data_at) AS last_data_at + FROM ( + SELECT + p.equipment_id AS equipment_id, + COUNT(r.idruns) AS total_runs, + MAX(COALESCE(r.run_date, r.created_at)) AS last_data_at + FROM projects p + JOIN etcher_runs r ON r.project_id = p.id + WHERE p.equipment_id IS NOT NULL + AND p.equipment_id <> '' + GROUP BY p.equipment_id + + UNION ALL + + SELECT + er.equipment_id AS equipment_id, + COUNT(er.id) AS total_runs, + MAX(COALESCE(er.run_date, er.created_at)) AS last_data_at + FROM equipment_runs er + WHERE er.equipment_id IS NOT NULL + AND er.equipment_id <> '' + GROUP BY er.equipment_id + ) combined + GROUP BY equipment_id """ ) rows = cur.fetchall() diff --git a/api/routers/upload.py b/api/routers/upload.py index 225bc66..1fdfb14 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -305,6 +305,8 @@ def _csv_upload_to_sync_rows( if not path.exists(): return [], ["Stored upload file is missing from disk."] + hint_by_column = _template_hint_by_column(_load_equipment_config(upload_record["equipment_id"])) + errors: list[str] = [] rows: list[dict[str, Any]] = [] with path.open("r", encoding="utf-8-sig", newline="") as f: @@ -317,6 +319,9 @@ def _csv_upload_to_sync_rows( return [], errors for idx, raw_row in enumerate(reader, start=2): + # Skip the template's value-type descriptor row if the user left it in. + if _is_template_hint_row(raw_row, hint_by_column): + continue row_errors: list[str] = [] row_number = idx - 1 id_value = _upload_value(raw_row, "idruns") @@ -525,6 +530,37 @@ def add_outputs() -> None: return columns, value_types +def _template_hint_by_column(config: dict[str, Any]) -> dict[str, str]: + """Map each template column to its value-type/unit hint (the second row of + the generated CSV template).""" + try: + columns, value_types = _template_columns_and_value_types(config) + except Exception: + return {} + return {col: str(vt) for col, vt in zip(columns, value_types)} + + +def _is_template_hint_row( + raw_row: dict[str, Any], + hint_by_column: dict[str, str], +) -> bool: + """Detect the descriptor row a user may leave in place when filling out the + downloaded template (e.g. ``string,datetime,float,sccm``). Returns True only + when every populated known column exactly equals its template hint, so real + measurement rows (which carry numbers) are never mistaken for hints.""" + if not hint_by_column: + return False + matched = 0 + for column, hint in hint_by_column.items(): + text = str(raw_row.get(column) or "").strip() + if text == "": + continue + if text != str(hint).strip(): + return False + matched += 1 + return matched > 0 + + @router.post("/upload") async def upload_file( equipment_id: str = Form(...), @@ -853,6 +889,7 @@ def _csv_upload_to_equipment_rows( input_names, output_names = _config_input_output_names(config) required_columns = derive_expected_columns(config) + hint_by_column = _template_hint_by_column(config) errors: list[str] = [] rows: list[dict[str, Any]] = [] @@ -867,6 +904,9 @@ def _csv_upload_to_equipment_rows( return [], errors for raw_row in reader: + # Skip the template's value-type descriptor row if the user left it in. + if _is_template_hint_row(raw_row, hint_by_column): + continue inputs = { name: _coerce_cell(raw_row.get(name)) for name in input_names diff --git a/web/app/data/catalog/[id]/page.tsx b/web/app/data/catalog/[id]/page.tsx index 273b6a1..734ad60 100644 --- a/web/app/data/catalog/[id]/page.tsx +++ b/web/app/data/catalog/[id]/page.tsx @@ -116,6 +116,12 @@ export default function RunDetailPage({ params }: { params: Promise<{ id: string const access = ACCESS_STYLE[accessKey] || ACCESS_STYLE.private; const AccessIcon = access.icon; + // Generic (non-etcher) equipment report their measured results in `outputs` + // rather than the canonical etch-rate columns. + const genericOutputs = + run.outputs && Object.keys(run.outputs).length > 0 ? run.outputs : null; + const isEtcher = run.equipment_id === "etcher" || run.avg_etch_rate !== null; + return (
Measured outcomes -
-
-
- -

Avg etch rate

-
-

- {formatNumber(run.avg_etch_rate, 1)} - nm/min -

+ {genericOutputs ? ( +
+ {Object.entries(genericOutputs).map(([key, value]) => ( +
+
+ +

{key}

+
+

+ {typeof value === "number" ? formatNumber(value, 2) : (value ?? "—")} +

+
+ ))}
-
-
- -

Range etch rate

+ ) : isEtcher ? ( +
+
+
+ +

Avg etch rate

+
+

+ {formatNumber(run.avg_etch_rate, 1)} + nm/min +

-

- ±{formatNumber(run.range_etch_rate, 1)} - nm/min -

-
-
-
- -

Range (nm)

+
+
+ +

Range etch rate

+
+

+ ±{formatNumber(run.range_etch_rate, 1)} + nm/min +

+
+
+
+ +

Range (nm)

+
+

+ {formatNumber(run.range_nm, 1)} + nm +

-

- {formatNumber(run.range_nm, 1)} - nm -

-
+ ) : ( +

+ No measured outputs were reported for this run. +

+ )}
{/* Process inputs */}

- Process inputs (etch-step averages) + {isEtcher ? "Process inputs (etch-step averages)" : "Process inputs"}

- Per-step averages computed from the Glance time series during the etch - step of this lot. + {isEtcher + ? "Per-step averages computed from the Glance time series during the etch step of this lot." + : "Recorded input set-points for this run."}

@@ -278,13 +305,13 @@ export default function RunDetailPage({ params }: { params: Promise<{ id: string - {Object.entries(run.features).map(([key, value]) => { + {Object.entries(run.features as Record).map(([key, value]) => { const meta = FEATURE_DISPLAY[key] || { label: key, unit: "", description: "" }; return ( - + ); diff --git a/web/lib/api-client.ts b/web/lib/api-client.ts index 5c4ab99..e17fedb 100644 --- a/web/lib/api-client.ts +++ b/web/lib/api-client.ts @@ -161,6 +161,7 @@ export interface V2Run { range_etch_rate: number | null; range_nm: number | null; features: Record; + outputs?: Record; project_id: string | null; project_name: string | null; equipment_id: string | null; From 4b091241436ae8e4178550186702201902974b75 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:10:31 -0500 Subject: [PATCH 03/23] Address Codex review round 2: templates, project match, FAIR metadata - Explicit-column templates now also include lot/timestamp keys and registered input parameters so no run metadata is silently dropped. - process_upload rejects linking a run to a project whose equipment differs from the upload's equipment (empty project equipment stays unconstrained). - run_to_jsonld serializes generic measured outputs and labels the dataset by its actual equipment instead of always "Etcher". Co-authored-by: Cursor --- api/fair_archiver.py | 64 +++++++++++++++++++++++++++++-------------- api/routers/upload.py | 35 +++++++++++++++++++---- 2 files changed, 73 insertions(+), 26 deletions(-) diff --git a/api/fair_archiver.py b/api/fair_archiver.py index 5e12b77..b563f4e 100644 --- a/api/fair_archiver.py +++ b/api/fair_archiver.py @@ -764,6 +764,9 @@ def run_to_jsonld(run: dict[str, Any]) -> dict[str, Any]: project_name = run.get("project_name", "Unknown Project") equipment_name = run.get("equipment_name", "Unknown Equipment") equipment_id = run.get("equipment_id", "unknown") + # Label the dataset by its actual equipment so non-etcher runs aren't + # misrepresented as etcher runs in FAIR metadata. + run_label = "Etcher" if equipment_id == "etcher" else (equipment_name or "Equipment") # Load domain config for ontology URIs domain_config = _load_domain_config(equipment_id) @@ -796,26 +799,45 @@ def run_to_jsonld(run: dict[str, Any]) -> dict[str, Any]: prop["prov:qualifiedGeneration"] = "prov:generated" variables_measured.append(prop) - # Build ontology-enriched target list + # Build ontology-enriched result list. Generic equipment report their + # measurements under ``outputs``; the canonical etcher uses typed columns. + generic_outputs = run.get("outputs") or {} results = [] - for target_id, target_label, target_key in [ - ("AvgEtchRate", "Etch Rate", "avg_etch_rate"), - ("RangeEtchRate", "Thickness Range", "range_etch_rate"), - ]: - onto = ontology_map.get(target_id, {}) - result_prop: dict[str, Any] = { - "@type": "PropertyValue", - "name": target_id, - "value": run.get(target_key), - } - if onto.get("unit"): - result_prop["unitText"] = onto["unit"] - if onto.get("qudt_quantity_kind"): - result_prop["qudt:hasQuantityKind"] = {"@id": onto["qudt_quantity_kind"]} - if onto.get("qudt_unit"): - result_prop["qudt:hasUnit"] = {"@id": onto["qudt_unit"]} - result_prop["prov:qualifiedGeneration"] = "prov:generated" - results.append(result_prop) + if isinstance(generic_outputs, dict) and generic_outputs: + for key, value in generic_outputs.items(): + onto = ontology_map.get(key, {}) + result_prop = { + "@type": "PropertyValue", + "name": key, + "value": value, + } + if onto.get("unit"): + result_prop["unitText"] = onto["unit"] + if onto.get("qudt_quantity_kind"): + result_prop["qudt:hasQuantityKind"] = {"@id": onto["qudt_quantity_kind"]} + if onto.get("qudt_unit"): + result_prop["qudt:hasUnit"] = {"@id": onto["qudt_unit"]} + result_prop["prov:qualifiedGeneration"] = "prov:generated" + results.append(result_prop) + else: + for target_id, target_label, target_key in [ + ("AvgEtchRate", "Etch Rate", "avg_etch_rate"), + ("RangeEtchRate", "Thickness Range", "range_etch_rate"), + ]: + onto = ontology_map.get(target_id, {}) + result_prop: dict[str, Any] = { + "@type": "PropertyValue", + "name": target_id, + "value": run.get(target_key), + } + if onto.get("unit"): + result_prop["unitText"] = onto["unit"] + if onto.get("qudt_quantity_kind"): + result_prop["qudt:hasQuantityKind"] = {"@id": onto["qudt_quantity_kind"]} + if onto.get("qudt_unit"): + result_prop["qudt:hasUnit"] = {"@id": onto["qudt_unit"]} + result_prop["prov:qualifiedGeneration"] = "prov:generated" + results.append(result_prop) # Build equipment entity with SAREF metadata equipment_entity: dict[str, Any] = { @@ -864,11 +886,11 @@ def run_to_jsonld(run: dict[str, Any]) -> dict[str, Any]: }, ], "@type": "Dataset", - "name": f"Etcher Run {run.get('run_id', 'unknown')}", + "name": f"{run_label} Run {run.get('run_id', 'unknown')}", "identifier": str(run.get("run_id", "")), "dateCreated": run.get("run_date") or run.get("created_at") or "", "description": ( - f"Etcher run {run.get('run_id')} from lot {run.get('lot_name', 'N/A')}. " + f"{run_label} run {run.get('run_id')} from lot {run.get('lot_name', 'N/A')}. " f"Equipment: {equipment_name}. Project: {project_name}." ), "license": {"@id": DEFAULT_LICENSE}, diff --git a/api/routers/upload.py b/api/routers/upload.py index 1fdfb14..6643ed9 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -284,13 +284,20 @@ def _load_upload_record(upload_id: str, user: PlatformUser) -> dict[str, Any]: return record -def _assert_project_visible(project_id: str, user: PlatformUser) -> None: +def _assert_project_visible(project_id: str, user: PlatformUser) -> dict[str, Any]: + """Return the visible project record, or raise 404 if the caller can't see + it (private project membership is never disclosed).""" visible_projects = get_projects_list_pg( nanohub_user_id=user.id, is_admin=user.role == "admin", ) - if not any(project.get("id") == project_id for project in visible_projects): + project = next( + (p for p in visible_projects if p.get("id") == project_id), + None, + ) + if project is None: raise HTTPException(status_code=404, detail="Selected project was not found") + return project def _csv_upload_to_sync_rows( @@ -502,11 +509,15 @@ def add_outputs() -> None: add(output, role="output") # When the equipment declares an explicit ingestion column list we honor it - # as the canonical input schema, but still append any registered outputs - # that aren't already present so results have somewhere to go. + # as the canonical input schema, but still ensure the lot/timestamp keys, + # registered input parameters, and outputs are present so no run metadata is + # silently dropped for configs that listed only a subset of columns. if explicit_columns: for column in explicit_columns: add(column["name"]) + add(lot_col, role="lot") + add(ts_col, role="timestamp") + add_parameters() add_outputs() return columns, value_types @@ -950,10 +961,24 @@ def process_upload( project_id = payload.project_id.strip() if not project_id: raise HTTPException(status_code=400, detail="project_id is required") - _assert_project_visible(project_id, user) + project = _assert_project_visible(project_id, user) upload_record = _load_upload_record(upload_id, user) equipment_id = upload_record["equipment_id"] + + # Guard against linking a run to a project that belongs to a different + # tool. A project with no equipment association is left unconstrained. + project_equipment_id = str(project.get("equipment_id") or "").strip() + if project_equipment_id and project_equipment_id != equipment_id: + raise HTTPException( + status_code=400, + detail=( + f"Selected project is associated with equipment " + f"'{project_equipment_id}', not '{equipment_id}'. Choose a " + "matching project." + ), + ) + _update_upload_record(upload_id=upload_id, status="processing", errors=[]) def _fail(errors: list[str], message: str, row_count: int) -> dict[str, Any]: From 9f072900567e6c8da5b6d17f9807d3c0492c5810 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:15:50 -0500 Subject: [PATCH 04/23] Address Codex review round 3: non-finite cells, fleet-wide metrics - _coerce_optional_float rejects NaN/Infinity so generic ingestion stores the raw token as text instead of emitting invalid JSONB and 500-ing. - _fetch_equipment_run_metrics aggregates on the superuser connection so inventory run counts/last-data are complete fleet-wide rather than being hidden by RLS for private/shared projects. Co-authored-by: Cursor --- api/metadata_pg.py | 74 ++++++++++++++++++++++++------------------- api/routers/upload.py | 8 ++++- 2 files changed, 49 insertions(+), 33 deletions(-) diff --git a/api/metadata_pg.py b/api/metadata_pg.py index 2a06abc..a675da4 100644 --- a/api/metadata_pg.py +++ b/api/metadata_pg.py @@ -1078,47 +1078,57 @@ def _serialize_equipment( } -def _fetch_equipment_run_metrics(conn) -> dict[str, dict[str, Any]]: - """Aggregate run counts and latest-data timestamps per equipment. +def _fetch_equipment_run_metrics(_conn=None) -> dict[str, dict[str, Any]]: + """Aggregate fleet-wide run counts and latest-data timestamps per equipment. Combines the canonical etcher_runs (attributed via the project's equipment) with the generic equipment_runs table (attributed via the run's own equipment_id) so manually uploaded data for any registered equipment is reflected in the inventory metrics. + + Runs on the superuser connection so these non-sensitive aggregate counts are + complete (fleet-wide) regardless of the caller's project membership. The + RLS-scoped connection passed by callers would otherwise hide runs in + private/shared projects, even from members and admins, and undercount. """ - with conn.cursor(cursor_factory=RealDictCursor) as cur: - cur.execute( - """ - SELECT - equipment_id, - SUM(total_runs)::int AS total_runs, - MAX(last_data_at) AS last_data_at - FROM ( + conn = get_pg_superuser_connection() + try: + with conn.cursor(cursor_factory=RealDictCursor) as cur: + cur.execute( + """ SELECT - p.equipment_id AS equipment_id, - COUNT(r.idruns) AS total_runs, - MAX(COALESCE(r.run_date, r.created_at)) AS last_data_at - FROM projects p - JOIN etcher_runs r ON r.project_id = p.id - WHERE p.equipment_id IS NOT NULL - AND p.equipment_id <> '' - GROUP BY p.equipment_id + equipment_id, + SUM(total_runs)::int AS total_runs, + MAX(last_data_at) AS last_data_at + FROM ( + SELECT + p.equipment_id AS equipment_id, + COUNT(r.idruns) AS total_runs, + MAX(COALESCE(r.run_date, r.created_at)) AS last_data_at + FROM projects p + JOIN etcher_runs r ON r.project_id = p.id + WHERE p.equipment_id IS NOT NULL + AND p.equipment_id <> '' + GROUP BY p.equipment_id - UNION ALL + UNION ALL - SELECT - er.equipment_id AS equipment_id, - COUNT(er.id) AS total_runs, - MAX(COALESCE(er.run_date, er.created_at)) AS last_data_at - FROM equipment_runs er - WHERE er.equipment_id IS NOT NULL - AND er.equipment_id <> '' - GROUP BY er.equipment_id - ) combined - GROUP BY equipment_id - """ - ) - rows = cur.fetchall() + SELECT + er.equipment_id AS equipment_id, + COUNT(er.id) AS total_runs, + MAX(COALESCE(er.run_date, er.created_at)) AS last_data_at + FROM equipment_runs er + WHERE er.equipment_id IS NOT NULL + AND er.equipment_id <> '' + GROUP BY er.equipment_id + ) combined + GROUP BY equipment_id + """ + ) + rows = cur.fetchall() + conn.commit() + finally: + conn.close() return {row["equipment_id"]: dict(row) for row in rows} diff --git a/api/routers/upload.py b/api/routers/upload.py index 6643ed9..f455ad8 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -5,6 +5,7 @@ import io import hashlib import logging +import math import os from datetime import datetime from pathlib import Path @@ -110,9 +111,14 @@ def _coerce_optional_float(value: Any) -> float | None: if text == "" or text.lower() == "none": return None try: - return float(text) + parsed = float(text) except (TypeError, ValueError): return None + # Reject NaN/Infinity: they parse fine in Python but are invalid JSON + # tokens for PostgreSQL JSONB and would break ingestion. + if not math.isfinite(parsed): + return None + return parsed def _extract_expected_bounds( From 91b509258791a6a0dda7a1efb5b840b49f6aec08 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:20:08 -0500 Subject: [PATCH 05/23] Address Codex review round 4: export scope, timestamp validation - Scope the catalog Download button explicitly to the etcher ML export (label + tooltip + filename) since that endpoint is the model-specific training matrix; generic runs remain browsable in the catalog. - Validate generic run timestamps and required lot/timestamp cells during processing so malformed dates surface as actionable validation errors instead of a database-sync 500; store normalized ISO timestamps. Co-authored-by: Cursor --- api/routers/upload.py | 63 +++++++++++++++++++++++++++++++++-- web/app/data/catalog/page.tsx | 5 +-- 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/api/routers/upload.py b/api/routers/upload.py index f455ad8..a5862ef 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -880,6 +880,43 @@ def _coerce_cell(value: Any) -> Any: return parsed if parsed is not None else text +_TIMESTAMP_FORMATS = ( + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%d", + "%Y/%m/%d %H:%M:%S", + "%Y/%m/%d", + "%m/%d/%Y %H:%M:%S", + "%m/%d/%Y %H:%M", + "%m/%d/%Y", + "%d-%m-%Y", +) + + +def _normalize_timestamp(text: str) -> tuple[str | None, bool]: + """Parse a timestamp cell into a normalized ISO string. + + Returns ``(iso_string_or_None, is_valid)``. ``is_valid`` is False only when + a non-empty value cannot be parsed, so callers can raise an actionable + validation error instead of letting an invalid literal reach the + ``TIMESTAMP WITH TIME ZONE`` column and fail the whole sync with a 500. + """ + raw = (text or "").strip() + if not raw: + return None, True + candidate = raw.replace("Z", "+00:00") if raw.endswith("Z") else raw + try: + return datetime.fromisoformat(candidate).isoformat(), True + except ValueError: + pass + for fmt in _TIMESTAMP_FORMATS: + try: + return datetime.strptime(raw, fmt).isoformat(), True + except ValueError: + continue + return None, False + + def _csv_upload_to_equipment_rows( *, config: dict[str, Any], @@ -907,6 +944,8 @@ def _csv_upload_to_equipment_rows( input_names, output_names = _config_input_output_names(config) required_columns = derive_expected_columns(config) hint_by_column = _template_hint_by_column(config) + lot_required = lot_col in required_columns + ts_required = ts_col in required_columns errors: list[str] = [] rows: list[dict[str, Any]] = [] @@ -920,10 +959,28 @@ def _csv_upload_to_equipment_rows( ) return [], errors - for raw_row in reader: + for idx, raw_row in enumerate(reader, start=2): # Skip the template's value-type descriptor row if the user left it in. if _is_template_hint_row(raw_row, hint_by_column): continue + + lotname = str(raw_row.get(lot_col) or "").strip() + if lot_required and not lotname: + errors.append(f"Row {idx}: {lot_col} is required") + + ts_raw = str(raw_row.get(ts_col) or "").strip() + run_date, ts_ok = _normalize_timestamp(ts_raw) + if not ts_ok: + errors.append( + f"Row {idx}: {ts_col} '{ts_raw}' is not a recognized date/time" + ) + elif ts_required and not ts_raw: + errors.append(f"Row {idx}: {ts_col} is required") + + if len(errors) > 50: + errors.append("... (more errors truncated)") + break + inputs = { name: _coerce_cell(raw_row.get(name)) for name in input_names @@ -937,8 +994,8 @@ def _csv_upload_to_equipment_rows( rows.append( { "project_id": project_id, - "lotname": str(raw_row.get(lot_col) or "").strip(), - "run_date": str(raw_row.get(ts_col) or "").strip() or None, + "lotname": lotname, + "run_date": run_date, "inputs": inputs, "outputs": outputs, "raw": dict(raw_row), diff --git a/web/app/data/catalog/page.tsx b/web/app/data/catalog/page.tsx index a471839..5e077c5 100644 --- a/web/app/data/catalog/page.tsx +++ b/web/app/data/catalog/page.tsx @@ -62,7 +62,7 @@ export default function DataCatalogPage() { const url = URL.createObjectURL(blob); const a = document.createElement("a"); a.href = url; - a.download = "digital_twin_runs.csv"; + a.download = "etcher_ml_export.csv"; document.body.appendChild(a); a.click(); a.remove(); @@ -104,6 +104,7 @@ export default function DataCatalogPage() { type="button" onClick={handleDownload} disabled={downloading} + title="Exports the etcher ML-ready dataset matrix. Runs uploaded for other equipment are browsable here but are not part of this etcher training export." className="inline-flex items-center justify-center gap-2 whitespace-nowrap text-sm font-medium transition-opacity focus-visible:outline-none focus-visible:ring-1 focus-visible:ring-[hsl(var(--ring))] h-9 px-4 py-2 bg-[hsl(var(--primary))] text-[hsl(var(--primary-foreground))] rounded-md shadow hover:opacity-90 disabled:opacity-60 disabled:cursor-not-allowed" > {downloading ? ( @@ -111,7 +112,7 @@ export default function DataCatalogPage() { ) : ( )} - {downloading ? "Preparing CSV..." : "Download CSV"} + {downloading ? "Preparing CSV..." : "Download Etcher ML CSV"} {downloadError && (

From 5f3a6dca7b64f2e7310d640d2fc56d0a24e50f55 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:26:42 -0500 Subject: [PATCH 06/23] Address Codex review round 5: required schema, typed cells, FAIR config - Required upload columns are now the equipment's declared inputs (explicit columns, else features+parameters); outputs/targets and synthetic placeholder targets (primary_metric/secondary_metric) are never required, so valid CSVs for newly registered equipment are no longer rejected. - Generic ingestion coerces each cell by its registered type, preserving zero-padded string ids, integers, and booleans instead of floating all. - run_to_jsonld / _load_domain_config fall back to the equipment's PG config_json so FAIR metadata for PG-registered equipment carries units, ontology, hardware, and owner info. Co-authored-by: Cursor --- api/fair_archiver.py | 15 +++++ api/routers/upload.py | 150 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 152 insertions(+), 13 deletions(-) diff --git a/api/fair_archiver.py b/api/fair_archiver.py index b563f4e..87413bd 100644 --- a/api/fair_archiver.py +++ b/api/fair_archiver.py @@ -171,6 +171,21 @@ def _load_domain_config(equipment_id: str) -> dict[str, Any] | None: return json.loads(config_path.read_text()) except Exception as exc: logger.debug("Could not load domain config %s: %s", equipment_id, exc) + + # Fallback: dynamically registered equipment have no JSON file on disk; + # their config lives in PostgreSQL. Pull config_json so FAIR metadata still + # carries registered units, ontology, hardware, and owner info. + if equipment_id and equipment_id != "unknown": + try: + from metadata_pg import get_equipment_pg + + equipment = get_equipment_pg(equipment_id) + if equipment: + config = equipment.get("config_json") + if isinstance(config, dict) and config: + return config + except Exception as exc: + logger.debug("Could not load PG config for %s: %s", equipment_id, exc) return None diff --git a/api/routers/upload.py b/api/routers/upload.py index a5862ef..ade5983 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -505,8 +505,10 @@ def add_outputs() -> None: if isinstance(targets, dict): for target in (targets.get("primary"), targets.get("secondary")): if isinstance(target, dict): - add(target.get("id") or target.get("name"), role="target") - else: + target_id = target.get("id") or target.get("name") + if target_id and str(target_id) not in _PLACEHOLDER_TARGET_IDS: + add(target_id, role="target") + elif target: add(target, role="target") for output in config.get("outputs", []): if isinstance(output, dict): @@ -617,7 +619,9 @@ async def upload_file( if isinstance(column, dict) and column.get("name") ] if not expected_columns: - expected_columns = derive_expected_columns( + # Required schema for registered equipment is its declared inputs; + # outputs/targets (incl. synthetic placeholders) stay optional. + expected_columns = _required_upload_columns( equipment.get("config_json", {}) ) else: @@ -805,12 +809,33 @@ def _load_equipment_config(equipment_id: str) -> dict[str, Any]: return legacy if isinstance(legacy, dict) else {} +# Default target ids build_domain_config assigns when an equipment is +# registered without explicit optimization targets. They are placeholders, not +# real measured columns, so they must not be treated as required/expected data. +_PLACEHOLDER_TARGET_IDS = {"primary_metric", "secondary_metric"} + + +def _real_target_ids(config: dict[str, Any]) -> list[str]: + """Return optimization-target ids that correspond to actual measured + columns (excluding the synthetic placeholders).""" + ids: list[str] = [] + targets = config.get("targets", {}) + if isinstance(targets, dict): + for target in (targets.get("primary"), targets.get("secondary")): + if isinstance(target, dict): + name = target.get("id") or target.get("name") + if name and str(name) not in _PLACEHOLDER_TARGET_IDS: + ids.append(str(name)) + return ids + + def _config_input_output_names( config: dict[str, Any], ) -> tuple[list[str], list[str]]: """Split an equipment config's columns into input set-points and measured - outputs. Outputs are authoritative (targets + registered outputs); every - other declared column that isn't the lot/timestamp key is treated as input. + outputs. Outputs are authoritative (real targets + registered outputs); + every other declared column that isn't the lot/timestamp key is treated as + input. """ data_config = config.get("data", {}) if not isinstance(data_config, dict): @@ -829,11 +854,8 @@ def add_out(name: Any) -> None: seen_out.add(text) output_names.append(text) - targets = config.get("targets", {}) - if isinstance(targets, dict): - for target in (targets.get("primary"), targets.get("secondary")): - if isinstance(target, dict): - add_out(target.get("id") or target.get("name")) + for target_id in _real_target_ids(config): + add_out(target_id) for output in config.get("outputs", []): if isinstance(output, dict): add_out(output.get("name") or output.get("id")) @@ -869,6 +891,107 @@ def add_in(name: Any) -> None: return input_names, output_names +def _required_upload_columns(config: dict[str, Any]) -> list[str]: + """Columns a generic upload must contain to be ingestible. + + Built from the equipment's declared inputs (explicit columns, else + features + parameters). Outputs/targets are intentionally optional — results + may be reported later — and synthetic placeholder targets are never + required. + """ + explicit = [ + column["name"] + for column in config.get("columns", []) + if isinstance(column, dict) and column.get("name") + ] + if explicit: + return explicit + + names: list[str] = [] + seen: set[str] = set() + + def add(name: Any) -> None: + if not name: + return + text = str(name) + if text not in seen: + seen.add(text) + names.append(text) + + for feature in config.get("features", []): + if isinstance(feature, dict): + add(feature.get("id") or feature.get("name")) + else: + add(feature) + for parameter in config.get("parameters", []): + if isinstance(parameter, dict): + add(parameter.get("name") or parameter.get("id")) + else: + add(parameter) + return names + + +def _config_type_by_name(config: dict[str, Any]) -> dict[str, str]: + """Map each declared column to its registered value type so ingestion can + preserve identifiers/integers/booleans instead of floating everything.""" + types: dict[str, str] = {} + + def register(name: Any, declared: Any) -> None: + if not name: + return + types.setdefault(str(name), str(declared or "").strip()) + + for column in config.get("columns", []): + if isinstance(column, dict): + register(column.get("name"), column.get("type")) + for parameter in config.get("parameters", []): + if isinstance(parameter, dict): + register(parameter.get("name") or parameter.get("id"), parameter.get("type")) + for output in config.get("outputs", []): + if isinstance(output, dict): + register(output.get("name") or output.get("id"), output.get("type")) + # Features are numeric ranges; targets are numeric metrics. + for feature in config.get("features", []): + if isinstance(feature, dict): + register(feature.get("id") or feature.get("name"), "float") + for target_id in _real_target_ids(config): + types.setdefault(target_id, "float") + return types + + +def _coerce_typed(value: Any, declared_type: str) -> Any: + """Coerce a CSV cell according to its registered type. Falls back to + numeric-where-possible only when the type is unknown, so declared strings + (e.g. zero-padded ids) and integers are preserved faithfully.""" + if value is None: + return None + text = str(value).strip() + if text == "": + return None + + t = (declared_type or "").strip().lower() + if t in {"string", "str", "text", "categorical", "category", "id", "identifier"}: + return text + if t in {"bool", "boolean"}: + low = text.lower() + if low in {"true", "1", "yes", "y", "t"}: + return True + if low in {"false", "0", "no", "n", "f"}: + return False + return text + if t in {"int", "integer", "long"}: + parsed = _coerce_optional_float(text) + if parsed is None: + return text + return int(parsed) if parsed.is_integer() else parsed + if t in {"float", "double", "number", "numeric", "real", "decimal"}: + parsed = _coerce_optional_float(text) + return parsed if parsed is not None else text + + # Unknown/unit-based type: keep the legacy numeric-where-possible behavior. + return _coerce_cell(value) + + def _coerce_cell(value: Any) -> Any: """Numeric where possible, otherwise the trimmed string (or None).""" if value is None: @@ -942,7 +1065,8 @@ def _csv_upload_to_equipment_rows( ts_col = data_config.get("timestamp_column") or "run_date" input_names, output_names = _config_input_output_names(config) - required_columns = derive_expected_columns(config) + required_columns = _required_upload_columns(config) + type_by_name = _config_type_by_name(config) hint_by_column = _template_hint_by_column(config) lot_required = lot_col in required_columns ts_required = ts_col in required_columns @@ -982,12 +1106,12 @@ def _csv_upload_to_equipment_rows( break inputs = { - name: _coerce_cell(raw_row.get(name)) + name: _coerce_typed(raw_row.get(name), type_by_name.get(name, "")) for name in input_names if name in raw_row } outputs = { - name: _coerce_cell(raw_row.get(name)) + name: _coerce_typed(raw_row.get(name), type_by_name.get(name, "")) for name in output_names if name in raw_row } From 046cb7b47f57b388d3c693e2f0bdd5cb4f38e8ad Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:32:45 -0500 Subject: [PATCH 07/23] Address Codex review round 6: stable ids, bool render, ontology, config - equipment_runs gains a stable (upload_id, row_index) identity; reprocessing an upload now upserts in place (preserving each run's id/run_id and catalog URL) and prunes only rows dropped by a shorter re-upload. - _load_domain_config returns the file config only when present so the PG config_json fallback actually runs for database-only equipment. - _get_feature_ontology_map now indexes registered outputs so their units and QUDT metadata appear in FAIR JSON-LD. - Run-detail page renders boolean/string generic measurements explicitly instead of rendering nothing. Co-authored-by: Cursor --- api/data_loader_pg.py | 50 ++++++++++++++++++++++++------ api/fair_archiver.py | 18 ++++++++++- api/scripts/add_equipment_runs.sql | 7 +++++ geddes/k8s/postgres/schema_v2.sql | 6 ++++ web/app/data/catalog/[id]/page.tsx | 8 +++-- 5 files changed, 76 insertions(+), 13 deletions(-) diff --git a/api/data_loader_pg.py b/api/data_loader_pg.py index a7718d2..4730e6e 100644 --- a/api/data_loader_pg.py +++ b/api/data_loader_pg.py @@ -300,12 +300,16 @@ def ensure_equipment_runs_pg() -> None: outputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, raw_payload_json JSONB NOT NULL DEFAULT '{}'::jsonb, upload_id UUID, + row_index INT, upload_filename VARCHAR(255) DEFAULT '', source VARCHAR(100) DEFAULT 'data_upload', created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ) """ ) + cur.execute( + "ALTER TABLE equipment_runs ADD COLUMN IF NOT EXISTS row_index INT" + ) cur.execute( "CREATE INDEX IF NOT EXISTS idx_equipment_runs_equipment_id ON equipment_runs(equipment_id)" ) @@ -315,6 +319,13 @@ def ensure_equipment_runs_pg() -> None: cur.execute( "CREATE INDEX IF NOT EXISTS idx_equipment_runs_upload_id ON equipment_runs(upload_id)" ) + cur.execute( + """ + CREATE UNIQUE INDEX IF NOT EXISTS uq_equipment_runs_upload_row + ON equipment_runs(upload_id, row_index) + WHERE upload_id IS NOT NULL + """ + ) cur.execute("ALTER TABLE equipment_runs ENABLE ROW LEVEL SECURITY") cur.execute("GRANT SELECT ON equipment_runs TO api_client") cur.execute( @@ -429,9 +440,11 @@ def sync_equipment_runs_pg( inputs (dict), outputs (dict), raw (dict), and optional is_outlier / is_calibration_recipe / outlier_type flags. - When ``upload_id`` is provided, any previously ingested rows for that upload - are deleted first so re-processing the same upload is idempotent (it - replaces rather than duplicates). + When ``upload_id`` is provided, rows are upserted on their stable + ``(upload_id, row_index)`` identity so re-processing the same upload is + idempotent and preserves each run's id (and therefore its exposed run_id / + catalog URL). Any rows left over from a previous, longer version of the + upload are pruned afterwards. """ from psycopg2.extras import execute_values @@ -441,7 +454,7 @@ def sync_equipment_runs_pg( conn = get_pg_superuser_connection() try: insert_rows = [] - for row in rows: + for index, row in enumerate(rows): insert_rows.append(( equipment_id, row.get("project_id"), @@ -454,16 +467,12 @@ def sync_equipment_runs_pg( Json(_json_safe_payload(row.get("outputs", {}) or {})), Json(_json_safe_payload(row.get("raw", {}) or {})), upload_id, + index, upload_filename, str(row.get("source", "data_upload") or "data_upload"), )) with conn.cursor() as cur: - if upload_id is not None: - cur.execute( - "DELETE FROM equipment_runs WHERE upload_id = %s", - (upload_id,), - ) execute_values( cur, """ @@ -471,11 +480,32 @@ def sync_equipment_runs_pg( equipment_id, project_id, lotname, run_date, is_outlier, is_calibration_recipe, outlier_type, inputs_json, outputs_json, raw_payload_json, - upload_id, upload_filename, source + upload_id, row_index, upload_filename, source ) VALUES %s + ON CONFLICT (upload_id, row_index) WHERE upload_id IS NOT NULL + DO UPDATE SET + equipment_id = EXCLUDED.equipment_id, + project_id = EXCLUDED.project_id, + lotname = EXCLUDED.lotname, + run_date = EXCLUDED.run_date, + is_outlier = EXCLUDED.is_outlier, + is_calibration_recipe = EXCLUDED.is_calibration_recipe, + outlier_type = EXCLUDED.outlier_type, + inputs_json = EXCLUDED.inputs_json, + outputs_json = EXCLUDED.outputs_json, + raw_payload_json = EXCLUDED.raw_payload_json, + upload_filename = EXCLUDED.upload_filename, + source = EXCLUDED.source """, insert_rows, ) + # Prune rows from a previous, longer version of this upload so a + # shrunk re-upload doesn't leave stale runs behind. + if upload_id is not None: + cur.execute( + "DELETE FROM equipment_runs WHERE upload_id = %s AND row_index >= %s", + (upload_id, len(insert_rows)), + ) conn.commit() return len(insert_rows) finally: diff --git a/api/fair_archiver.py b/api/fair_archiver.py index 87413bd..12c3b9a 100644 --- a/api/fair_archiver.py +++ b/api/fair_archiver.py @@ -159,7 +159,9 @@ def _load_domain_config(equipment_id: str) -> dict[str, Any] | None: """ try: from domain_configs import load_domain_config - return load_domain_config(equipment_id) + config = load_domain_config(equipment_id) + if config: + return config except Exception: pass @@ -227,6 +229,20 @@ def _get_feature_ontology_map(domain_config: dict[str, Any] | None) -> dict[str, entry["qudt_unit"] = target["qudt_unit"] mapping[tid] = entry + # Registered outputs (measured results captured at registration; always output) + for output in domain_config.get("outputs", []): + if not isinstance(output, dict): + continue + oid = output.get("name") or output.get("id") + if not oid: + continue + entry = {"unit": output.get("unit", ""), "prov_direction": "output"} + if output.get("qudt_quantity_kind"): + entry["qudt_quantity_kind"] = output["qudt_quantity_kind"] + if output.get("qudt_unit"): + entry["qudt_unit"] = output["qudt_unit"] + mapping.setdefault(str(oid), entry) + return mapping diff --git a/api/scripts/add_equipment_runs.sql b/api/scripts/add_equipment_runs.sql index 50c5430..b39e5ef 100644 --- a/api/scripts/add_equipment_runs.sql +++ b/api/scripts/add_equipment_runs.sql @@ -16,14 +16,21 @@ CREATE TABLE IF NOT EXISTS equipment_runs ( outputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, raw_payload_json JSONB NOT NULL DEFAULT '{}'::jsonb, upload_id UUID, + row_index INT, upload_filename VARCHAR(255) DEFAULT '', source VARCHAR(100) DEFAULT 'data_upload', created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ); +-- Backfill for tables created before row_index existed. +ALTER TABLE equipment_runs ADD COLUMN IF NOT EXISTS row_index INT; + CREATE INDEX IF NOT EXISTS idx_equipment_runs_equipment_id ON equipment_runs(equipment_id); CREATE INDEX IF NOT EXISTS idx_equipment_runs_project_id ON equipment_runs(project_id); CREATE INDEX IF NOT EXISTS idx_equipment_runs_upload_id ON equipment_runs(upload_id); +CREATE UNIQUE INDEX IF NOT EXISTS uq_equipment_runs_upload_row + ON equipment_runs(upload_id, row_index) + WHERE upload_id IS NOT NULL; ALTER TABLE equipment_runs ENABLE ROW LEVEL SECURITY; diff --git a/geddes/k8s/postgres/schema_v2.sql b/geddes/k8s/postgres/schema_v2.sql index 621a21a..3763d02 100644 --- a/geddes/k8s/postgres/schema_v2.sql +++ b/geddes/k8s/postgres/schema_v2.sql @@ -107,6 +107,7 @@ CREATE TABLE IF NOT EXISTS equipment_runs ( outputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, raw_payload_json JSONB NOT NULL DEFAULT '{}'::jsonb, upload_id UUID, + row_index INT, upload_filename VARCHAR(255) DEFAULT '', source VARCHAR(100) DEFAULT 'data_upload', created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP @@ -115,6 +116,11 @@ CREATE TABLE IF NOT EXISTS equipment_runs ( CREATE INDEX IF NOT EXISTS idx_equipment_runs_equipment_id ON equipment_runs(equipment_id); CREATE INDEX IF NOT EXISTS idx_equipment_runs_project_id ON equipment_runs(project_id); CREATE INDEX IF NOT EXISTS idx_equipment_runs_upload_id ON equipment_runs(upload_id); +-- Stable per-upload row identity so reprocessing an upload upserts in place +-- (preserving each run's id / exposed run_id) instead of reallocating ids. +CREATE UNIQUE INDEX IF NOT EXISTS uq_equipment_runs_upload_row + ON equipment_runs(upload_id, row_index) + WHERE upload_id IS NOT NULL; ALTER TABLE equipment_runs ENABLE ROW LEVEL SECURITY; GRANT SELECT ON equipment_runs TO api_client; diff --git a/web/app/data/catalog/[id]/page.tsx b/web/app/data/catalog/[id]/page.tsx index 734ad60..95bdb31 100644 --- a/web/app/data/catalog/[id]/page.tsx +++ b/web/app/data/catalog/[id]/page.tsx @@ -239,7 +239,11 @@ export default function RunDetailPage({ params }: { params: Promise<{ id: string

{key}

- {typeof value === "number" ? formatNumber(value, 2) : (value ?? "—")} + {typeof value === "number" + ? formatNumber(value, 2) + : value === null || value === undefined + ? "—" + : String(value)}

))} @@ -311,7 +315,7 @@ export default function RunDetailPage({ params }: { params: Promise<{ id: string
- + ); From 970451eeaf4efe1d8ee6b2224e493d3bd44a316a Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:37:20 -0500 Subject: [PATCH 08/23] Address Codex review round 7: FAIR fallback + hint-row detection - run_to_jsonld selects the etcher target fallback by equipment_id, so a non-etcher run with no outputs no longer emits null AvgEtchRate/RangeEtchRate properties and is correctly labeled. - Template descriptor-row detection now requires the complete hint row and is only applied to the first data row, so a sparse data row that happens to equal a type token is never silently dropped. Co-authored-by: Cursor --- api/fair_archiver.py | 9 +++++---- api/routers/upload.py | 46 ++++++++++++++++++++++++++----------------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/api/fair_archiver.py b/api/fair_archiver.py index 12c3b9a..a051275 100644 --- a/api/fair_archiver.py +++ b/api/fair_archiver.py @@ -830,12 +830,13 @@ def run_to_jsonld(run: dict[str, Any]) -> dict[str, Any]: prop["prov:qualifiedGeneration"] = "prov:generated" variables_measured.append(prop) - # Build ontology-enriched result list. Generic equipment report their - # measurements under ``outputs``; the canonical etcher uses typed columns. + # Build ontology-enriched result list. The canonical etcher uses typed + # columns; every other equipment reports measurements under ``outputs`` + # (which may legitimately be empty for an inputs-only upload). generic_outputs = run.get("outputs") or {} results = [] - if isinstance(generic_outputs, dict) and generic_outputs: - for key, value in generic_outputs.items(): + if equipment_id != "etcher": + for key, value in (generic_outputs if isinstance(generic_outputs, dict) else {}).items(): onto = ontology_map.get(key, {}) result_prop = { "@type": "PropertyValue", diff --git a/api/routers/upload.py b/api/routers/upload.py index ade5983..6dbe4db 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -331,10 +331,15 @@ def _csv_upload_to_sync_rows( if errors: return [], errors + first_data_row = True for idx, raw_row in enumerate(reader, start=2): - # Skip the template's value-type descriptor row if the user left it in. - if _is_template_hint_row(raw_row, hint_by_column): - continue + # Skip the template's value-type descriptor row only when it is the + # very first data row, so a later row can't be dropped for resembling + # the hint. + if first_data_row: + first_data_row = False + if _is_template_hint_row(raw_row, hint_by_column): + continue row_errors: list[str] = [] row_number = idx - 1 id_value = _upload_value(raw_row, "idruns") @@ -564,20 +569,20 @@ def _is_template_hint_row( hint_by_column: dict[str, str], ) -> bool: """Detect the descriptor row a user may leave in place when filling out the - downloaded template (e.g. ``string,datetime,float,sccm``). Returns True only - when every populated known column exactly equals its template hint, so real - measurement rows (which carry numbers) are never mistaken for hints.""" - if not hint_by_column: + downloaded template (e.g. ``string,datetime,float,sccm``). + + Requires the *complete* hint row: every column that carries a template hint + must be present and exactly equal that hint. This is intentionally strict so + that a sparse, partially-matching data row (e.g. a single ``string`` value in + an otherwise empty row) is never mistaken for the descriptor and silently + dropped. Callers further restrict this check to the first data row.""" + hints = {col: str(hint).strip() for col, hint in hint_by_column.items() if str(hint).strip()} + if not hints: return False - matched = 0 - for column, hint in hint_by_column.items(): - text = str(raw_row.get(column) or "").strip() - if text == "": - continue - if text != str(hint).strip(): + for column, hint in hints.items(): + if str(raw_row.get(column) or "").strip() != hint: return False - matched += 1 - return matched > 0 + return True @router.post("/upload") @@ -1083,10 +1088,15 @@ def _csv_upload_to_equipment_rows( ) return [], errors + first_data_row = True for idx, raw_row in enumerate(reader, start=2): - # Skip the template's value-type descriptor row if the user left it in. - if _is_template_hint_row(raw_row, hint_by_column): - continue + # Skip the template's value-type descriptor row only when it is the + # very first data row, so a later row can't be dropped for resembling + # the hint. + if first_data_row: + first_data_row = False + if _is_template_hint_row(raw_row, hint_by_column): + continue lotname = str(raw_row.get(lot_col) or "").strip() if lot_required and not lotname: From 7b25522e60be0cd22dac1bfe733c055fb044a5fa Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:43:23 -0500 Subject: [PATCH 09/23] Address Codex review round 8: consistent required-column derivation _required_upload_columns now always requires the lot and timestamp metadata columns plus declared inputs (features + parameters), and excludes outputs even when a legacy config lists them as explicit columns. This stops anonymous/ undated catalog runs for new equipment and stops wrongly requiring output columns (or omitting appended parameters) for explicit-column configs. Co-authored-by: Cursor --- api/routers/upload.py | 46 +++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/api/routers/upload.py b/api/routers/upload.py index 6dbe4db..981a9db 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -899,18 +899,21 @@ def add_in(name: Any) -> None: def _required_upload_columns(config: dict[str, Any]) -> list[str]: """Columns a generic upload must contain to be ingestible. - Built from the equipment's declared inputs (explicit columns, else - features + parameters). Outputs/targets are intentionally optional — results - may be reported later — and synthetic placeholder targets are never - required. + Always requires the lot and timestamp metadata columns (so catalog runs are + never anonymous/undated) plus the equipment's declared *inputs* (features + + parameters). Outputs/targets are intentionally optional — results may be + reported later — so they are explicitly excluded even when a legacy config + lists them among explicit ``columns``. Synthetic placeholder targets are + likewise never required. """ - explicit = [ - column["name"] - for column in config.get("columns", []) - if isinstance(column, dict) and column.get("name") - ] - if explicit: - return explicit + data_config = config.get("data", {}) + if not isinstance(data_config, dict): + data_config = {} + lot_col = data_config.get("lotname_column") or "LOTNAME" + ts_col = data_config.get("timestamp_column") or "run_date" + + input_names, output_names = _config_input_output_names(config) + output_set = set(output_names) names: list[str] = [] seen: set[str] = set() @@ -919,20 +922,15 @@ def add(name: Any) -> None: if not name: return text = str(name) - if text not in seen: - seen.add(text) - names.append(text) + if text in output_set or text in seen: + return + seen.add(text) + names.append(text) - for feature in config.get("features", []): - if isinstance(feature, dict): - add(feature.get("id") or feature.get("name")) - else: - add(feature) - for parameter in config.get("parameters", []): - if isinstance(parameter, dict): - add(parameter.get("name") or parameter.get("id")) - else: - add(parameter) + add(lot_col) + add(ts_col) + for name in input_names: + add(name) return names From 45e47a733049efe0f7a9929758aceb7c1219c790 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:50:11 -0500 Subject: [PATCH 10/23] Address Codex review round 9: optional-output upload robustness - upload_file validates registered equipment against _required_upload_columns always (even with explicit columns), so an inputs-only CSV is processable. - Generic ingestion reports an error for non-empty cells that cannot match their declared numeric/boolean type instead of silently storing bad data. - Template descriptor-row detection only compares hint columns present in the uploaded row, so a trimmed inputs-only template is still recognized. - Range warnings recorded at validation time are carried through processing (errors_json) and returned, keeping the processed-upload warning UI working. Co-authored-by: Cursor --- api/routers/upload.py | 82 +++++++++++++++++++++++++++++++++---------- 1 file changed, 63 insertions(+), 19 deletions(-) diff --git a/api/routers/upload.py b/api/routers/upload.py index 981a9db..1c695b4 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -259,7 +259,7 @@ def _load_upload_record(upload_id: str, user: PlatformUser) -> dict[str, Any]: with conn.cursor() as cur: cur.execute( """ - SELECT id, equipment_id, filename, storage_path, status, owner_id, owner_org + SELECT id, equipment_id, filename, storage_path, status, owner_id, owner_org, errors_json FROM data_uploads WHERE id = %s """, @@ -273,6 +273,9 @@ def _load_upload_record(upload_id: str, user: PlatformUser) -> dict[str, Any]: if not row: raise HTTPException(status_code=404, detail="Upload not found") + stored_issues = row[7] + if not isinstance(stored_issues, list): + stored_issues = [] record = { "id": str(row[0]), "equipment_id": row[1], @@ -281,6 +284,7 @@ def _load_upload_record(upload_id: str, user: PlatformUser) -> dict[str, Any]: "status": row[4], "owner_id": row[5], "owner_org": row[6], + "errors": stored_issues, } if user.role != "admin": same_owner = record["owner_id"] == user.id @@ -571,12 +575,17 @@ def _is_template_hint_row( """Detect the descriptor row a user may leave in place when filling out the downloaded template (e.g. ``string,datetime,float,sccm``). - Requires the *complete* hint row: every column that carries a template hint - must be present and exactly equal that hint. This is intentionally strict so - that a sparse, partially-matching data row (e.g. a single ``string`` value in - an otherwise empty row) is never mistaken for the descriptor and silently + Every hint-bearing column that is actually present in the uploaded row must + exactly equal its template hint (columns the user removed from the template + are ignored, so a trimmed inputs-only template is still recognized). This is + intentionally strict over the present columns so that a sparse, + partially-matching data row is never mistaken for the descriptor and silently dropped. Callers further restrict this check to the first data row.""" - hints = {col: str(hint).strip() for col, hint in hint_by_column.items() if str(hint).strip()} + hints = { + col: str(hint).strip() + for col, hint in hint_by_column.items() + if str(hint).strip() and col in raw_row + } if not hints: return False for column, hint in hints.items(): @@ -618,17 +627,13 @@ async def upload_file( status_code=403, detail="Not authorized to upload data for this equipment", ) - expected_columns = [ - column["name"] - for column in equipment.get("columns", []) - if isinstance(column, dict) and column.get("name") - ] - if not expected_columns: - # Required schema for registered equipment is its declared inputs; - # outputs/targets (incl. synthetic placeholders) stay optional. - expected_columns = _required_upload_columns( - equipment.get("config_json", {}) - ) + # Required schema for registered equipment is its lot/timestamp metadata + # plus declared inputs. Outputs/targets (incl. synthetic placeholders and + # any listed among explicit columns) stay optional so an inputs-only CSV + # is still ingestible. + expected_columns = _required_upload_columns( + equipment.get("config_json", {}) + ) else: legacy_config = load_domain_config(equipment_id) if legacy_config is None: @@ -995,6 +1000,25 @@ def _coerce_typed(value: Any, declared_type: str) -> Any: return _coerce_cell(value) +def _declared_type_violation(value: Any, declared_type: str) -> bool: + """Return True when a non-empty cell cannot be represented as its declared + numeric type. String/categorical/unknown types accept anything; booleans and + numbers must parse, otherwise the value would be stored as an invalid typed + cell.""" + if value is None: + return False + text = str(value).strip() + if text == "": + return False + + t = (declared_type or "").strip().lower() + if t in {"int", "integer", "long", "float", "double", "number", "numeric", "real", "decimal"}: + return _coerce_optional_float(text) is None + if t in {"bool", "boolean"}: + return text.lower() not in {"true", "1", "yes", "y", "t", "false", "0", "no", "n", "f"} + return False + + def _coerce_cell(value: Any) -> Any: """Numeric where possible, otherwise the trimmed string (or None).""" if value is None: @@ -1109,6 +1133,16 @@ def _csv_upload_to_equipment_rows( elif ts_required and not ts_raw: errors.append(f"Row {idx}: {ts_col} is required") + for name in (*input_names, *output_names): + if name not in raw_row: + continue + declared = type_by_name.get(name, "") + if _declared_type_violation(raw_row.get(name), declared): + errors.append( + f"Row {idx}: {name} '{str(raw_row.get(name)).strip()}' " + f"is not a valid {declared}" + ) + if len(errors) > 50: errors.append("... (more errors truncated)") break @@ -1161,6 +1195,16 @@ def process_upload( upload_record = _load_upload_record(upload_id, user) equipment_id = upload_record["equipment_id"] + # Non-blocking range warnings recorded at validation time should survive + # processing so the catalog/upload UI can keep surfacing them. A "success" + # upload's stored issues are warnings only (hard errors block the success + # state), so they are safe to carry forward. + prior_warnings = ( + list(upload_record.get("errors") or []) + if upload_record.get("status") == "success" + else [] + ) + # Guard against linking a run to a project that belongs to a different # tool. A project with no equipment association is left unconstrained. project_equipment_id = str(project.get("equipment_id") or "").strip() @@ -1241,7 +1285,7 @@ def _fail(errors: list[str], message: str, row_count: int) -> dict[str, Any]: _update_upload_record( upload_id=upload_id, status="processed", - errors=[], + errors=prior_warnings[:50], row_count=len(rows), ) return { @@ -1251,7 +1295,7 @@ def _fail(errors: list[str], message: str, row_count: int) -> dict[str, Any]: "project_id": project_id, "inserted": inserted, "row_count": len(rows), - "errors": [], + "errors": prior_warnings[:50] if prior_warnings else None, "message": f"Processed {inserted} row(s) into the data catalog.", } From d0bd9af6b3206ab27ab88862668a20f1468a7efe Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:54:24 -0500 Subject: [PATCH 11/23] Address Codex review round 10: etcher schema + process response contract - upload_file validates canonical-etcher CSVs against the full alias-aware REQUIRED_SYNC_COLUMNS (including outputs) so an upload accepted at validation time can actually be processed by _csv_upload_to_sync_rows. - process_upload always returns errors as an array (empty when warning-free), restoring the API contract the upload page relies on (result.errors.length). Co-authored-by: Cursor --- api/routers/upload.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/api/routers/upload.py b/api/routers/upload.py index 1c695b4..05a2f71 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -688,7 +688,27 @@ async def upload_file( header = next(reader, None) if header: columns = [c.strip() for c in header] - if expected_columns: + if equipment_id == "etcher": + # The canonical etcher processor (_csv_upload_to_sync_rows) + # requires the full alias-aware sync schema, including the + # AvgEtchRate/RangeEtchRate outputs. Validate against the same + # contract so an upload accepted here can actually be + # processed. + header_set = set(columns) + missing_columns = [ + canonical + for canonical in REQUIRED_SYNC_COLUMNS + if not any( + alias in header_set + for alias in UPLOAD_TO_SYNC_ALIASES[canonical] + ) + ] + if missing_columns: + errors.append( + "Missing required columns: " + + ", ".join(sorted(missing_columns)) + ) + elif expected_columns: missing_columns = [ column for column in expected_columns if column not in columns ] @@ -1295,7 +1315,7 @@ def _fail(errors: list[str], message: str, row_count: int) -> dict[str, Any]: "project_id": project_id, "inserted": inserted, "row_count": len(rows), - "errors": prior_warnings[:50] if prior_warnings else None, + "errors": prior_warnings[:50], "message": f"Processed {inserted} row(s) into the data catalog.", } From f6a95470f623a9fbf348b1f3ac779707833c177e Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 14:58:13 -0500 Subject: [PATCH 12/23] Address Codex review round 11: reject fractional integer-column values _declared_type_violation now flags non-integral values (e.g. 1.5) for columns declared int/integer/long, so fractional data can't be stored in fields the equipment schema marks as integers. Co-authored-by: Cursor --- api/routers/upload.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/api/routers/upload.py b/api/routers/upload.py index 05a2f71..af0a1bc 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -1032,7 +1032,12 @@ def _declared_type_violation(value: Any, declared_type: str) -> bool: return False t = (declared_type or "").strip().lower() - if t in {"int", "integer", "long", "float", "double", "number", "numeric", "real", "decimal"}: + if t in {"int", "integer", "long"}: + parsed = _coerce_optional_float(text) + # Integer columns must be whole numbers: reject unparseable values and + # fractional ones (e.g. 1.5) so the declared schema isn't violated. + return parsed is None or not parsed.is_integer() + if t in {"float", "double", "number", "numeric", "real", "decimal"}: return _coerce_optional_float(text) is None if t in {"bool", "boolean"}: return text.lower() not in {"true", "1", "yes", "y", "t", "false", "0", "no", "n", "f"} From 1a14bbd2fc8b5c8a7c73a6e95d30c4e3f1f9f10c Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:03:23 -0500 Subject: [PATCH 13/23] Address Codex review round 12: declared template types + etcher identity - Template value-type hints now honor declared parameter/output types (string/boolean/int), so a non-float field is no longer advertised as float. - etcher_runs always report equipment_id/name as the canonical etcher (even when the project has no equipment association), so FAIR JSON-LD keeps etch-rate results and etcher labeling instead of degrading to a generic run. Co-authored-by: Cursor --- api/data_loader_pg.py | 8 ++++++-- api/routers/upload.py | 19 +++++++++++++++---- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/api/data_loader_pg.py b/api/data_loader_pg.py index 4730e6e..838bbdc 100644 --- a/api/data_loader_pg.py +++ b/api/data_loader_pg.py @@ -107,8 +107,12 @@ def _run_base_select(*, include_raw_payload: bool = False) -> str: r.project_id, r.execution_request_id, p.name AS project_name, - p.equipment_id AS equipment_id, - p.equipment_name AS equipment_name, + -- Every etcher_runs row is the canonical etcher regardless of whether + -- its project carries an equipment association, so fall back to the + -- canonical id/name. This keeps FAIR metadata (which keys etcher results + -- and labeling off equipment_id) correct for unassociated projects. + COALESCE(NULLIF(p.equipment_id, ''), 'etcher') AS equipment_id, + COALESCE(NULLIF(p.equipment_name, ''), 'Etcher') AS equipment_name, p.pi_name AS pi_name, p.access_mode AS project_access {raw_payload_select} diff --git a/api/routers/upload.py b/api/routers/upload.py index af0a1bc..3b860a5 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -438,8 +438,9 @@ def _template_columns_and_value_types( if unit: unit_by_name.setdefault(column_name, unit) - # Registered input parameters and output measurements also carry units that - # should drive the template's value-type hints. + # Registered input parameters and output measurements also carry units and + # declared types that should drive the template's value-type hints (so a + # string/bool/int field isn't advertised as float). for collection in (config.get("parameters", []), config.get("outputs", [])): for item in collection: if not isinstance(item, dict): @@ -447,11 +448,15 @@ def _template_columns_and_value_types( name = item.get("name") or item.get("id") if not name: continue + column_name = str(name) unit = str(item.get("unit") or "").strip() if not unit: unit = _qudt_unit_label(item.get("qudt_unit")) if unit: - unit_by_name.setdefault(str(name), unit) + unit_by_name.setdefault(column_name, unit) + declared = _normalize_column_type(item.get("type")) + if declared: + type_by_name.setdefault(column_name, declared) targets = config.get("targets", {}) if isinstance(targets, dict): @@ -484,9 +489,15 @@ def infer_value_type(name: str, *, role: str = "") -> str: if unit: return unit + # A declared type (string/boolean/int/float) is authoritative over the + # role's float default, so non-float fields advertise the correct hint. + declared = type_by_name.get(name, "") + if declared: + return declared + if role in {"feature", "target", "output", "parameter"}: return "float" - return type_by_name.get(name, "") or "string" + return "string" def add(name: Any, *, role: str = "") -> None: if not name: From 26d8b89f45bb80473269f7e9b1a1631a2e1a5108 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:08:30 -0500 Subject: [PATCH 14/23] Address Codex review round 13: canonical etcher identity + template/output fixes - etcher_runs are unconditionally identified/aggregated as the canonical etcher (data_loader_pg base select and metadata_pg metrics), so a project associated with another tool (or none) can't misattribute or drop canonical runs. - Template hints prefer a non-float declared type over a unit, so boolean/int/ string fields advertise their real type while numeric fields keep unit hints. - Generic ingestion omits None-valued input/output cells, so inputs-only or partially-filled uploads don't store misleading {"result": null} measurements. Co-authored-by: Cursor --- api/data_loader_pg.py | 13 +++++++------ api/metadata_pg.py | 12 ++++++------ api/routers/upload.py | 39 +++++++++++++++++++++++++-------------- 3 files changed, 38 insertions(+), 26 deletions(-) diff --git a/api/data_loader_pg.py b/api/data_loader_pg.py index 838bbdc..eed538d 100644 --- a/api/data_loader_pg.py +++ b/api/data_loader_pg.py @@ -107,12 +107,13 @@ def _run_base_select(*, include_raw_payload: bool = False) -> str: r.project_id, r.execution_request_id, p.name AS project_name, - -- Every etcher_runs row is the canonical etcher regardless of whether - -- its project carries an equipment association, so fall back to the - -- canonical id/name. This keeps FAIR metadata (which keys etcher results - -- and labeling off equipment_id) correct for unassociated projects. - COALESCE(NULLIF(p.equipment_id, ''), 'etcher') AS equipment_id, - COALESCE(NULLIF(p.equipment_name, ''), 'Etcher') AS equipment_name, + -- Every etcher_runs row is the canonical etcher regardless of the + -- project's equipment association (which may be empty or, mistakenly, + -- another tool). Hard-code the canonical identity so these rows are never + -- rendered/exported as generic runs and FAIR metadata keeps etch-rate + -- results and etcher labeling. + 'etcher' AS equipment_id, + 'Etcher' AS equipment_name, p.pi_name AS pi_name, p.access_mode AS project_access {raw_payload_select} diff --git a/api/metadata_pg.py b/api/metadata_pg.py index a675da4..f3dda8c 100644 --- a/api/metadata_pg.py +++ b/api/metadata_pg.py @@ -1101,15 +1101,15 @@ def _fetch_equipment_run_metrics(_conn=None) -> dict[str, dict[str, Any]]: SUM(total_runs)::int AS total_runs, MAX(last_data_at) AS last_data_at FROM ( + -- etcher_runs is canonical etcher data: attribute every row + -- to the constant 'etcher' id regardless of the project's + -- equipment association (which may be empty or another tool), + -- so canonical runs are never dropped or miscredited. SELECT - p.equipment_id AS equipment_id, + 'etcher'::text AS equipment_id, COUNT(r.idruns) AS total_runs, MAX(COALESCE(r.run_date, r.created_at)) AS last_data_at - FROM projects p - JOIN etcher_runs r ON r.project_id = p.id - WHERE p.equipment_id IS NOT NULL - AND p.equipment_id <> '' - GROUP BY p.equipment_id + FROM etcher_runs r UNION ALL diff --git a/api/routers/upload.py b/api/routers/upload.py index 3b860a5..e82a194 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -485,13 +485,16 @@ def infer_value_type(name: str, *, role: str = "") -> str: if name == ts_col or role == "timestamp": return "datetime" + declared = type_by_name.get(name, "") unit = unit_by_name.get(name, "") + + # A non-float declared type (string/boolean/int/datetime) is authoritative + # even when a unit is present, so such fields advertise the correct hint + # instead of their unit. Numeric fields keep the more specific unit hint. + if declared and declared != "float": + return declared if unit: return unit - - # A declared type (string/boolean/int/float) is authoritative over the - # role's float default, so non-float fields advertise the correct hint. - declared = type_by_name.get(name, "") if declared: return declared @@ -1183,16 +1186,24 @@ def _csv_upload_to_equipment_rows( errors.append("... (more errors truncated)") break - inputs = { - name: _coerce_typed(raw_row.get(name), type_by_name.get(name, "")) - for name in input_names - if name in raw_row - } - outputs = { - name: _coerce_typed(raw_row.get(name), type_by_name.get(name, "")) - for name in output_names - if name in raw_row - } + # Blank cells coerce to None; omit them so an inputs-only upload (or a + # row that leaves optional outputs empty) doesn't store misleading + # {"result": null} entries that would surface as measured results in + # the catalog and FAIR metadata. + inputs = {} + for name in input_names: + if name not in raw_row: + continue + coerced = _coerce_typed(raw_row.get(name), type_by_name.get(name, "")) + if coerced is not None: + inputs[name] = coerced + outputs = {} + for name in output_names: + if name not in raw_row: + continue + coerced = _coerce_typed(raw_row.get(name), type_by_name.get(name, "")) + if coerced is not None: + outputs[name] = coerced rows.append( { "project_id": project_id, From cbe3b2e855c6b3cd6d6f42571191c0be07101ef4 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:16:09 -0500 Subject: [PATCH 15/23] Address Codex review round 14: include generic runs in dashboard stats get_summary_stats_pg now aggregates etcher_runs together with equipment_runs for total/clean/outlier counts and the date range, so processing a non-etcher upload updates the dashboard's run statistics instead of leaving them unchanged. Co-authored-by: Cursor --- api/data_loader_pg.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/api/data_loader_pg.py b/api/data_loader_pg.py index eed538d..5c3a8b4 100644 --- a/api/data_loader_pg.py +++ b/api/data_loader_pg.py @@ -1572,14 +1572,26 @@ def get_summary_stats_pg( conn = get_pg_superuser_connection() if is_admin else get_pg_connection(nanohub_user_id) with conn.cursor(cursor_factory=RealDictCursor) as cur: + # Aggregate canonical etcher_runs together with generic equipment_runs + # so manually uploaded data for any registered equipment is reflected + # in the dashboard totals, clean/outlier counts, and date range. cur.execute(""" SELECT (SELECT count(*) FROM projects) AS total_projects, - (SELECT count(*) FROM etcher_runs) AS total_runs, - (SELECT count(*) FROM etcher_runs WHERE is_outlier = false AND is_calibration_recipe = false) AS clean_runs, - (SELECT count(*) FROM etcher_runs WHERE is_outlier = true) AS outlier_runs, - (SELECT min(run_date) FROM etcher_runs) AS date_start, - (SELECT max(run_date) FROM etcher_runs) AS date_end + (SELECT count(*) FROM etcher_runs) + + (SELECT count(*) FROM equipment_runs) AS total_runs, + (SELECT count(*) FROM etcher_runs WHERE is_outlier = false AND is_calibration_recipe = false) + + (SELECT count(*) FROM equipment_runs WHERE is_outlier = false AND is_calibration_recipe = false) AS clean_runs, + (SELECT count(*) FROM etcher_runs WHERE is_outlier = true) + + (SELECT count(*) FROM equipment_runs WHERE is_outlier = true) AS outlier_runs, + LEAST( + (SELECT min(run_date) FROM etcher_runs), + (SELECT min(run_date) FROM equipment_runs) + ) AS date_start, + GREATEST( + (SELECT max(run_date) FROM etcher_runs), + (SELECT max(run_date) FROM equipment_runs) + ) AS date_end """) row = cur.fetchone() From 82475ab5301260f7bc22962618d57f222b245c01 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:27:02 -0500 Subject: [PATCH 16/23] Address Codex review round 15: include parameters in FAIR ontology map _get_feature_ontology_map now indexes registered input parameters (unit, QUDT, prov_direction: input), so generic equipment that declare inputs under parameters rather than features keep their units and provenance in FAIR JSON-LD. Co-authored-by: Cursor --- api/fair_archiver.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/api/fair_archiver.py b/api/fair_archiver.py index a051275..456a2a0 100644 --- a/api/fair_archiver.py +++ b/api/fair_archiver.py @@ -229,6 +229,22 @@ def _get_feature_ontology_map(domain_config: dict[str, Any] | None) -> dict[str, entry["qudt_unit"] = target["qudt_unit"] mapping[tid] = entry + # Registered input parameters (set-points captured at registration; always + # input). Generic equipment often declare their inputs here rather than under + # ``features``, so without this their FAIR metadata would lose units/provenance. + for parameter in domain_config.get("parameters", []): + if not isinstance(parameter, dict): + continue + pid = parameter.get("name") or parameter.get("id") + if not pid: + continue + entry = {"unit": parameter.get("unit", ""), "prov_direction": "input"} + if parameter.get("qudt_quantity_kind"): + entry["qudt_quantity_kind"] = parameter["qudt_quantity_kind"] + if parameter.get("qudt_unit"): + entry["qudt_unit"] = parameter["qudt_unit"] + mapping.setdefault(str(pid), entry) + # Registered outputs (measured results captured at registration; always output) for output in domain_config.get("outputs", []): if not isinstance(output, dict): From 4a6975b9020e510548a47ab536ca63ab4dfb462b Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:31:29 -0500 Subject: [PATCH 17/23] Address Codex review round 16: preserve registered hardware in FAIR metadata PG-registered equipment store manufacturer/model/serial_number/location at the config top level, but run_to_jsonld reads them from config["domain"]. _load_domain_config now normalizes the PG config so these hardware/identity fields are backfilled into domain, keeping them in generic-run FAIR JSON-LD. Co-authored-by: Cursor --- api/fair_archiver.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/api/fair_archiver.py b/api/fair_archiver.py index 456a2a0..da8b2f6 100644 --- a/api/fair_archiver.py +++ b/api/fair_archiver.py @@ -185,12 +185,41 @@ def _load_domain_config(equipment_id: str) -> dict[str, Any] | None: if equipment: config = equipment.get("config_json") if isinstance(config, dict) and config: - return config + return _normalize_pg_domain_config(config) except Exception as exc: logger.debug("Could not load PG config for %s: %s", equipment_id, exc) return None +def _normalize_pg_domain_config(config: dict[str, Any]) -> dict[str, Any]: + """Normalize a PostgreSQL-registered equipment config so the FAIR metadata + generator (which reads hardware/identity fields from ``config["domain"]``) + sees the fields ``build_domain_config()`` stores at the config's top level. + + Returns a shallow copy with a ``domain`` dict that backfills (without + overwriting) the registered hardware/identity fields. + """ + if not isinstance(config, dict): + return config + normalized = dict(config) + domain = dict(normalized.get("domain") or {}) + for field in ( + "manufacturer", + "model", + "serial_number", + "location", + "equipment_type", + "sosa_type", + "ontology_uri", + "id", + "name", + ): + if not domain.get(field) and config.get(field): + domain[field] = config[field] + normalized["domain"] = domain + return normalized + + def _get_feature_ontology_map(domain_config: dict[str, Any] | None) -> dict[str, dict[str, str]]: """ Build a mapping from feature/target IDs to their QUDT ontology URIs From f33dc08ee5dc3add20af980b5becc402dd06ab03 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:37:01 -0500 Subject: [PATCH 18/23] Address Codex review round 17: chronological merge + malformed-row rejection - Merged etcher/equipment run pages sort on the actual instant (offset-aware datetimes normalized to UTC) instead of raw ISO strings, so runs with different UTC offsets paginate in true chronological order. - Generic ingestion rejects structurally malformed rows (extra cells under the DictReader None key, or short rows leaving columns at the None restval) so a directly-invoked process call can't turn a malformed CSV into a catalog run. Co-authored-by: Cursor --- api/data_loader_pg.py | 29 ++++++++++++++++++++++++++--- api/routers/upload.py | 19 +++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/api/data_loader_pg.py b/api/data_loader_pg.py index 5c3a8b4..ca411e6 100644 --- a/api/data_loader_pg.py +++ b/api/data_loader_pg.py @@ -2,6 +2,7 @@ import logging import json import hashlib +from datetime import datetime, timezone from typing import List, Dict, Any, Optional import re import uuid @@ -25,6 +26,26 @@ EQUIPMENT_RUN_ID_OFFSET = 5_000_000_000 +def _run_sort_instant(run_date: Any) -> float: + """Return a comparable POSIX timestamp for a run's ``run_date`` so runs from + different physical tables can be merged in true chronological order. + + Handles ISO strings with or without a UTC offset (naive values are treated + as UTC). Missing/unparseable dates sort last under a descending sort.""" + if not run_date: + return float("-inf") + if isinstance(run_date, datetime): + dt = run_date + else: + try: + dt = datetime.fromisoformat(str(run_date).replace("Z", "+00:00")) + except ValueError: + return float("-inf") + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.timestamp() + + def is_mock_db_enabled() -> bool: return os.getenv("USE_MOCK_DATABASE", "false").lower() == "true" @@ -696,9 +717,11 @@ def get_runs_list_pg( _attach_run_file_refs(conn, records) records.extend(_shape_equipment_run_record(row) for row in generic_rows) - # Merge the two sources into a single date-descending page. Rows without - # a run_date sort last, matching the per-source "NULLS LAST" ordering. - records.sort(key=lambda rec: (rec.get("run_date") or ""), reverse=True) + # Merge the two sources into a single date-descending page. Sort on the + # actual instant (not the ISO string) so timestamps with different UTC + # offsets order chronologically; rows without a run_date sort last, + # matching the per-source "NULLS LAST" ordering. + records.sort(key=lambda rec: _run_sort_instant(rec.get("run_date")), reverse=True) records = records[offset:offset + limit] if limit else records[offset:] conn.commit() diff --git a/api/routers/upload.py b/api/routers/upload.py index e82a194..a76a54b 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -1159,6 +1159,25 @@ def _csv_upload_to_equipment_rows( if _is_template_hint_row(raw_row, hint_by_column): continue + # Reject structurally malformed rows even when the process endpoint is + # called directly (bypassing upload-time validation). DictReader stores + # surplus cells under the None key, and a short row leaves declared + # columns at the restval (None); either way the row's data would be + # silently mis-aligned, so it must not become a catalog run. + expected_width = len(reader.fieldnames or []) + if None in raw_row: + errors.append(f"Row {idx}: expected {expected_width} columns, got more") + if len(errors) > 50: + errors.append("... (more errors truncated)") + break + continue + if any(value is None for value in raw_row.values()): + errors.append(f"Row {idx}: expected {expected_width} columns, got fewer") + if len(errors) > 50: + errors.append("... (more errors truncated)") + break + continue + lotname = str(raw_row.get(lot_col) or "").strip() if lot_required and not lotname: errors.append(f"Row {idx}: {lot_col} is required") From 4b5c92f186933e5356858d730bf4e39cbeaf3cc4 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:42:24 -0500 Subject: [PATCH 19/23] Address Codex review round 18: parse declared integers without float round-trip Add _coerce_optional_int and use it for int/integer/long columns in both _coerce_typed and _declared_type_violation, so 64-bit identifiers beyond 2**53 keep full precision instead of being silently rounded via float. Co-authored-by: Cursor --- api/routers/upload.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/api/routers/upload.py b/api/routers/upload.py index a76a54b..308873c 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -104,6 +104,25 @@ def _normalize_column_type(value: Any) -> str: return "" +def _coerce_optional_int(value: Any) -> int | None: + """Parse an integer from a CSV cell without routing through float, so large + 64-bit identifiers/values keep full precision (float64 only holds integers + exactly up to 2**53). Accepts integral float forms like ``10.0`` but rejects + fractional values such as ``1.5``.""" + if value is None: + return None + text = str(value).strip() + if text == "" or text.lower() == "none": + return None + try: + return int(text) + except ValueError: + parsed = _coerce_optional_float(text) + if parsed is not None and parsed.is_integer(): + return int(parsed) + return None + + def _coerce_optional_float(value: Any) -> float | None: if value is None: return None @@ -1022,10 +1041,8 @@ def _coerce_typed(value: Any, declared_type: str) -> Any: return False return text if t in {"int", "integer", "long"}: - parsed = _coerce_optional_float(text) - if parsed is None: - return text - return int(parsed) if parsed.is_integer() else parsed + parsed = _coerce_optional_int(text) + return parsed if parsed is not None else text if t in {"float", "double", "number", "numeric", "real", "decimal"}: parsed = _coerce_optional_float(text) return parsed if parsed is not None else text @@ -1047,10 +1064,10 @@ def _declared_type_violation(value: Any, declared_type: str) -> bool: t = (declared_type or "").strip().lower() if t in {"int", "integer", "long"}: - parsed = _coerce_optional_float(text) - # Integer columns must be whole numbers: reject unparseable values and - # fractional ones (e.g. 1.5) so the declared schema isn't violated. - return parsed is None or not parsed.is_integer() + # Integer columns must be whole numbers: reject unparseable and + # fractional values (e.g. 1.5). Parsed directly so large 64-bit values + # aren't rounded by a float round-trip. + return _coerce_optional_int(text) is None if t in {"float", "double", "number", "numeric", "real", "decimal"}: return _coerce_optional_float(text) is None if t in {"bool", "boolean"}: From d0a347705829b4b057f0340aa25458e9fcc808eb Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:47:01 -0500 Subject: [PATCH 20/23] Address Codex review round 19: exact int decimals + normalize file configs - _coerce_optional_int parses decimal-form integers (e.g. 9007199254740993.0) via Decimal instead of binary float, preserving 64-bit values; non-finite Decimals are rejected. - _load_domain_config normalizes file-backed/registered JSON configs too, so manufacturer/model/serial/location stored at the config top level appear in FAIR metadata for equipment that have a JSON snapshot, not just PG-only ones. Co-authored-by: Cursor --- api/fair_archiver.py | 14 ++++++++------ api/routers/upload.py | 12 +++++++++--- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/api/fair_archiver.py b/api/fair_archiver.py index da8b2f6..4c4b4ec 100644 --- a/api/fair_archiver.py +++ b/api/fair_archiver.py @@ -161,7 +161,7 @@ def _load_domain_config(equipment_id: str) -> dict[str, Any] | None: from domain_configs import load_domain_config config = load_domain_config(equipment_id) if config: - return config + return _normalize_pg_domain_config(config) except Exception: pass @@ -170,7 +170,7 @@ def _load_domain_config(equipment_id: str) -> dict[str, Any] | None: config_path = configs_dir / f"{equipment_id}.json" if config_path.exists(): try: - return json.loads(config_path.read_text()) + return _normalize_pg_domain_config(json.loads(config_path.read_text())) except Exception as exc: logger.debug("Could not load domain config %s: %s", equipment_id, exc) @@ -192,12 +192,14 @@ def _load_domain_config(equipment_id: str) -> dict[str, Any] | None: def _normalize_pg_domain_config(config: dict[str, Any]) -> dict[str, Any]: - """Normalize a PostgreSQL-registered equipment config so the FAIR metadata - generator (which reads hardware/identity fields from ``config["domain"]``) - sees the fields ``build_domain_config()`` stores at the config's top level. + """Normalize a registered equipment config (whether loaded from its JSON + snapshot or from PostgreSQL) so the FAIR metadata generator (which reads + hardware/identity fields from ``config["domain"]``) sees the fields + ``build_domain_config()`` stores at the config's top level. Returns a shallow copy with a ``domain`` dict that backfills (without - overwriting) the registered hardware/identity fields. + overwriting) the registered hardware/identity fields. A config that already + carries everything under ``domain`` (e.g. the canonical etcher) is unchanged. """ if not isinstance(config, dict): return config diff --git a/api/routers/upload.py b/api/routers/upload.py index 308873c..24b48f2 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -8,6 +8,7 @@ import math import os from datetime import datetime +from decimal import Decimal, InvalidOperation from pathlib import Path from typing import Any @@ -117,9 +118,14 @@ def _coerce_optional_int(value: Any) -> int | None: try: return int(text) except ValueError: - parsed = _coerce_optional_float(text) - if parsed is not None and parsed.is_integer(): - return int(parsed) + # Decimal forms like "10.0" or "9007199254740993.0": parse exactly via + # Decimal (never binary float) so large 64-bit integers aren't rounded. + try: + dec = Decimal(text) + except InvalidOperation: + return None + if dec.is_finite() and dec == dec.to_integral_value(): + return int(dec) return None From 6e5e9bea78df8bb4e94dfd72e6197fbeee3564f6 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:53:17 -0500 Subject: [PATCH 21/23] Address Codex review round 20: honor separately-stored column definitions Add _effective_equipment_config to backfill a registered equipment's explicit column schema from its top-level columns (columns_json) when config_json is empty/incomplete. upload_file and _load_equipment_config both use it, so the upload schema validation and generic processing no longer accept or drop files that omit registered columns. Restores test_upload_reports_missing_required_columns. Co-authored-by: Cursor --- api/routers/upload.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/api/routers/upload.py b/api/routers/upload.py index 24b48f2..eeab221 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -669,9 +669,10 @@ async def upload_file( # Required schema for registered equipment is its lot/timestamp metadata # plus declared inputs. Outputs/targets (incl. synthetic placeholders and # any listed among explicit columns) stay optional so an inputs-only CSV - # is still ingestible. + # is still ingestible. The effective config backfills column definitions + # that registration stored separately from config_json. expected_columns = _required_upload_columns( - equipment.get("config_json", {}) + _effective_equipment_config(equipment) ) else: legacy_config = load_domain_config(equipment_id) @@ -688,7 +689,7 @@ async def upload_file( # Extract numerical boundary limits for validation. These come from the # equipment's registered features/parameters/targets and are used to flag # (non-blocking) out-of-range values that usually indicate a unit mismatch. - config_json = equipment.get("config_json", {}) if equipment else legacy_config + config_json = _effective_equipment_config(equipment) if equipment else legacy_config expected_bounds = _extract_expected_bounds(config_json) # Check extension @@ -867,13 +868,28 @@ def list_uploads( return list_uploads_pg(user) +def _effective_equipment_config(equipment: dict[str, Any]) -> dict[str, Any]: + """Return a registered equipment's ingestion config, backfilling the + explicit column definitions from the record's top-level ``columns`` + (columns_json) when ``config_json`` is empty or omits them. Registration can + persist the column schema separately from config_json, so the upload schema + must consult both to avoid accepting/processing files that drop registered + columns.""" + config = equipment.get("config_json") + config = dict(config) if isinstance(config, dict) else {} + if not config.get("columns"): + top_columns = equipment.get("columns") + if isinstance(top_columns, list) and top_columns: + config["columns"] = top_columns + return config + + def _load_equipment_config(equipment_id: str) -> dict[str, Any]: - """Return the config_json for a registered equipment, falling back to a + """Return the ingestion config for a registered equipment, falling back to a legacy JSON domain config. Returns {} if neither is found.""" equipment = get_equipment_pg(equipment_id) if equipment: - config = equipment.get("config_json", {}) - return config if isinstance(config, dict) else {} + return _effective_equipment_config(equipment) legacy = load_domain_config(equipment_id) return legacy if isinstance(legacy, dict) else {} From b085b6afd4d735f5818057a1af24dc697a72d680 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 15:58:11 -0500 Subject: [PATCH 22/23] Address Codex review round 21: honor limit=0 in merged pagination Merged run pagination now returns an empty page for limit=0 (matching the prior SQL LIMIT 0 semantics) instead of treating a falsy limit as unlimited-from- offset; limit=None remains unlimited and limit>0 returns the window. Co-authored-by: Cursor --- api/data_loader_pg.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/api/data_loader_pg.py b/api/data_loader_pg.py index ca411e6..bb90230 100644 --- a/api/data_loader_pg.py +++ b/api/data_loader_pg.py @@ -722,7 +722,12 @@ def get_runs_list_pg( # offsets order chronologically; rows without a run_date sort last, # matching the per-source "NULLS LAST" ordering. records.sort(key=lambda rec: _run_sort_instant(rec.get("run_date")), reverse=True) - records = records[offset:offset + limit] if limit else records[offset:] + # Preserve SQL pagination semantics: limit is None => unlimited, limit 0 + # => an empty page (not "unlimited from offset"), limit > 0 => a window. + if limit is None: + records = records[offset:] + else: + records = records[offset:offset + limit] if limit > 0 else [] conn.commit() conn.close() From 5c93f24cb5e4a4aa0fc9a77751986f5f9d605d60 Mon Sep 17 00:00:00 2001 From: navidgh67 Date: Mon, 15 Jun 2026 16:03:22 -0500 Subject: [PATCH 23/23] Address Codex review round 22: preserve uploaded run quality flags Generic ingestion now parses is_outlier, is_calibration_recipe, and outlier_type (via the standard column aliases) into the top-level row passed to sync_equipment_runs_pg, so uploaded generic runs keep their quality metadata instead of defaulting to false/empty and being misclassified in catalog filters, badges, and summary outlier counts. Co-authored-by: Cursor --- api/routers/upload.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/api/routers/upload.py b/api/routers/upload.py index eeab221..da9a155 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -105,6 +105,14 @@ def _normalize_column_type(value: Any) -> str: return "" +def _coerce_bool_flag(value: Any) -> bool: + """Interpret a CSV cell as a boolean run-quality flag. Anything not clearly + truthy is False, matching the column's database default.""" + if value is None: + return False + return str(value).strip().lower() in {"true", "1", "yes", "y", "t"} + + def _coerce_optional_int(value: Any) -> int | None: """Parse an integer from a CSV cell without routing through float, so large 64-bit identifiers/values keep full precision (float64 only holds integers @@ -1262,6 +1270,10 @@ def _csv_upload_to_equipment_rows( coerced = _coerce_typed(raw_row.get(name), type_by_name.get(name, "")) if coerced is not None: outputs[name] = coerced + # Standard run-quality metadata (if present) drives catalog filters, + # badges, and outlier counts. Parse it into the top-level row (using + # the same column aliases as the etcher path) instead of letting it + # fall back to database defaults. rows.append( { "project_id": project_id, @@ -1271,6 +1283,11 @@ def _csv_upload_to_equipment_rows( "outputs": outputs, "raw": dict(raw_row), "source": "data_upload", + "is_outlier": _coerce_bool_flag(_upload_value(raw_row, "is_outlier")), + "is_calibration_recipe": _coerce_bool_flag( + _upload_value(raw_row, "is_calibration_recipe") + ), + "outlier_type": str(_upload_value(raw_row, "outlier_type") or "").strip(), } )
{meta.label} {meta.description}{formatNumber(value, 2)}{typeof value === "number" ? formatNumber(value, 2) : (value ?? "—")} {meta.unit}
{meta.label} {meta.description}{typeof value === "number" ? formatNumber(value, 2) : (value ?? "—")}{typeof value === "number" ? formatNumber(value, 2) : value === null || value === undefined ? "—" : String(value)} {meta.unit}