diff --git a/api/data_loader_pg.py b/api/data_loader_pg.py index 09aaa1f..bb90230 100644 --- a/api/data_loader_pg.py +++ b/api/data_loader_pg.py @@ -2,6 +2,7 @@ import logging import json import hashlib +from datetime import datetime, timezone from typing import List, Dict, Any, Optional import re import uuid @@ -17,6 +18,34 @@ logger = logging.getLogger(__name__) +# Generic equipment_runs rows are surfaced through the same /runs endpoints as +# etcher_runs. To keep run_id values unambiguous across the two physical tables +# we expose generic rows with their primary key shifted into a disjoint numeric +# space. etcher idruns are 32-bit INTEGERs (and manual etcher uploads use the +# 1.0e9–1.9e9 band), so an offset well above that range cannot collide. +EQUIPMENT_RUN_ID_OFFSET = 5_000_000_000 + + +def _run_sort_instant(run_date: Any) -> float: + """Return a comparable POSIX timestamp for a run's ``run_date`` so runs from + different physical tables can be merged in true chronological order. + + Handles ISO strings with or without a UTC offset (naive values are treated + as UTC). Missing/unparseable dates sort last under a descending sort.""" + if not run_date: + return float("-inf") + if isinstance(run_date, datetime): + dt = run_date + else: + try: + dt = datetime.fromisoformat(str(run_date).replace("Z", "+00:00")) + except ValueError: + return float("-inf") + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.timestamp() + + def is_mock_db_enabled() -> bool: return os.getenv("USE_MOCK_DATABASE", "false").lower() == "true" @@ -99,8 +128,13 @@ def _run_base_select(*, include_raw_payload: bool = False) -> str: r.project_id, r.execution_request_id, p.name AS project_name, - p.equipment_id AS equipment_id, - p.equipment_name AS equipment_name, + -- Every etcher_runs row is the canonical etcher regardless of the + -- project's equipment association (which may be empty or, mistakenly, + -- another tool). 
Hard-code the canonical identity so these rows are never + -- rendered/exported as generic runs and FAIR metadata keeps etch-rate + -- results and etcher labeling. + 'etcher' AS equipment_id, + 'Etcher' AS equipment_name, p.pi_name AS pi_name, p.access_mode AS project_access {raw_payload_select} @@ -267,6 +301,243 @@ def ensure_etcher_run_file_refs_pg() -> None: conn.close() +def ensure_equipment_runs_pg() -> None: + """Create the generic equipment_runs table, RLS policy, and grants. + + Idempotent — safe to call on every startup. Mirrors the project-membership + visibility model used by ``etcher_runs`` so generic runs respect the same + open/shared/private project access rules. + """ + conn = get_pg_superuser_connection() + try: + with conn.cursor() as cur: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS equipment_runs ( + id BIGSERIAL PRIMARY KEY, + equipment_id VARCHAR(255) NOT NULL, + project_id VARCHAR(50) REFERENCES projects(id) ON DELETE SET NULL, + lotname VARCHAR(255), + run_date TIMESTAMP WITH TIME ZONE, + is_outlier BOOLEAN DEFAULT false, + is_calibration_recipe BOOLEAN DEFAULT false, + outlier_type TEXT DEFAULT '', + inputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + outputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + raw_payload_json JSONB NOT NULL DEFAULT '{}'::jsonb, + upload_id UUID, + row_index INT, + upload_filename VARCHAR(255) DEFAULT '', + source VARCHAR(100) DEFAULT 'data_upload', + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + cur.execute( + "ALTER TABLE equipment_runs ADD COLUMN IF NOT EXISTS row_index INT" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_equipment_runs_equipment_id ON equipment_runs(equipment_id)" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_equipment_runs_project_id ON equipment_runs(project_id)" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_equipment_runs_upload_id ON equipment_runs(upload_id)" + ) + cur.execute( + """ + CREATE UNIQUE INDEX IF NOT EXISTS 
uq_equipment_runs_upload_row + ON equipment_runs(upload_id, row_index) + WHERE upload_id IS NOT NULL + """ + ) + cur.execute("ALTER TABLE equipment_runs ENABLE ROW LEVEL SECURITY") + cur.execute("GRANT SELECT ON equipment_runs TO api_client") + cur.execute( + """ + DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM pg_policies + WHERE schemaname = 'public' + AND tablename = 'equipment_runs' + AND policyname = 'equipment_run_visibility_policy' + ) THEN + CREATE POLICY equipment_run_visibility_policy ON equipment_runs + FOR SELECT + USING ( + EXISTS ( + SELECT 1 + FROM projects p + WHERE p.id = equipment_runs.project_id + AND ( + p.access_mode = 'open' + OR EXISTS ( + SELECT 1 + FROM project_members pm + WHERE pm.project_id = p.id + AND pm.nanohub_user_id = current_setting('app.current_user', true) + ) + ) + ) + ); + END IF; + END + $$; + """ + ) + conn.commit() + finally: + conn.close() + + +def _equipment_runs_base_select(*, include_raw_payload: bool = False) -> str: + """SELECT that shapes generic equipment_runs into the same record format + used by the etcher /runs endpoints (so the catalog can render both).""" + raw_payload_select = ",\n r.raw_payload_json" if include_raw_payload else "" + return f""" + SELECT + (r.id + {EQUIPMENT_RUN_ID_OFFSET}) AS run_id, + r.lotname AS lot_name, + r.run_date, + r.created_at, + r.is_outlier, + r.is_calibration_recipe AS is_calibration, + r.outlier_type, + r.inputs_json, + r.outputs_json, + r.upload_filename, + r.source, + r.project_id, + ''::text AS execution_request_id, + p.name AS project_name, + r.equipment_id AS equipment_id, + COALESCE(em.equipment_name, p.equipment_name, r.equipment_id) AS equipment_name, + p.pi_name AS pi_name, + p.access_mode AS project_access + {raw_payload_select} + FROM equipment_runs r + LEFT JOIN projects p ON p.id = r.project_id + LEFT JOIN equipment_metadata em ON em.domain_id = r.equipment_id +""" + + +def _shape_equipment_run_record(row) -> Dict[str, Any]: + """Shape a raw equipment_runs row 
into the API response format used by the + v2 endpoints, mirroring _shape_run_record's contract.""" + rec = dict(row) + for ts_field in ("run_date", "created_at"): + if rec.get(ts_field): + rec[ts_field] = ( + rec[ts_field].isoformat() + if hasattr(rec[ts_field], "isoformat") + else str(rec[ts_field]) + ) + # Expose the validated input set-points under the same "features" key the + # etcher rows use, and surface measured outputs alongside. + rec["features"] = rec.pop("inputs_json", {}) or {} + rec["outputs"] = rec.pop("outputs_json", {}) or {} + # The catalog renders etch-rate badges only when these are numeric; generic + # equipment has no canonical etch-rate, so leave them absent. + rec.setdefault("avg_etch_rate", None) + rec.setdefault("range_etch_rate", None) + rec.setdefault("range_nm", None) + rec["equipment_name"] = rec.get("equipment_name") or rec.get("equipment_id") or "" + raw_payload = rec.pop("raw_payload_json", None) + if raw_payload is not None: + rec["raw_payload"] = raw_payload + rec["file_refs"] = [] + return rec + + +def sync_equipment_runs_pg( + *, + equipment_id: str, + rows: List[Dict[str, Any]], + upload_id: Optional[str] = None, + upload_filename: str = "", +) -> int: + """Insert validated rows for an arbitrary registered equipment into the + generic equipment_runs table. + + Each row dict is expected to contain: project_id, lotname, run_date, + inputs (dict), outputs (dict), raw (dict), and optional is_outlier / + is_calibration_recipe / outlier_type flags. + + When ``upload_id`` is provided, rows are upserted on their stable + ``(upload_id, row_index)`` identity so re-processing the same upload is + idempotent and preserves each run's id (and therefore its exposed run_id / + catalog URL). Any rows left over from a previous, longer version of the + upload are pruned afterwards. 
+ """ + from psycopg2.extras import execute_values + + if not rows: + return 0 + + conn = get_pg_superuser_connection() + try: + insert_rows = [] + for index, row in enumerate(rows): + insert_rows.append(( + equipment_id, + row.get("project_id"), + row.get("lotname"), + row.get("run_date") or None, + bool(row.get("is_outlier", False)), + bool(row.get("is_calibration_recipe", False)), + str(row.get("outlier_type", "") or ""), + Json(_json_safe_payload(row.get("inputs", {}) or {})), + Json(_json_safe_payload(row.get("outputs", {}) or {})), + Json(_json_safe_payload(row.get("raw", {}) or {})), + upload_id, + index, + upload_filename, + str(row.get("source", "data_upload") or "data_upload"), + )) + + with conn.cursor() as cur: + execute_values( + cur, + """ + INSERT INTO equipment_runs ( + equipment_id, project_id, lotname, run_date, + is_outlier, is_calibration_recipe, outlier_type, + inputs_json, outputs_json, raw_payload_json, + upload_id, row_index, upload_filename, source + ) VALUES %s + ON CONFLICT (upload_id, row_index) WHERE upload_id IS NOT NULL + DO UPDATE SET + equipment_id = EXCLUDED.equipment_id, + project_id = EXCLUDED.project_id, + lotname = EXCLUDED.lotname, + run_date = EXCLUDED.run_date, + is_outlier = EXCLUDED.is_outlier, + is_calibration_recipe = EXCLUDED.is_calibration_recipe, + outlier_type = EXCLUDED.outlier_type, + inputs_json = EXCLUDED.inputs_json, + outputs_json = EXCLUDED.outputs_json, + raw_payload_json = EXCLUDED.raw_payload_json, + upload_filename = EXCLUDED.upload_filename, + source = EXCLUDED.source + """, + insert_rows, + ) + # Prune rows from a previous, longer version of this upload so a + # shrunk re-upload doesn't leave stale runs behind. 
+ if upload_id is not None: + cur.execute( + "DELETE FROM equipment_runs WHERE upload_id = %s AND row_index >= %s", + (upload_id, len(insert_rows)), + ) + conn.commit() + return len(insert_rows) + finally: + conn.close() + + def _first_text(row: Dict[str, Any], *keys: str) -> str: for key in keys: value = row.get(key) @@ -408,28 +679,55 @@ def get_runs_list_pg( else: conn = get_pg_connection(nanohub_user_id) - query = _run_base_select(include_raw_payload=include_raw_payload) + # Build the shared WHERE clause for both physical run tables. The + # column names line up (is_outlier, is_calibration_recipe, project_id, + # run_date) so the same predicates apply to etcher_runs and the generic + # equipment_runs table. conditions: list[str] = [] - params: list[Any] = [] - + base_params: list[Any] = [] if not include_outliers: conditions.append("r.is_outlier = false AND r.is_calibration_recipe = false") if project_id: conditions.append("r.project_id = %s") - params.append(project_id) + base_params.append(project_id) + where_clause = (" WHERE " + " AND ".join(conditions)) if conditions else "" - if conditions: - query += " WHERE " + " AND ".join(conditions) + # Over-fetch (limit + offset) from each source so a merged, globally + # date-sorted page is still correct after slicing in Python. 
+ page_cap = max(0, limit) + max(0, offset) - query += " ORDER BY r.run_date DESC NULLS LAST LIMIT %s OFFSET %s" - params.extend([limit, offset]) + etcher_query = ( + _run_base_select(include_raw_payload=include_raw_payload) + + where_clause + + " ORDER BY r.run_date DESC NULLS LAST LIMIT %s" + ) + generic_query = ( + _equipment_runs_base_select(include_raw_payload=include_raw_payload) + + where_clause + + " ORDER BY r.run_date DESC NULLS LAST LIMIT %s" + ) with conn.cursor(cursor_factory=RealDictCursor) as cur: - cur.execute(query, tuple(params)) - rows = cur.fetchall() + cur.execute(etcher_query, tuple(base_params + [page_cap])) + etcher_rows = cur.fetchall() + cur.execute(generic_query, tuple(base_params + [page_cap])) + generic_rows = cur.fetchall() - records = [_shape_run_record(row) for row in rows] + records = [_shape_run_record(row) for row in etcher_rows] _attach_run_file_refs(conn, records) + records.extend(_shape_equipment_run_record(row) for row in generic_rows) + + # Merge the two sources into a single date-descending page. Sort on the + # actual instant (not the ISO string) so timestamps with different UTC + # offsets order chronologically; rows without a run_date sort last, + # matching the per-source "NULLS LAST" ordering. + records.sort(key=lambda rec: _run_sort_instant(rec.get("run_date")), reverse=True) + # Preserve SQL pagination semantics: limit is None => unlimited, limit 0 + # => an empty page (not "unlimited from offset"), limit > 0 => a window. + if limit is None: + records = records[offset:] + else: + records = records[offset:offset + limit] if limit > 0 else [] conn.commit() conn.close() @@ -467,6 +765,26 @@ def get_run_detail_pg( else: conn = get_pg_connection(nanohub_user_id) + # run_id values at/above the offset belong to the generic + # equipment_runs table (see EQUIPMENT_RUN_ID_OFFSET); everything below + # is a canonical etcher_runs.idruns. 
+ if run_id >= EQUIPMENT_RUN_ID_OFFSET: + query = ( + _equipment_runs_base_select(include_raw_payload=True) + + " WHERE r.id = %s LIMIT 1" + ) + with conn.cursor(cursor_factory=RealDictCursor) as cur: + cur.execute(query, (run_id - EQUIPMENT_RUN_ID_OFFSET,)) + row = cur.fetchone() + if not row: + conn.commit() + conn.close() + return None + record = _shape_equipment_run_record(row) + conn.commit() + conn.close() + return record + query = _run_base_select(include_raw_payload=True) + " WHERE r.idruns = %s LIMIT 1" with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(query, (run_id,)) @@ -1282,14 +1600,26 @@ def get_summary_stats_pg( conn = get_pg_superuser_connection() if is_admin else get_pg_connection(nanohub_user_id) with conn.cursor(cursor_factory=RealDictCursor) as cur: + # Aggregate canonical etcher_runs together with generic equipment_runs + # so manually uploaded data for any registered equipment is reflected + # in the dashboard totals, clean/outlier counts, and date range. 
cur.execute(""" SELECT (SELECT count(*) FROM projects) AS total_projects, - (SELECT count(*) FROM etcher_runs) AS total_runs, - (SELECT count(*) FROM etcher_runs WHERE is_outlier = false AND is_calibration_recipe = false) AS clean_runs, - (SELECT count(*) FROM etcher_runs WHERE is_outlier = true) AS outlier_runs, - (SELECT min(run_date) FROM etcher_runs) AS date_start, - (SELECT max(run_date) FROM etcher_runs) AS date_end + (SELECT count(*) FROM etcher_runs) + + (SELECT count(*) FROM equipment_runs) AS total_runs, + (SELECT count(*) FROM etcher_runs WHERE is_outlier = false AND is_calibration_recipe = false) + + (SELECT count(*) FROM equipment_runs WHERE is_outlier = false AND is_calibration_recipe = false) AS clean_runs, + (SELECT count(*) FROM etcher_runs WHERE is_outlier = true) + + (SELECT count(*) FROM equipment_runs WHERE is_outlier = true) AS outlier_runs, + LEAST( + (SELECT min(run_date) FROM etcher_runs), + (SELECT min(run_date) FROM equipment_runs) + ) AS date_start, + GREATEST( + (SELECT max(run_date) FROM etcher_runs), + (SELECT max(run_date) FROM equipment_runs) + ) AS date_end """) row = cur.fetchone() diff --git a/api/fair_archiver.py b/api/fair_archiver.py index 5e12b77..4c4b4ec 100644 --- a/api/fair_archiver.py +++ b/api/fair_archiver.py @@ -159,7 +159,9 @@ def _load_domain_config(equipment_id: str) -> dict[str, Any] | None: """ try: from domain_configs import load_domain_config - return load_domain_config(equipment_id) + config = load_domain_config(equipment_id) + if config: + return _normalize_pg_domain_config(config) except Exception: pass @@ -168,12 +170,58 @@ def _load_domain_config(equipment_id: str) -> dict[str, Any] | None: config_path = configs_dir / f"{equipment_id}.json" if config_path.exists(): try: - return json.loads(config_path.read_text()) + return _normalize_pg_domain_config(json.loads(config_path.read_text())) except Exception as exc: logger.debug("Could not load domain config %s: %s", equipment_id, exc) + + # Fallback: 
dynamically registered equipment have no JSON file on disk; + # their config lives in PostgreSQL. Pull config_json so FAIR metadata still + # carries registered units, ontology, hardware, and owner info. + if equipment_id and equipment_id != "unknown": + try: + from metadata_pg import get_equipment_pg + + equipment = get_equipment_pg(equipment_id) + if equipment: + config = equipment.get("config_json") + if isinstance(config, dict) and config: + return _normalize_pg_domain_config(config) + except Exception as exc: + logger.debug("Could not load PG config for %s: %s", equipment_id, exc) return None +def _normalize_pg_domain_config(config: dict[str, Any]) -> dict[str, Any]: + """Normalize a registered equipment config (whether loaded from its JSON + snapshot or from PostgreSQL) so the FAIR metadata generator (which reads + hardware/identity fields from ``config["domain"]``) sees the fields + ``build_domain_config()`` stores at the config's top level. + + Returns a shallow copy with a ``domain`` dict that backfills (without + overwriting) the registered hardware/identity fields. A config that already + carries everything under ``domain`` (e.g. the canonical etcher) is unchanged. 
+ """ + if not isinstance(config, dict): + return config + normalized = dict(config) + domain = dict(normalized.get("domain") or {}) + for field in ( + "manufacturer", + "model", + "serial_number", + "location", + "equipment_type", + "sosa_type", + "ontology_uri", + "id", + "name", + ): + if not domain.get(field) and config.get(field): + domain[field] = config[field] + normalized["domain"] = domain + return normalized + + def _get_feature_ontology_map(domain_config: dict[str, Any] | None) -> dict[str, dict[str, str]]: """ Build a mapping from feature/target IDs to their QUDT ontology URIs @@ -212,6 +260,36 @@ def _get_feature_ontology_map(domain_config: dict[str, Any] | None) -> dict[str, entry["qudt_unit"] = target["qudt_unit"] mapping[tid] = entry + # Registered input parameters (set-points captured at registration; always + # input). Generic equipment often declare their inputs here rather than under + # ``features``, so without this their FAIR metadata would lose units/provenance. + for parameter in domain_config.get("parameters", []): + if not isinstance(parameter, dict): + continue + pid = parameter.get("name") or parameter.get("id") + if not pid: + continue + entry = {"unit": parameter.get("unit", ""), "prov_direction": "input"} + if parameter.get("qudt_quantity_kind"): + entry["qudt_quantity_kind"] = parameter["qudt_quantity_kind"] + if parameter.get("qudt_unit"): + entry["qudt_unit"] = parameter["qudt_unit"] + mapping.setdefault(str(pid), entry) + + # Registered outputs (measured results captured at registration; always output) + for output in domain_config.get("outputs", []): + if not isinstance(output, dict): + continue + oid = output.get("name") or output.get("id") + if not oid: + continue + entry = {"unit": output.get("unit", ""), "prov_direction": "output"} + if output.get("qudt_quantity_kind"): + entry["qudt_quantity_kind"] = output["qudt_quantity_kind"] + if output.get("qudt_unit"): + entry["qudt_unit"] = output["qudt_unit"] + 
mapping.setdefault(str(oid), entry) + return mapping @@ -764,6 +842,9 @@ def run_to_jsonld(run: dict[str, Any]) -> dict[str, Any]: project_name = run.get("project_name", "Unknown Project") equipment_name = run.get("equipment_name", "Unknown Equipment") equipment_id = run.get("equipment_id", "unknown") + # Label the dataset by its actual equipment so non-etcher runs aren't + # misrepresented as etcher runs in FAIR metadata. + run_label = "Etcher" if equipment_id == "etcher" else (equipment_name or "Equipment") # Load domain config for ontology URIs domain_config = _load_domain_config(equipment_id) @@ -796,26 +877,46 @@ def run_to_jsonld(run: dict[str, Any]) -> dict[str, Any]: prop["prov:qualifiedGeneration"] = "prov:generated" variables_measured.append(prop) - # Build ontology-enriched target list + # Build ontology-enriched result list. The canonical etcher uses typed + # columns; every other equipment reports measurements under ``outputs`` + # (which may legitimately be empty for an inputs-only upload). 
+ generic_outputs = run.get("outputs") or {} results = [] - for target_id, target_label, target_key in [ - ("AvgEtchRate", "Etch Rate", "avg_etch_rate"), - ("RangeEtchRate", "Thickness Range", "range_etch_rate"), - ]: - onto = ontology_map.get(target_id, {}) - result_prop: dict[str, Any] = { - "@type": "PropertyValue", - "name": target_id, - "value": run.get(target_key), - } - if onto.get("unit"): - result_prop["unitText"] = onto["unit"] - if onto.get("qudt_quantity_kind"): - result_prop["qudt:hasQuantityKind"] = {"@id": onto["qudt_quantity_kind"]} - if onto.get("qudt_unit"): - result_prop["qudt:hasUnit"] = {"@id": onto["qudt_unit"]} - result_prop["prov:qualifiedGeneration"] = "prov:generated" - results.append(result_prop) + if equipment_id != "etcher": + for key, value in (generic_outputs if isinstance(generic_outputs, dict) else {}).items(): + onto = ontology_map.get(key, {}) + result_prop = { + "@type": "PropertyValue", + "name": key, + "value": value, + } + if onto.get("unit"): + result_prop["unitText"] = onto["unit"] + if onto.get("qudt_quantity_kind"): + result_prop["qudt:hasQuantityKind"] = {"@id": onto["qudt_quantity_kind"]} + if onto.get("qudt_unit"): + result_prop["qudt:hasUnit"] = {"@id": onto["qudt_unit"]} + result_prop["prov:qualifiedGeneration"] = "prov:generated" + results.append(result_prop) + else: + for target_id, target_label, target_key in [ + ("AvgEtchRate", "Etch Rate", "avg_etch_rate"), + ("RangeEtchRate", "Thickness Range", "range_etch_rate"), + ]: + onto = ontology_map.get(target_id, {}) + result_prop: dict[str, Any] = { + "@type": "PropertyValue", + "name": target_id, + "value": run.get(target_key), + } + if onto.get("unit"): + result_prop["unitText"] = onto["unit"] + if onto.get("qudt_quantity_kind"): + result_prop["qudt:hasQuantityKind"] = {"@id": onto["qudt_quantity_kind"]} + if onto.get("qudt_unit"): + result_prop["qudt:hasUnit"] = {"@id": onto["qudt_unit"]} + result_prop["prov:qualifiedGeneration"] = "prov:generated" + 
results.append(result_prop) # Build equipment entity with SAREF metadata equipment_entity: dict[str, Any] = { @@ -864,11 +965,11 @@ def run_to_jsonld(run: dict[str, Any]) -> dict[str, Any]: }, ], "@type": "Dataset", - "name": f"Etcher Run {run.get('run_id', 'unknown')}", + "name": f"{run_label} Run {run.get('run_id', 'unknown')}", "identifier": str(run.get("run_id", "")), "dateCreated": run.get("run_date") or run.get("created_at") or "", "description": ( - f"Etcher run {run.get('run_id')} from lot {run.get('lot_name', 'N/A')}. " + f"{run_label} run {run.get('run_id')} from lot {run.get('lot_name', 'N/A')}. " f"Equipment: {equipment_name}. Project: {project_name}." ), "license": {"@id": DEFAULT_LICENSE}, diff --git a/api/main.py b/api/main.py index 0578adb..7b54652 100644 --- a/api/main.py +++ b/api/main.py @@ -141,6 +141,7 @@ def _run_startup_migrations() -> None: """ from data_loader_pg import get_pg_superuser_connection from data_loader_pg import ensure_etcher_run_file_refs_pg + from data_loader_pg import ensure_equipment_runs_pg from metadata_pg import ( ensure_experiment_proposal_generation_columns_pg, ensure_experiment_type_reuse_columns_pg, @@ -165,6 +166,7 @@ def _run_startup_migrations() -> None: lock_conn.commit() ensure_etcher_run_file_refs_pg() + ensure_equipment_runs_pg() ensure_experiment_proposal_generation_columns_pg() ensure_experiment_type_reuse_columns_pg() ensure_experiment_type_versioning_columns_pg() diff --git a/api/metadata_pg.py b/api/metadata_pg.py index 871f077..f3dda8c 100644 --- a/api/metadata_pg.py +++ b/api/metadata_pg.py @@ -1078,22 +1078,57 @@ def _serialize_equipment( } -def _fetch_equipment_run_metrics(conn) -> dict[str, dict[str, Any]]: - with conn.cursor(cursor_factory=RealDictCursor) as cur: - cur.execute( - """ - SELECT - p.equipment_id, - COUNT(r.idruns)::int AS total_runs, - MAX(COALESCE(r.run_date, r.created_at)) AS last_data_at - FROM projects p - JOIN etcher_runs r ON r.project_id = p.id - WHERE p.equipment_id IS NOT NULL 
- AND p.equipment_id <> '' - GROUP BY p.equipment_id - """ - ) - rows = cur.fetchall() +def _fetch_equipment_run_metrics(_conn=None) -> dict[str, dict[str, Any]]: + """Aggregate fleet-wide run counts and latest-data timestamps per equipment. + + Combines the canonical etcher_runs (attributed via the project's equipment) + with the generic equipment_runs table (attributed via the run's own + equipment_id) so manually uploaded data for any registered equipment is + reflected in the inventory metrics. + + Runs on the superuser connection so these non-sensitive aggregate counts are + complete (fleet-wide) regardless of the caller's project membership. The + RLS-scoped connection passed by callers would otherwise hide runs in + private/shared projects, even from members and admins, and undercount. + """ + conn = get_pg_superuser_connection() + try: + with conn.cursor(cursor_factory=RealDictCursor) as cur: + cur.execute( + """ + SELECT + equipment_id, + SUM(total_runs)::int AS total_runs, + MAX(last_data_at) AS last_data_at + FROM ( + -- etcher_runs is canonical etcher data: attribute every row + -- to the constant 'etcher' id regardless of the project's + -- equipment association (which may be empty or another tool), + -- so canonical runs are never dropped or miscredited. 
+ SELECT + 'etcher'::text AS equipment_id, + COUNT(r.idruns) AS total_runs, + MAX(COALESCE(r.run_date, r.created_at)) AS last_data_at + FROM etcher_runs r + + UNION ALL + + SELECT + er.equipment_id AS equipment_id, + COUNT(er.id) AS total_runs, + MAX(COALESCE(er.run_date, er.created_at)) AS last_data_at + FROM equipment_runs er + WHERE er.equipment_id IS NOT NULL + AND er.equipment_id <> '' + GROUP BY er.equipment_id + ) combined + GROUP BY equipment_id + """ + ) + rows = cur.fetchall() + conn.commit() + finally: + conn.close() return {row["equipment_id"]: dict(row) for row in rows} diff --git a/api/routers/upload.py b/api/routers/upload.py index 189ceb9..da9a155 100644 --- a/api/routers/upload.py +++ b/api/routers/upload.py @@ -5,8 +5,10 @@ import io import hashlib import logging +import math import os from datetime import datetime +from decimal import Decimal, InvalidOperation from pathlib import Path from typing import Any @@ -16,7 +18,12 @@ from pydantic import BaseModel from domain_configs import derive_expected_columns, load_domain_config -from data_loader_pg import get_pg_connection, get_projects_list_pg, sync_runs_pg +from data_loader_pg import ( + get_pg_connection, + get_projects_list_pg, + sync_equipment_runs_pg, + sync_runs_pg, +) from metadata_pg import ( approved_equipment_available_to_user, create_upload_record_pg, @@ -98,6 +105,119 @@ def _normalize_column_type(value: Any) -> str: return "" +def _coerce_bool_flag(value: Any) -> bool: + """Interpret a CSV cell as a boolean run-quality flag. Anything not clearly + truthy is False, matching the column's database default.""" + if value is None: + return False + return str(value).strip().lower() in {"true", "1", "yes", "y", "t"} + + +def _coerce_optional_int(value: Any) -> int | None: + """Parse an integer from a CSV cell without routing through float, so large + 64-bit identifiers/values keep full precision (float64 only holds integers + exactly up to 2**53). 
Accepts integral float forms like ``10.0`` but rejects + fractional values such as ``1.5``.""" + if value is None: + return None + text = str(value).strip() + if text == "" or text.lower() == "none": + return None + try: + return int(text) + except ValueError: + # Decimal forms like "10.0" or "9007199254740993.0": parse exactly via + # Decimal (never binary float) so large 64-bit integers aren't rounded. + try: + dec = Decimal(text) + except InvalidOperation: + return None + if dec.is_finite() and dec == dec.to_integral_value(): + return int(dec) + return None + + +def _coerce_optional_float(value: Any) -> float | None: + if value is None: + return None + text = str(value).strip() + if text == "" or text.lower() == "none": + return None + try: + parsed = float(text) + except (TypeError, ValueError): + return None + # Reject NaN/Infinity: they parse fine in Python but are invalid JSON + # tokens for PostgreSQL JSONB and would break ingestion. + if not math.isfinite(parsed): + return None + return parsed + + +def _extract_expected_bounds( + config_json: dict[str, Any], +) -> dict[str, tuple[float | None, float | None]]: + """Collect per-column min/max limits from an equipment config. 
+ + Reads the actual shapes used by both PG-registered and legacy JSON configs: + - features: keyed by ``id`` (or ``name``), limits in ``min``/``max`` or + ``min_value``/``max_value`` + - parameters: keyed by ``name`` (or ``id``), limits in ``min_value``/ + ``max_value`` or ``min``/``max`` + - targets: ``targets.primary``/``targets.secondary`` keyed by ``id``, + limits inside a ``constraints`` object + - legacy flat ``primary_target``/``secondary_target`` objects + """ + bounds: dict[str, tuple[float | None, float | None]] = {} + + def consider(name: Any, min_raw: Any, max_raw: Any) -> None: + if not name: + return + min_v = _coerce_optional_float(min_raw) + max_v = _coerce_optional_float(max_raw) + if min_v is not None or max_v is not None: + bounds[str(name)] = (min_v, max_v) + + if not isinstance(config_json, dict): + return bounds + + for feature in config_json.get("features", []): + if isinstance(feature, dict): + consider( + feature.get("id") or feature.get("name"), + feature.get("min", feature.get("min_value")), + feature.get("max", feature.get("max_value")), + ) + + for parameter in config_json.get("parameters", []): + if isinstance(parameter, dict): + consider( + parameter.get("name") or parameter.get("id"), + parameter.get("min_value", parameter.get("min")), + parameter.get("max_value", parameter.get("max")), + ) + + targets = config_json.get("targets", {}) + target_objs: list[Any] = [] + if isinstance(targets, dict): + target_objs.extend([targets.get("primary"), targets.get("secondary")]) + target_objs.extend( + [config_json.get("primary_target"), config_json.get("secondary_target")] + ) + for target in target_objs: + if not isinstance(target, dict): + continue + constraints = target.get("constraints") + constraints = constraints if isinstance(constraints, dict) else {} + consider( + target.get("id") or target.get("name"), + constraints.get("min", target.get("min_value")), + constraints.get("max", target.get("max_value")), + ) + + return bounds + + def 
_upload_value(row: dict[str, Any], canonical: str) -> Any: for key in UPLOAD_TO_SYNC_ALIASES.get(canonical, (canonical,)): if key in row and str(row.get(key) or "").strip() != "": @@ -172,7 +292,7 @@ def _load_upload_record(upload_id: str, user: PlatformUser) -> dict[str, Any]: with conn.cursor() as cur: cur.execute( """ - SELECT id, equipment_id, filename, storage_path, status, owner_id, owner_org + SELECT id, equipment_id, filename, storage_path, status, owner_id, owner_org, errors_json FROM data_uploads WHERE id = %s """, @@ -186,6 +306,9 @@ def _load_upload_record(upload_id: str, user: PlatformUser) -> dict[str, Any]: if not row: raise HTTPException(status_code=404, detail="Upload not found") + stored_issues = row[7] + if not isinstance(stored_issues, list): + stored_issues = [] record = { "id": str(row[0]), "equipment_id": row[1], @@ -194,6 +317,7 @@ def _load_upload_record(upload_id: str, user: PlatformUser) -> dict[str, Any]: "status": row[4], "owner_id": row[5], "owner_org": row[6], + "errors": stored_issues, } if user.role != "admin": same_owner = record["owner_id"] == user.id @@ -203,13 +327,20 @@ def _load_upload_record(upload_id: str, user: PlatformUser) -> dict[str, Any]: return record -def _assert_project_visible(project_id: str, user: PlatformUser) -> None: +def _assert_project_visible(project_id: str, user: PlatformUser) -> dict[str, Any]: + """Return the visible project record, or raise 404 if the caller can't see + it (private project membership is never disclosed).""" visible_projects = get_projects_list_pg( nanohub_user_id=user.id, is_admin=user.role == "admin", ) - if not any(project.get("id") == project_id for project in visible_projects): + project = next( + (p for p in visible_projects if p.get("id") == project_id), + None, + ) + if project is None: raise HTTPException(status_code=404, detail="Selected project was not found") + return project def _csv_upload_to_sync_rows( @@ -224,6 +355,8 @@ def _csv_upload_to_sync_rows( if not 
path.exists(): return [], ["Stored upload file is missing from disk."] + hint_by_column = _template_hint_by_column(_load_equipment_config(upload_record["equipment_id"])) + errors: list[str] = [] rows: list[dict[str, Any]] = [] with path.open("r", encoding="utf-8-sig", newline="") as f: @@ -235,7 +368,15 @@ def _csv_upload_to_sync_rows( if errors: return [], errors + first_data_row = True for idx, raw_row in enumerate(reader, start=2): + # Skip the template's value-type descriptor row only when it is the + # very first data row, so a later row can't be dropped for resembling + # the hint. + if first_data_row: + first_data_row = False + if _is_template_hint_row(raw_row, hint_by_column): + continue row_errors: list[str] = [] row_number = idx - 1 id_value = _upload_value(raw_row, "idruns") @@ -330,6 +471,26 @@ def _template_columns_and_value_types( if unit: unit_by_name.setdefault(column_name, unit) + # Registered input parameters and output measurements also carry units and + # declared types that should drive the template's value-type hints (so a + # string/bool/int field isn't advertised as float). 
+ for collection in (config.get("parameters", []), config.get("outputs", [])): + for item in collection: + if not isinstance(item, dict): + continue + name = item.get("name") or item.get("id") + if not name: + continue + column_name = str(name) + unit = str(item.get("unit") or "").strip() + if not unit: + unit = _qudt_unit_label(item.get("qudt_unit")) + if unit: + unit_by_name.setdefault(column_name, unit) + declared = _normalize_column_type(item.get("type")) + if declared: + type_by_name.setdefault(column_name, declared) + targets = config.get("targets", {}) if isinstance(targets, dict): for target in (targets.get("primary"), targets.get("secondary")): @@ -357,13 +518,22 @@ def infer_value_type(name: str, *, role: str = "") -> str: if name == ts_col or role == "timestamp": return "datetime" + declared = type_by_name.get(name, "") unit = unit_by_name.get(name, "") + + # A non-float declared type (string/boolean/int/datetime) is authoritative + # even when a unit is present, so such fields advertise the correct hint + # instead of their unit. Numeric fields keep the more specific unit hint. 
+ if declared and declared != "float": + return declared if unit: return unit + if declared: + return declared - if role in {"feature", "target"}: + if role in {"feature", "target", "output", "parameter"}: return "float" - return type_by_name.get(name, "") or "string" + return "string" def add(name: Any, *, role: str = "") -> None: if not name: @@ -375,9 +545,44 @@ def add(name: Any, *, role: str = "") -> None: columns.append(column_name) value_types.append(infer_value_type(column_name, role=role)) + def add_parameters() -> None: + for parameter in config.get("parameters", []): + if isinstance(parameter, dict): + add(parameter.get("name") or parameter.get("id"), role="parameter") + else: + add(parameter, role="parameter") + + def add_outputs() -> None: + """Append the equipment's measured outputs so users have explicit + columns to report results (etch rate, uniformity, …) alongside the + input set-points. Covers both the optimization targets and the richer + ``outputs`` list captured at registration.""" + targets = config.get("targets", {}) + if isinstance(targets, dict): + for target in (targets.get("primary"), targets.get("secondary")): + if isinstance(target, dict): + target_id = target.get("id") or target.get("name") + if target_id and str(target_id) not in _PLACEHOLDER_TARGET_IDS: + add(target_id, role="target") + elif target: + add(target, role="target") + for output in config.get("outputs", []): + if isinstance(output, dict): + add(output.get("name") or output.get("id"), role="output") + else: + add(output, role="output") + + # When the equipment declares an explicit ingestion column list we honor it + # as the canonical input schema, but still ensure the lot/timestamp keys, + # registered input parameters, and outputs are present so no run metadata is + # silently dropped for configs that listed only a subset of columns. 
if explicit_columns: for column in explicit_columns: add(column["name"]) + add(lot_col, role="lot") + add(ts_col, role="timestamp") + add_parameters() + add_outputs() return columns, value_types add(lot_col, role="lot") @@ -389,13 +594,8 @@ def add(name: Any, *, role: str = "") -> None: else: add(feature, role="feature") - targets = config.get("targets", {}) - if isinstance(targets, dict): - for target in (targets.get("primary"), targets.get("secondary")): - if isinstance(target, dict): - add(target.get("id") or target.get("name"), role="target") - else: - add(target, role="target") + add_parameters() + add_outputs() if not columns: derived_columns = derive_expected_columns(config) @@ -405,6 +605,42 @@ def add(name: Any, *, role: str = "") -> None: return columns, value_types +def _template_hint_by_column(config: dict[str, Any]) -> dict[str, str]: + """Map each template column to its value-type/unit hint (the second row of + the generated CSV template).""" + try: + columns, value_types = _template_columns_and_value_types(config) + except Exception: + return {} + return {col: str(vt) for col, vt in zip(columns, value_types)} + + +def _is_template_hint_row( + raw_row: dict[str, Any], + hint_by_column: dict[str, str], +) -> bool: + """Detect the descriptor row a user may leave in place when filling out the + downloaded template (e.g. ``string,datetime,float,sccm``). + + Every hint-bearing column that is actually present in the uploaded row must + exactly equal its template hint (columns the user removed from the template + are ignored, so a trimmed inputs-only template is still recognized). This is + intentionally strict over the present columns so that a sparse, + partially-matching data row is never mistaken for the descriptor and silently + dropped. 
Callers further restrict this check to the first data row.""" + hints = { + col: str(hint).strip() + for col, hint in hint_by_column.items() + if str(hint).strip() and col in raw_row + } + if not hints: + return False + for column, hint in hints.items(): + if str(raw_row.get(column) or "").strip() != hint: + return False + return True + + @router.post("/upload") async def upload_file( equipment_id: str = Form(...), @@ -438,15 +674,14 @@ async def upload_file( status_code=403, detail="Not authorized to upload data for this equipment", ) - expected_columns = [ - column["name"] - for column in equipment.get("columns", []) - if isinstance(column, dict) and column.get("name") - ] - if not expected_columns: - expected_columns = derive_expected_columns( - equipment.get("config_json", {}) - ) + # Required schema for registered equipment is its lot/timestamp metadata + # plus declared inputs. Outputs/targets (incl. synthetic placeholders and + # any listed among explicit columns) stay optional so an inputs-only CSV + # is still ingestible. The effective config backfills column definitions + # that registration stored separately from config_json. 
+ expected_columns = _required_upload_columns( + _effective_equipment_config(equipment) + ) else: legacy_config = load_domain_config(equipment_id) if legacy_config is None: @@ -459,29 +694,11 @@ async def upload_file( ) expected_columns = derive_expected_columns(legacy_config) - # Extract numerical boundary limits for validation - expected_bounds = {} - config_json = equipment.get("config_json", {}) if equipment else legacy_config - for f in config_json.get("features", []): - if isinstance(f, dict) and f.get("name"): - try: - min_v = float(f.get("min_value")) if f.get("min_value") else None - max_v = float(f.get("max_value")) if f.get("max_value") else None - if min_v is not None or max_v is not None: - expected_bounds[f["name"]] = (min_v, max_v) - except (ValueError, TypeError): - pass - - for t_key in ["primary_target", "secondary_target"]: - t = config_json.get(t_key) - if isinstance(t, dict) and t.get("name"): - try: - min_v = float(t.get("min_value")) if t.get("min_value") else None - max_v = float(t.get("max_value")) if t.get("max_value") else None - if min_v is not None or max_v is not None: - expected_bounds[t["name"]] = (min_v, max_v) - except (ValueError, TypeError): - pass + # Extract numerical boundary limits for validation. These come from the + # equipment's registered features/parameters/targets and are used to flag + # (non-blocking) out-of-range values that usually indicate a unit mismatch. + config_json = _effective_equipment_config(equipment) if equipment else legacy_config + expected_bounds = _extract_expected_bounds(config_json) # Check extension ext = Path(file.filename).suffix.lower() @@ -501,8 +718,14 @@ async def upload_file( if size_bytes > 50 * 1024 * 1024: # 50 MB limit raise HTTPException(status_code=400, detail="File exceeds 50 MB limit") - # Parse CSV to validate basic structure - errors = [] + # Parse CSV to validate basic structure. + # + # Hard errors (missing required columns, malformed rows) block processing. 
+ # Out-of-range values are recorded as non-blocking warnings: they usually + # mean a unit mismatch worth flagging, but they should not stop a researcher + # from ingesting otherwise well-formed data. + errors: list[str] = [] + warnings: list[str] = [] row_count = 0 columns = [] @@ -513,7 +736,27 @@ async def upload_file( header = next(reader, None) if header: columns = [c.strip() for c in header] - if expected_columns: + if equipment_id == "etcher": + # The canonical etcher processor (_csv_upload_to_sync_rows) + # requires the full alias-aware sync schema, including the + # AvgEtchRate/RangeEtchRate outputs. Validate against the same + # contract so an upload accepted here can actually be + # processed. + header_set = set(columns) + missing_columns = [ + canonical + for canonical in REQUIRED_SYNC_COLUMNS + if not any( + alias in header_set + for alias in UPLOAD_TO_SYNC_ALIASES[canonical] + ) + ] + if missing_columns: + errors.append( + "Missing required columns: " + + ", ".join(sorted(missing_columns)) + ) + elif expected_columns: missing_columns = [ column for column in expected_columns if column not in columns ] @@ -530,22 +773,24 @@ async def upload_file( errors.append("... (more errors truncated)") break continue - + for col_idx, col_name in enumerate(columns): if col_name in expected_bounds: min_v, max_v = expected_bounds[col_name] if col_idx >= len(row): continue val_str = row[col_idx].strip() if not val_str: continue - try: - val = float(val_str) + val = _coerce_optional_float(val_str) + if val is None: + continue + if len(warnings) <= 10: if min_v is not None and val < min_v: - errors.append(f"Row {i} Validation Warning: '{col_name}' value {val} is below minimum ({min_v}). Check units.") + warnings.append(f"Row {i} Validation Warning: '{col_name}' value {val} is below minimum ({min_v}). Check units.") if max_v is not None and val > max_v: - errors.append(f"Row {i} Validation Warning: '{col_name}' value {val} exceeds maximum ({max_v}). 
Check units.") - except Exception: - pass - + warnings.append(f"Row {i} Validation Warning: '{col_name}' value {val} exceeds maximum ({max_v}). Check units.") + if len(warnings) > 10: + warnings.append("... (more warnings truncated)") + if len(errors) > 10 and not errors[-1].startswith("..."): errors.append("... (more errors truncated)") break @@ -570,13 +815,17 @@ async def upload_file( with open(dest, "wb") as f: f.write(contents) + # Only hard errors drive failure; warnings are surfaced but stored + # alongside so the upload still reaches a processable "success" state. status = "error" if errors else "success" + stored_issues = errors + warnings if ext == ".csv": - message = ( - f"File '{file.filename}' was stored after CSV header and row-shape validation." - if not errors - else f"File '{file.filename}' was stored, but CSV structural validation found {len(errors)} issue(s)." - ) + if errors: + message = f"File '{file.filename}' was stored, but CSV structural validation found {len(errors)} issue(s)." + elif warnings: + message = f"File '{file.filename}' was stored after validation with {len(warnings)} range warning(s) to review." + else: + message = f"File '{file.filename}' was stored after CSV header and row-shape validation." else: message = ( f"Excel file '{file.filename}' was stored. Detailed spreadsheet parsing and automatic FAIR/AI ingestion are not performed by this endpoint." 
@@ -590,7 +839,7 @@ async def upload_file( size_bytes=size_bytes, row_count=row_count, columns=columns, - errors=errors, + errors=stored_issues, status=status, user=user, ) @@ -613,7 +862,7 @@ async def upload_file( "size_bytes": size_bytes, "row_count": row_count, "columns": columns, - "errors": errors if errors else None, + "errors": stored_issues if stored_issues else None, "message": message, "processing_status": "stored_only", } @@ -627,6 +876,424 @@ def list_uploads( return list_uploads_pg(user) +def _effective_equipment_config(equipment: dict[str, Any]) -> dict[str, Any]: + """Return a registered equipment's ingestion config, backfilling the + explicit column definitions from the record's top-level ``columns`` + (columns_json) when ``config_json`` is empty or omits them. Registration can + persist the column schema separately from config_json, so the upload schema + must consult both to avoid accepting/processing files that drop registered + columns.""" + config = equipment.get("config_json") + config = dict(config) if isinstance(config, dict) else {} + if not config.get("columns"): + top_columns = equipment.get("columns") + if isinstance(top_columns, list) and top_columns: + config["columns"] = top_columns + return config + + +def _load_equipment_config(equipment_id: str) -> dict[str, Any]: + """Return the ingestion config for a registered equipment, falling back to a + legacy JSON domain config. Returns {} if neither is found.""" + equipment = get_equipment_pg(equipment_id) + if equipment: + return _effective_equipment_config(equipment) + legacy = load_domain_config(equipment_id) + return legacy if isinstance(legacy, dict) else {} + + +# Default target ids build_domain_config assigns when an equipment is +# registered without explicit optimization targets. They are placeholders, not +# real measured columns, so they must not be treated as required/expected data. 
+_PLACEHOLDER_TARGET_IDS = {"primary_metric", "secondary_metric"} + + +def _real_target_ids(config: dict[str, Any]) -> list[str]: + """Return optimization-target ids that correspond to actual measured + columns (excluding the synthetic placeholders).""" + ids: list[str] = [] + targets = config.get("targets", {}) + if isinstance(targets, dict): + for target in (targets.get("primary"), targets.get("secondary")): + if isinstance(target, dict): + name = target.get("id") or target.get("name") + if name and str(name) not in _PLACEHOLDER_TARGET_IDS: + ids.append(str(name)) + return ids + + +def _config_input_output_names( + config: dict[str, Any], +) -> tuple[list[str], list[str]]: + """Split an equipment config's columns into input set-points and measured + outputs. Outputs are authoritative (real targets + registered outputs); + every other declared column that isn't the lot/timestamp key is treated as + input. + """ + data_config = config.get("data", {}) + if not isinstance(data_config, dict): + data_config = {} + lot_col = data_config.get("lotname_column") or "LOTNAME" + ts_col = data_config.get("timestamp_column") or "run_date" + + output_names: list[str] = [] + seen_out: set[str] = set() + + def add_out(name: Any) -> None: + if not name: + return + text = str(name) + if text not in seen_out: + seen_out.add(text) + output_names.append(text) + + for target_id in _real_target_ids(config): + add_out(target_id) + for output in config.get("outputs", []): + if isinstance(output, dict): + add_out(output.get("name") or output.get("id")) + else: + add_out(output) + + input_names: list[str] = [] + seen_in: set[str] = set() + + def add_in(name: Any) -> None: + if not name: + return + text = str(name) + if text in seen_out or text in {lot_col, ts_col} or text in seen_in: + return + seen_in.add(text) + input_names.append(text) + + for feature in config.get("features", []): + if isinstance(feature, dict): + add_in(feature.get("id") or feature.get("name")) + else: + add_in(feature) 
+ for parameter in config.get("parameters", []): + if isinstance(parameter, dict): + add_in(parameter.get("name") or parameter.get("id")) + else: + add_in(parameter) + for column in config.get("columns", []): + if isinstance(column, dict): + add_in(column.get("name")) + + return input_names, output_names + + +def _required_upload_columns(config: dict[str, Any]) -> list[str]: + """Columns a generic upload must contain to be ingestible. + + Always requires the lot and timestamp metadata columns (so catalog runs are + never anonymous/undated) plus the equipment's declared *inputs* (features + + parameters). Outputs/targets are intentionally optional — results may be + reported later — so they are explicitly excluded even when a legacy config + lists them among explicit ``columns``. Synthetic placeholder targets are + likewise never required. + """ + data_config = config.get("data", {}) + if not isinstance(data_config, dict): + data_config = {} + lot_col = data_config.get("lotname_column") or "LOTNAME" + ts_col = data_config.get("timestamp_column") or "run_date" + + input_names, output_names = _config_input_output_names(config) + output_set = set(output_names) + + names: list[str] = [] + seen: set[str] = set() + + def add(name: Any) -> None: + if not name: + return + text = str(name) + if text in output_set or text in seen: + return + seen.add(text) + names.append(text) + + add(lot_col) + add(ts_col) + for name in input_names: + add(name) + return names + + +def _config_type_by_name(config: dict[str, Any]) -> dict[str, str]: + """Map each declared column to its registered value type so ingestion can + preserve identifiers/integers/booleans instead of floating everything.""" + types: dict[str, str] = {} + + def register(name: Any, declared: Any) -> None: + if not name: + return + types.setdefault(str(name), str(declared or "").strip()) + + for column in config.get("columns", []): + if isinstance(column, dict): + register(column.get("name"), column.get("type")) + for 
parameter in config.get("parameters", []): + if isinstance(parameter, dict): + register(parameter.get("name") or parameter.get("id"), parameter.get("type")) + for output in config.get("outputs", []): + if isinstance(output, dict): + register(output.get("name") or output.get("id"), output.get("type")) + # Features are numeric ranges; targets are numeric metrics. + for feature in config.get("features", []): + if isinstance(feature, dict): + register(feature.get("id") or feature.get("name"), "float") + for target_id in _real_target_ids(config): + types.setdefault(target_id, "float") + return types + + +def _coerce_typed(value: Any, declared_type: str) -> Any: + """Coerce a CSV cell according to its registered type. Falls back to + numeric-where-possible only when the type is unknown, so declared strings + (e.g. zero-padded ids) and integers are preserved faithfully.""" + if value is None: + return None + text = str(value).strip() + if text == "": + return None + + t = (declared_type or "").strip().lower() + if t in {"string", "str", "text", "categorical", "category", "id", "identifier"}: + return text + if t in {"bool", "boolean"}: + low = text.lower() + if low in {"true", "1", "yes", "y", "t"}: + return True + if low in {"false", "0", "no", "n", "f"}: + return False + return text + if t in {"int", "integer", "long"}: + parsed = _coerce_optional_int(text) + return parsed if parsed is not None else text + if t in {"float", "double", "number", "numeric", "real", "decimal"}: + parsed = _coerce_optional_float(text) + return parsed if parsed is not None else text + + # Unknown/unit-based type: keep the legacy numeric-where-possible behavior. + return _coerce_cell(value) + + +def _declared_type_violation(value: Any, declared_type: str) -> bool: + """Return True when a non-empty cell cannot be represented as its declared + numeric type. 
String/categorical/unknown types accept anything; booleans and + numbers must parse, otherwise the value would be stored as an invalid typed + cell.""" + if value is None: + return False + text = str(value).strip() + if text == "": + return False + + t = (declared_type or "").strip().lower() + if t in {"int", "integer", "long"}: + # Integer columns must be whole numbers: reject unparseable and + # fractional values (e.g. 1.5). Parsed directly so large 64-bit values + # aren't rounded by a float round-trip. + return _coerce_optional_int(text) is None + if t in {"float", "double", "number", "numeric", "real", "decimal"}: + return _coerce_optional_float(text) is None + if t in {"bool", "boolean"}: + return text.lower() not in {"true", "1", "yes", "y", "t", "false", "0", "no", "n", "f"} + return False + + +def _coerce_cell(value: Any) -> Any: + """Numeric where possible, otherwise the trimmed string (or None).""" + if value is None: + return None + text = str(value).strip() + if text == "": + return None + parsed = _coerce_optional_float(text) + return parsed if parsed is not None else text + + +_TIMESTAMP_FORMATS = ( + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%d", + "%Y/%m/%d %H:%M:%S", + "%Y/%m/%d", + "%m/%d/%Y %H:%M:%S", + "%m/%d/%Y %H:%M", + "%m/%d/%Y", + "%d-%m-%Y", +) + + +def _normalize_timestamp(text: str) -> tuple[str | None, bool]: + """Parse a timestamp cell into a normalized ISO string. + + Returns ``(iso_string_or_None, is_valid)``. ``is_valid`` is False only when + a non-empty value cannot be parsed, so callers can raise an actionable + validation error instead of letting an invalid literal reach the + ``TIMESTAMP WITH TIME ZONE`` column and fail the whole sync with a 500. 
+ """ + raw = (text or "").strip() + if not raw: + return None, True + candidate = raw.replace("Z", "+00:00") if raw.endswith("Z") else raw + try: + return datetime.fromisoformat(candidate).isoformat(), True + except ValueError: + pass + for fmt in _TIMESTAMP_FORMATS: + try: + return datetime.strptime(raw, fmt).isoformat(), True + except ValueError: + continue + return None, False + + +def _csv_upload_to_equipment_rows( + *, + config: dict[str, Any], + upload_record: dict[str, Any], + project_id: str, +) -> tuple[list[dict[str, Any]], list[str]]: + """Generic CSV → equipment_runs rows builder for any registered equipment. + + Splits each row into the equipment's declared inputs and outputs and keeps + the full original row as the raw payload. Enforces the equipment's required + columns (lot/timestamp/inputs, not outputs/targets) before ingestion. + """ + path = Path(upload_record["storage_path"]) + if path.suffix.lower() != ".csv": + return [], ["Only CSV uploads can be processed into the data catalog today."] + if not path.exists(): + return [], ["Stored upload file is missing from disk."] + + data_config = config.get("data", {}) + if not isinstance(data_config, dict): + data_config = {} + lot_col = data_config.get("lotname_column") or "LOTNAME" + ts_col = data_config.get("timestamp_column") or "run_date" + + input_names, output_names = _config_input_output_names(config) + required_columns = _required_upload_columns(config) + type_by_name = _config_type_by_name(config) + hint_by_column = _template_hint_by_column(config) + lot_required = lot_col in required_columns + ts_required = ts_col in required_columns + + errors: list[str] = [] + rows: list[dict[str, Any]] = [] + with path.open("r", encoding="utf-8-sig", newline="") as f: + reader = csv.DictReader(f) + headers = set(reader.fieldnames or []) + missing = [column for column in required_columns if column not in headers] + if missing: + errors.append( + "Missing required columns: " + ", ".join(sorted(missing)) + ) + return 
[], errors + + first_data_row = True + for idx, raw_row in enumerate(reader, start=2): + # Skip the template's value-type descriptor row only when it is the + # very first data row, so a later row can't be dropped for resembling + # the hint. + if first_data_row: + first_data_row = False + if _is_template_hint_row(raw_row, hint_by_column): + continue + + # Reject structurally malformed rows even when the process endpoint is + # called directly (bypassing upload-time validation). DictReader stores + # surplus cells under the None key, and a short row leaves declared + # columns at the restval (None); either way the row's data would be + # silently mis-aligned, so it must not become a catalog run. + expected_width = len(reader.fieldnames or []) + if None in raw_row: + errors.append(f"Row {idx}: expected {expected_width} columns, got more") + if len(errors) > 50: + errors.append("... (more errors truncated)") + break + continue + if any(value is None for value in raw_row.values()): + errors.append(f"Row {idx}: expected {expected_width} columns, got fewer") + if len(errors) > 50: + errors.append("... (more errors truncated)") + break + continue + + lotname = str(raw_row.get(lot_col) or "").strip() + if lot_required and not lotname: + errors.append(f"Row {idx}: {lot_col} is required") + + ts_raw = str(raw_row.get(ts_col) or "").strip() + run_date, ts_ok = _normalize_timestamp(ts_raw) + if not ts_ok: + errors.append( + f"Row {idx}: {ts_col} '{ts_raw}' is not a recognized date/time" + ) + elif ts_required and not ts_raw: + errors.append(f"Row {idx}: {ts_col} is required") + + for name in (*input_names, *output_names): + if name not in raw_row: + continue + declared = type_by_name.get(name, "") + if _declared_type_violation(raw_row.get(name), declared): + errors.append( + f"Row {idx}: {name} '{str(raw_row.get(name)).strip()}' " + f"is not a valid {declared}" + ) + + if len(errors) > 50: + errors.append("... 
(more errors truncated)") + break + + # Blank cells coerce to None; omit them so an inputs-only upload (or a + # row that leaves optional outputs empty) doesn't store misleading + # {"result": null} entries that would surface as measured results in + # the catalog and FAIR metadata. + inputs = {} + for name in input_names: + if name not in raw_row: + continue + coerced = _coerce_typed(raw_row.get(name), type_by_name.get(name, "")) + if coerced is not None: + inputs[name] = coerced + outputs = {} + for name in output_names: + if name not in raw_row: + continue + coerced = _coerce_typed(raw_row.get(name), type_by_name.get(name, "")) + if coerced is not None: + outputs[name] = coerced + # Standard run-quality metadata (if present) drives catalog filters, + # badges, and outlier counts. Parse it into the top-level row (using + # the same column aliases as the etcher path) instead of letting it + # fall back to database defaults. + rows.append( + { + "project_id": project_id, + "lotname": lotname, + "run_date": run_date, + "inputs": inputs, + "outputs": outputs, + "raw": dict(raw_row), + "source": "data_upload", + "is_outlier": _coerce_bool_flag(_upload_value(raw_row, "is_outlier")), + "is_calibration_recipe": _coerce_bool_flag( + _upload_value(raw_row, "is_calibration_recipe") + ), + "outlier_type": str(_upload_value(raw_row, "outlier_type") or "").strip(), + } + ) + + return rows, errors + + @router.post("/uploads/{upload_id}/process") def process_upload( upload_id: str, @@ -634,69 +1301,105 @@ def process_upload( user: PlatformUser = Depends(get_platform_user), ): """ - Convert a stored CSV upload into canonical etcher_runs rows. + Convert a stored CSV upload into canonical run records. - This intentionally reuses sync_runs_pg(), the same ingestion sink used by - Azure, so upsert behavior, project linkage, raw payload storage, and - proposal reconciliation stay consistent across ingestion sources. 
+ The canonical etcher routes rows through sync_runs_pg() — the same ingestion + sink used by Azure — so upsert behavior, project linkage, raw payload + storage, and proposal reconciliation stay consistent. Every other registered + equipment is ingested into the generic equipment_runs table so manually + uploaded data is captured and surfaced in the catalog regardless of type. """ project_id = payload.project_id.strip() if not project_id: raise HTTPException(status_code=400, detail="project_id is required") - _assert_project_visible(project_id, user) + project = _assert_project_visible(project_id, user) upload_record = _load_upload_record(upload_id, user) + equipment_id = upload_record["equipment_id"] + + # Non-blocking range warnings recorded at validation time should survive + # processing so the catalog/upload UI can keep surfacing them. A "success" + # upload's stored issues are warnings only (hard errors block the success + # state), so they are safe to carry forward. + prior_warnings = ( + list(upload_record.get("errors") or []) + if upload_record.get("status") == "success" + else [] + ) + + # Guard against linking a run to a project that belongs to a different + # tool. A project with no equipment association is left unconstrained. + project_equipment_id = str(project.get("equipment_id") or "").strip() + if project_equipment_id and project_equipment_id != equipment_id: + raise HTTPException( + status_code=400, + detail=( + f"Selected project is associated with equipment " + f"'{project_equipment_id}', not '{equipment_id}'. Choose a " + "matching project." 
+ ), + ) + _update_upload_record(upload_id=upload_id, status="processing", errors=[]) - rows, errors = _csv_upload_to_sync_rows( - upload_id=upload_id, - upload_record=upload_record, - project_id=project_id, - ) - if errors: + def _fail(errors: list[str], message: str, row_count: int) -> dict[str, Any]: _update_upload_record( upload_id=upload_id, status="failed", errors=errors[:50], - row_count=len(rows), + row_count=row_count, ) return { "status": "failed", "upload_id": upload_id, - "equipment_id": upload_record["equipment_id"], + "equipment_id": equipment_id, "project_id": project_id, "inserted": 0, - "row_count": len(rows), + "row_count": row_count, "errors": errors[:50], - "message": "Upload was not processed because validation failed.", + "message": message, } - if not rows: - errors = ["No data rows were found in the CSV upload."] - _update_upload_record( + + is_canonical_etcher = equipment_id == "etcher" + + if is_canonical_etcher: + rows, errors = _csv_upload_to_sync_rows( upload_id=upload_id, - status="failed", - errors=errors, - row_count=0, + upload_record=upload_record, + project_id=project_id, + ) + else: + config = _load_equipment_config(equipment_id) + rows, errors = _csv_upload_to_equipment_rows( + config=config, + upload_record=upload_record, + project_id=project_id, + ) + + if errors: + return _fail(errors, "Upload was not processed because validation failed.", len(rows)) + if not rows: + return _fail( + ["No data rows were found in the CSV upload."], + "Upload was not processed because it contained no rows.", + 0, ) - return { - "status": "failed", - "upload_id": upload_id, - "equipment_id": upload_record["equipment_id"], - "project_id": project_id, - "inserted": 0, - "row_count": 0, - "errors": errors, - "message": "Upload was not processed because it contained no rows.", - } try: - inserted = sync_runs_pg(rows) + if is_canonical_etcher: + inserted = sync_runs_pg(rows) + else: + inserted = sync_equipment_runs_pg( + equipment_id=equipment_id, + 
rows=rows, + upload_id=upload_id, + upload_filename=upload_record["filename"], + ) except Exception as exc: - errors = [f"Database sync failed: {exc}"] _update_upload_record( upload_id=upload_id, status="failed", - errors=errors, + errors=[f"Database sync failed: {exc}"], row_count=len(rows), ) raise HTTPException(status_code=500, detail="Upload processing sync failed") from exc @@ -704,18 +1407,18 @@ def process_upload( _update_upload_record( upload_id=upload_id, status="processed", - errors=[], + errors=prior_warnings[:50], row_count=len(rows), ) return { "status": "processed", "upload_id": upload_id, - "equipment_id": upload_record["equipment_id"], + "equipment_id": equipment_id, "project_id": project_id, "inserted": inserted, "row_count": len(rows), - "errors": [], - "message": f"Processed {inserted} row(s) into etcher_runs.", + "errors": prior_warnings[:50], + "message": f"Processed {inserted} row(s) into the data catalog.", } diff --git a/api/scripts/add_equipment_runs.sql b/api/scripts/add_equipment_runs.sql new file mode 100644 index 0000000..b39e5ef --- /dev/null +++ b/api/scripts/add_equipment_runs.sql @@ -0,0 +1,68 @@ +-- Generic per-equipment runs table for manual uploads of ANY registered +-- equipment (not just the canonical etcher). Inputs/outputs are stored as +-- JSONB so no bespoke table is required per equipment type. +-- Safe to run multiple times. 
+ +CREATE TABLE IF NOT EXISTS equipment_runs ( + id BIGSERIAL PRIMARY KEY, + equipment_id VARCHAR(255) NOT NULL, + project_id VARCHAR(50) REFERENCES projects(id) ON DELETE SET NULL, + lotname VARCHAR(255), + run_date TIMESTAMP WITH TIME ZONE, + is_outlier BOOLEAN DEFAULT false, + is_calibration_recipe BOOLEAN DEFAULT false, + outlier_type TEXT DEFAULT '', + inputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + outputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + raw_payload_json JSONB NOT NULL DEFAULT '{}'::jsonb, + upload_id UUID, + row_index INT, + upload_filename VARCHAR(255) DEFAULT '', + source VARCHAR(100) DEFAULT 'data_upload', + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +-- Backfill for tables created before row_index existed. +ALTER TABLE equipment_runs ADD COLUMN IF NOT EXISTS row_index INT; + +CREATE INDEX IF NOT EXISTS idx_equipment_runs_equipment_id ON equipment_runs(equipment_id); +CREATE INDEX IF NOT EXISTS idx_equipment_runs_project_id ON equipment_runs(project_id); +CREATE INDEX IF NOT EXISTS idx_equipment_runs_upload_id ON equipment_runs(upload_id); +CREATE UNIQUE INDEX IF NOT EXISTS uq_equipment_runs_upload_row + ON equipment_runs(upload_id, row_index) + WHERE upload_id IS NOT NULL; + +ALTER TABLE equipment_runs ENABLE ROW LEVEL SECURITY; + +GRANT SELECT ON equipment_runs TO api_client; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM pg_policies + WHERE schemaname = 'public' + AND tablename = 'equipment_runs' + AND policyname = 'equipment_run_visibility_policy' + ) THEN + CREATE POLICY equipment_run_visibility_policy ON equipment_runs + FOR SELECT + USING ( + EXISTS ( + SELECT 1 + FROM projects p + WHERE p.id = equipment_runs.project_id + AND ( + p.access_mode = 'open' + OR EXISTS ( + SELECT 1 + FROM project_members pm + WHERE pm.project_id = p.id + AND pm.nanohub_user_id = current_setting('app.current_user', true) + ) + ) + ) + ); + END IF; +END +$$; diff --git a/geddes/k8s/postgres/schema_v2.sql 
b/geddes/k8s/postgres/schema_v2.sql index 56c95d8..3763d02 100644 --- a/geddes/k8s/postgres/schema_v2.sql +++ b/geddes/k8s/postgres/schema_v2.sql @@ -90,6 +90,60 @@ CREATE POLICY run_visibility_policy ON etcher_runs ) ); +-- Generic per-equipment runs table. Unlike etcher_runs (which has typed +-- columns for the canonical etcher schema), this table stores the validated +-- inputs/outputs for ANY registered equipment as JSONB, so manual uploads for +-- newly registered equipment can be ingested without a bespoke table. +CREATE TABLE IF NOT EXISTS equipment_runs ( + id BIGSERIAL PRIMARY KEY, + equipment_id VARCHAR(255) NOT NULL, + project_id VARCHAR(50) REFERENCES projects(id) ON DELETE SET NULL, + lotname VARCHAR(255), + run_date TIMESTAMP WITH TIME ZONE, + is_outlier BOOLEAN DEFAULT false, + is_calibration_recipe BOOLEAN DEFAULT false, + outlier_type TEXT DEFAULT '', + inputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + outputs_json JSONB NOT NULL DEFAULT '{}'::jsonb, + raw_payload_json JSONB NOT NULL DEFAULT '{}'::jsonb, + upload_id UUID, + row_index INT, + upload_filename VARCHAR(255) DEFAULT '', + source VARCHAR(100) DEFAULT 'data_upload', + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_equipment_runs_equipment_id ON equipment_runs(equipment_id); +CREATE INDEX IF NOT EXISTS idx_equipment_runs_project_id ON equipment_runs(project_id); +CREATE INDEX IF NOT EXISTS idx_equipment_runs_upload_id ON equipment_runs(upload_id); +-- Stable per-upload row identity so reprocessing an upload upserts in place +-- (preserving each run's id / exposed run_id) instead of reallocating ids. 
+CREATE UNIQUE INDEX IF NOT EXISTS uq_equipment_runs_upload_row + ON equipment_runs(upload_id, row_index) + WHERE upload_id IS NOT NULL; + +ALTER TABLE equipment_runs ENABLE ROW LEVEL SECURITY; +GRANT SELECT ON equipment_runs TO api_client; + +CREATE POLICY equipment_run_visibility_policy ON equipment_runs + FOR SELECT + USING ( + EXISTS ( + SELECT 1 + FROM projects p + WHERE p.id = equipment_runs.project_id + AND ( + p.access_mode = 'open' + OR EXISTS ( + SELECT 1 + FROM project_members pm + WHERE pm.project_id = p.id + AND pm.nanohub_user_id = current_setting('app.current_user', true) + ) + ) + ) + ); + CREATE TABLE IF NOT EXISTS users ( id VARCHAR(255) PRIMARY KEY, -- NanoHUB Subject ID or Email name VARCHAR(255) NOT NULL, diff --git a/web/app/data/catalog/[id]/page.tsx b/web/app/data/catalog/[id]/page.tsx index 273b6a1..95bdb31 100644 --- a/web/app/data/catalog/[id]/page.tsx +++ b/web/app/data/catalog/[id]/page.tsx @@ -116,6 +116,12 @@ export default function RunDetailPage({ params }: { params: Promise<{ id: string const access = ACCESS_STYLE[accessKey] || ACCESS_STYLE.private; const AccessIcon = access.icon; + // Generic (non-etcher) equipment report their measured results in `outputs` + // rather than the canonical etch-rate columns. + const genericOutputs = + run.outputs && Object.keys(run.outputs).length > 0 ? run.outputs : null; + const isEtcher = run.equipment_id === "etcher" || run.avg_etch_rate !== null; + return (
Measured outcomes -
-
-
- -

Avg etch rate

-
-

- {formatNumber(run.avg_etch_rate, 1)} - nm/min -

+ {genericOutputs ? ( +
+ {Object.entries(genericOutputs).map(([key, value]) => ( +
+
+ +

{key}

+
+

+ {typeof value === "number" + ? formatNumber(value, 2) + : value === null || value === undefined + ? "—" + : String(value)} +

+
+ ))}
-
-
- -

Range etch rate

+ ) : isEtcher ? ( +
+
+
+ +

Avg etch rate

+
+

+ {formatNumber(run.avg_etch_rate, 1)} + nm/min +

-

- ±{formatNumber(run.range_etch_rate, 1)} - nm/min -

-
-
-
- -

Range (nm)

+
+
+ +

Range etch rate

+
+

+ ±{formatNumber(run.range_etch_rate, 1)} + nm/min +

+
+
+
+ +

Range (nm)

+
+

+ {formatNumber(run.range_nm, 1)} + nm +

-

- {formatNumber(run.range_nm, 1)} - nm -

-
+ ) : ( +

+ No measured outputs were reported for this run. +

+ )}
{/* Process inputs */}

- Process inputs (etch-step averages) + {isEtcher ? "Process inputs (etch-step averages)" : "Process inputs"}

- Per-step averages computed from the Glance time series during the etch - step of this lot. + {isEtcher + ? "Per-step averages computed from the Glance time series during the etch step of this lot." + : "Recorded input set-points for this run."}

@@ -278,13 +309,13 @@ export default function RunDetailPage({ params }: { params: Promise<{ id: string - {Object.entries(run.features).map(([key, value]) => { + {Object.entries(run.features as Record).map(([key, value]) => { const meta = FEATURE_DISPLAY[key] || { label: key, unit: "", description: "" }; return ( - + ); diff --git a/web/app/data/catalog/page.tsx b/web/app/data/catalog/page.tsx index a471839..5e077c5 100644 --- a/web/app/data/catalog/page.tsx +++ b/web/app/data/catalog/page.tsx @@ -62,7 +62,7 @@ export default function DataCatalogPage() { const url = URL.createObjectURL(blob); const a = document.createElement("a"); a.href = url; - a.download = "digital_twin_runs.csv"; + a.download = "etcher_ml_export.csv"; document.body.appendChild(a); a.click(); a.remove(); @@ -104,6 +104,7 @@ export default function DataCatalogPage() { type="button" onClick={handleDownload} disabled={downloading} + title="Exports the etcher ML-ready dataset matrix. Runs uploaded for other equipment are browsable here but are not part of this etcher training export." className="inline-flex items-center justify-center gap-2 whitespace-nowrap text-sm font-medium transition-opacity focus-visible:outline-none focus-visible:ring-1 focus-visible:ring-[hsl(var(--ring))] h-9 px-4 py-2 bg-[hsl(var(--primary))] text-[hsl(var(--primary-foreground))] rounded-md shadow hover:opacity-90 disabled:opacity-60 disabled:cursor-not-allowed" > {downloading ? ( @@ -111,7 +112,7 @@ export default function DataCatalogPage() { ) : ( )} - {downloading ? "Preparing CSV..." : "Download CSV"} + {downloading ? "Preparing CSV..." : "Download Etcher ML CSV"} {downloadError && (

diff --git a/web/app/data/upload/page.tsx b/web/app/data/upload/page.tsx index 571e582..c29325c 100644 --- a/web/app/data/upload/page.tsx +++ b/web/app/data/upload/page.tsx @@ -59,7 +59,7 @@ function mapPersistedUpload(upload: PersistedUpload): UploadedFile { errors: upload.errors.length > 0 ? upload.errors : undefined, message: upload.status === "processed" - ? "Processed into etcher_runs." + ? "Processed into the data catalog." : upload.status === "success" ? "Stored upload record. Choose a project and process it to add rows to the catalog." : "Stored upload record with structural validation issues.", @@ -398,7 +398,7 @@ export default function DataUploadPage() {

)} {file.status === "processing" && ( -

Processing rows into etcher_runs...

+

Processing rows into the data catalog...

)} {(file.status === "error" || file.status === "failed") && file.errors && (
@@ -409,6 +409,15 @@ export default function DataUploadPage() { ))}
)} + {(file.status === "success" || file.status === "processed") && file.errors && ( +
+ {file.errors.map((warning, j) => ( +

+ • {warning} +

+ ))} +
+ )} {file.id && file.status === "success" && ( @@ -450,13 +459,13 @@ export default function DataUploadPage() {

2. Validation

- This screen structurally validates CSV headers, row width, and strictly enforces physical boundaries (min/max values) defined during registration. + This screen structurally validates CSV headers and row width, and flags out-of-range values (min/max defined during registration) as warnings to review.

3. Processing

- CSV uploads can be processed into etcher_runs for the selected project. Excel files are stored for provenance but are not ingested automatically. + CSV uploads can be processed into the data catalog for the selected project. Excel files are stored for provenance but are not ingested automatically.

diff --git a/web/lib/api-client.ts b/web/lib/api-client.ts index 5c4ab99..e17fedb 100644 --- a/web/lib/api-client.ts +++ b/web/lib/api-client.ts @@ -161,6 +161,7 @@ export interface V2Run { range_etch_rate: number | null; range_nm: number | null; features: Record; + outputs?: Record; project_id: string | null; project_name: string | null; equipment_id: string | null;
{meta.label} {meta.description}{formatNumber(value, 2)}{typeof value === "number" ? formatNumber(value, 2) : value === null || value === undefined ? "—" : String(value)} {meta.unit}