From a152445a097e34c8e08d74fff8fd15198c0eed0e Mon Sep 17 00:00:00 2001 From: Dawith Date: Fri, 6 Mar 2026 12:26:08 -0500 Subject: [PATCH 01/11] Incorrect interpolation for baseline stretching was corrected --- pipe/extract.py | 196 +++++++++++++++++++++++++++++++----------------- 1 file changed, 127 insertions(+), 69 deletions(-) diff --git a/pipe/extract.py b/pipe/extract.py index 7f0c4d3..4631105 100644 --- a/pipe/extract.py +++ b/pipe/extract.py @@ -16,9 +16,11 @@ from numpy import ndarray from pyspark.ml.linalg import Vectors from pyspark.sql import SparkSession, Row, DataFrame +from pyspark.sql.types import ArrayType, FloatType import scipy as sp -from pipe.enumsets import FileType, DataKind +from pipe.enumsets import Data, FileType, Thresholds +from pipe.parse_utils import get_field, strip_array def extract(spark: SparkSession) -> DataFrame: """ @@ -30,15 +32,12 @@ def extract(spark: SparkSession) -> DataFrame: """ path = Path("/app/workdir") - labels = [DataKind.BB, DataKind.FPS, DataKind.NSD, DataKind.NCNT, - DataKind.SPEC, DataKind.TARGET, DataKind.TREATMENT] - with open(path / "train.csv", "r") as file: - for line in file: - labels.append(line.strip().split(",")[0]) reader = FileReader(spark, filetype=FileType.MAT) - rdd = spark.sparkContext.parallelize(reader.read_file(path, labels), - numSlices=200) + rdd = spark.sparkContext.parallelize( + reader.read_file(path), + numSlices=200 + ) #return reader.read_file(path, labels) return spark.createDataFrame(rdd) @@ -62,15 +61,6 @@ def image_pipe(spark: SparkSession, imagepath: Path, namepattern: str, return images -def strip_array(arr: ndarray) -> ndarray: - dtype = arr.dtype - while dtype == "object": - arr = arr[0] - dtype = arr.dtype - if len(arr) == 0: - return ["unknown"] - return arr - class FileReader: """ Class to read spectrograms and metadata from different file formats based @@ -116,8 +106,77 @@ def metadata_read(self, metapath: Path, labels:list, return metadata + def parse_matfile(self, matdata: dict): + """ + Parses the data from a .mat file and returns it as a dictionary. + + Args: + matdata (dict): Dictionary containing the data from the .mat file. + + Returns: + data_dict (dict): Dictionary containing the parsed data. + """ + + # Build the data dictionary + data_dict = {} + spec_dims = matdata[Data.TimeDependent.SPEC.value].shape + + if len(spec_dims) == 3: + data_dict["timeseries"] = np.column_stack( + tuple(matdata[td.value][0] for td in Data.TimeDependent) + ) + elif len(spec_dims) == 4: + dtuple = list(matdata[td.value][0] for td in Data.TimeDependent) + dtuple[0] = dtuple[0][0] + data_dict["timeseries"] = np.column_stack(dtuple) + else: + print("Unexpected dimensions for spectrogram:", + f"{len(spec_dims)}. 
Skipping file.") + return {} + + data_dict["timeseries"][:, -3:-1] = np.log10( + data_dict["timeseries"][:,-3:-1] + 1E-6 + ) + + header = matdata["header"] + for td in Data.TimeIndependent.Categorical: + data_dict[td.name] = str(strip_array(header[td.value])).lower() + + defaults = [0, 0, 0, data_dict["timeseries"].shape[0], 0] + data_dict["constants"] = np.array([ + strip_array(get_field(header, td.value, default)) + for td, default + in zip(Data.TimeIndependent.Continuous, defaults) + ]) + + # Perform quality checks + bb_max = np.max(data_dict["timeseries"][:,-3]) + ncnt_max = np.max(data_dict["timeseries"][:,-2]) + nsd_max = np.max(data_dict["timeseries"][:,-1]) + if np.isnan(data_dict["timeseries"]).any(): + print("Skipping file due to NaN values in timeseries.") + return {} + if bb_max < Thresholds.BB.value: + print(f"Skipping file due to low BB: {bb_max}.") + return {} + if ncnt_max < Thresholds.NCNT.value: + print(f"Skipping file due to low NCNT: {ncnt_max}.") + return {} + if nsd_max < Thresholds.NSD.value: + print(f"Skipping file due to low NSD: {nsd_max}.") + return {} + if data_dict["constants"].shape != (5,): + print("Unexpected shape for constants:", + f"{data_dict['constants'].shape}.") + return {} + if np.isnan(data_dict["constants"]).any(): + print("Skipping file due to NaN values in constants.") + return {} + + return data_dict + + def read_matfiles(self, specpath: Path, - datakinds: List[DataKind], default_size: tuple = (36, 130), pad_value: float = 0.) -> DataFrame: """ @@ -129,12 +188,11 @@ def read_matfiles(self, specpath: Path, specpath (Path): Path to the spectrogram files. default_size (tuple): Default size for the spectrograms. pad_value (float): Value to use for padding. - datakinds (List[DataKind]): List of data kinds to extract. + datakinds (List[Data]): List of data kinds to extract. Returns: DataFrame: Spark DataFrame containing the requested data. """ - #data = [] labels = glob.glob(str(specpath/"matfiles"/"*.mat")) nloops = default_size[0] nfreq = default_size[1] @@ -149,57 +207,57 @@ def read_matfiles(self, specpath: Path, ncnt_scale = 5. 
for label in labels: - row = {} - print(label) matdata = sp.io.loadmat(specpath/"matfiles"/label) - ncnt = np.log10(matdata["NCNT"][0]) - if np.min(ncnt) < 2: - print(f"Skipping file {label} due to low counts.") - print(np.min(ncnt)) + row = self.parse_matfile(matdata) + if row == {}: continue - else: - header = matdata["header"] - constants_array = np.zeros((1,5), dtype="float16") - constants_array[0,0] = header["f1"][0][0][0] - constants_array[0,1] = header["f2"][0][0][0] - constants_array[0,2] = header["t0"][0][0][0] - constants_array[0,4] = header["dt"][0][0][0] - timeseries_array = np.zeros((nloops, nfreq+3), dtype="float16") - spec = matdata["SP"] - for _ in range(len(spec.shape)-2): - spec = spec[0] - spec[np.abs(spec) == np.inf] = pad_value - spec[np.isnan(spec)] = pad_value - try: - constants_array[0,3] = header["tf"][0][0][0] - except ValueError: - constants_array[0,3] = float(spec.shape[-2]) - mean += np.mean(spec) - maxv += max(np.max(spec), maxv) - minv += min(np.min(spec), minv) - time_offset = int(nloops-constants_array[0,3]) - timeseries_array[time_offset:, :130] = \ - (spec - spec_meanshift) / spec_scale - timeseries_array[:time_offset, :130] = \ - timeseries_array[time_offset, :130] - timeseries_array[time_offset:, 130] = \ - (np.log10(matdata["BB"][0]) - bb_meanshift) / bb_scale - timeseries_array[time_offset:, 131] = \ - (matdata["NSD"][0] - nsd_meanshift) - timeseries_array[time_offset:, 132] = \ - np.log10(matdata["NCNT"][0]) / ncnt_scale - timeseries_array = Vectors.dense(timeseries_array.flatten()) - row["timeseries"] = timeseries_array - - if DataKind.TREATMENT in datakinds: - row["treatment"] = strip_array( - matdata["header"]["drug"])[0].lower() - if DataKind.TARGET in datakinds: - row["target"] = strip_array( - matdata["header"]["cell"])[0].lower() - row["target"] = "unknown" - #data.append(Row(**row)) - yield row + timeseries_array = np.zeros((nloops, nfreq+3), dtype="float16") + baseline = int(row["constants"][2]) + endpoint = int(row["constants"][3]) + diff = endpoint - baseline + if (diff < 1) or (baseline == 0): + diff = endpoint + timeseries_array[-diff:,:] = row["timeseries"][-diff:,:] + baseline_stretched = sp.interpolate.interp1d( + np.arange(baseline), + row["timeseries"][:baseline,:], + axis=0, + kind="linear" + ) + timeseries_array[:nloops-endpoint,:] = baseline_stretched( + np.arange(0,baseline,len(timeseries_array)-endpoint) + ) + row["timeseries"] = timeseries_array + del timeseries_array + + mean = np.mean(row["timeseries"][:,:-3]) + maxv = max(row["timeseries"][:,:-3].max(), maxv) + minv = min(row["timeseries"][:,:-3].min(), minv) + row["timeseries"][:,:-3] = ( + row["timeseries"][:,:-3] - spec_meanshift + ) / spec_scale + + row["timeseries"][:, -3] = ( + row["timeseries"][:, -3] - bb_meanshift + ) / bb_scale + + row["timeseries"][:, -2] = ( + row["timeseries"][:, -2] + ) / ncnt_scale + + row["timeseries"][:, -1] = row["timeseries"][:, -1] - nsd_meanshift + + timeseries_array = Vectors.dense(row["timeseries"].flatten()) + row["timeseries"] = timeseries_array + + # Rescale the constants + row["constants"][0] = np.log10(row["constants"][0]) / 5. + row["constants"][1] = np.log10(row["constants"][1]) / 5. + row["constants"][2] = np.log10(row["constants"][2]) / nloops - 0.5 + row["constants"][3] = np.log10(row["constants"][3]) / nloops - 0.5 + row["constants"][4] = (np.log10(row["constants"][4]) / 60.) 
- 0.5 + row["constants"] = Vectors.dense(row["constants"]) + yield row #return self.spark.createDataFrame(data) From d8702561eb5ed9488e0ed154160714ea17fa0bd0 Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:15:18 -0500 Subject: [PATCH 02/11] Beta slightly increased --- model/autoencoder.py | 77 ----------------------- model/autoencoder_smol.py | 69 --------------------- model/dnn.py | 20 ------ model/model.py | 47 ++++++-------- model/trainingwheel.py | 44 -------------- train/decoder_train.py | 101 ------------------------------ train/encoder_train.py | 125 -------------------------------------- 7 files changed, 20 insertions(+), 463 deletions(-) delete mode 100644 model/autoencoder.py delete mode 100644 model/autoencoder_smol.py delete mode 100644 model/dnn.py delete mode 100644 model/trainingwheel.py delete mode 100644 train/decoder_train.py delete mode 100644 train/encoder_train.py diff --git a/model/autoencoder.py b/model/autoencoder.py deleted file mode 100644 index 7bea8ac..0000000 --- a/model/autoencoder.py +++ /dev/null @@ -1,77 +0,0 @@ -import keras - -class Encoder(keras.Model): - def __init__(self, input_size=(26, 130, 1), latent_dim=128): - super(Encoder, self).__init__() - self.encoder = keras.Sequential([ - keras.layers.InputLayer(shape=input_size), - keras.layers.Conv2D(8, (3, 3), activation="relu"), - keras.layers.MaxPooling2D((2, 2)), - keras.layers.Conv2D(16, (3, 3), activation="relu"), - #keras.layers.MaxPooling2D((2, 2)), - keras.layers.Conv2D(32, (3, 3), activation="relu"), - keras.layers.Flatten(), - keras.layers.Dense(latent_dim*64, activation="relu"), - keras.layers.Dense(latent_dim*64, activation="relu"), - keras.layers.Dense(latent_dim*64, activation="relu"), - keras.layers.Dense(latent_dim*32, activation="relu"), - keras.layers.Dense(latent_dim*32, activation="relu"), - keras.layers.Dense(latent_dim*32, activation="relu"), - keras.layers.Dense(latent_dim*16, activation="relu"), - keras.layers.Dense(latent_dim*8, activation="relu"), - keras.layers.Dense(latent_dim*4, activation="relu"), - keras.layers.Dense(latent_dim*2, activation="relu"), - keras.layers.Dense(latent_dim, activation="relu") - ]) - - def call(self, x): - return self.encoder(x) - - def summary(self): - self.encoder.summary() - -class Decoder(keras.Model): - def __init__(self, latent_dim=128): - super(Decoder, self).__init__() - self.decoder = keras.Sequential([ - keras.layers.InputLayer(shape=(latent_dim,)), - keras.layers.Dense(latent_dim*2, activation="relu"), - keras.layers.Dense(latent_dim*4, activation="relu"), - keras.layers.Dense(latent_dim*8, activation="relu"), - keras.layers.Dense(latent_dim*16, activation="relu"), - keras.layers.Dense(latent_dim*32, activation="relu"), - keras.layers.Dense(latent_dim*32, activation="relu"), - keras.layers.Dense(latent_dim*32, activation="relu"), - keras.layers.Dense(latent_dim*64, activation="relu"), - keras.layers.Dense(latent_dim*64, activation="relu"), - keras.layers.Dense(latent_dim*64, activation="relu"), - keras.layers.Dense(15360, activation="relu"), - keras.layers.Reshape((8, 60, 32)), - keras.layers.Conv2DTranspose(32, (3, 3), activation="relu"), - #keras.layers.UpSampling2D((2, 2)), - keras.layers.Conv2DTranspose(16, (3, 3), activation="relu"), - keras.layers.UpSampling2D((2, 2)), - keras.layers.Conv2DTranspose(1, (3, 3), activation="sigmoid") - ]) - - def call(self, x): - return self.decoder(x) - - def summary(self): - self.decoder.summary() - -class Autoencoder(keras.Model): - def __init__(self, input_size=(26, 130, 1), 
latent_dim=128, **kwargs): - super(Autoencoder, self).__init__() - self.encoder = Encoder(input_size=input_size, latent_dim=latent_dim) - self.decoder = Decoder(latent_dim=latent_dim) - - def call(self, x): - encoded = self.encoder(x) - decoded = self.decoder(encoded) - return decoded - - def summary(self): - super().summary() - self.encoder.summary() - self.decoder.summary() diff --git a/model/autoencoder_smol.py b/model/autoencoder_smol.py deleted file mode 100644 index be24604..0000000 --- a/model/autoencoder_smol.py +++ /dev/null @@ -1,69 +0,0 @@ -import keras - -class Encoder(keras.Model): - def __init__(self, input_size=(26, 130, 1), latent_dim=128): - super(Encoder, self).__init__() - self.encoder = keras.Sequential([ - keras.layers.InputLayer(shape=input_size), - keras.layers.Conv2D(8, (3, 3), activation="relu"), - keras.layers.MaxPooling2D((2, 2)), - keras.layers.Conv2D(16, (3, 3), activation="relu"), - #keras.layers.MaxPooling2D((2, 2)), - keras.layers.Conv2D(32, (3, 3), activation="relu"), - keras.layers.Flatten(), - keras.layers.Dense(latent_dim*64, activation="relu"), - keras.layers.Dense(latent_dim*32, activation="relu"), - keras.layers.Dense(latent_dim*16, activation="relu"), - keras.layers.Dense(latent_dim*8, activation="relu"), - keras.layers.Dense(latent_dim*4, activation="relu"), - keras.layers.Dense(latent_dim*2, activation="relu"), - keras.layers.Dense(latent_dim, activation="relu") - ]) - - def call(self, x): - return self.encoder(x) - - def summary(self): - self.encoder.summary() - -class Decoder(keras.Model): - def __init__(self, latent_dim=128): - super(Decoder, self).__init__() - self.decoder = keras.Sequential([ - keras.layers.InputLayer(shape=(latent_dim,)), - keras.layers.Dense(latent_dim*2, activation="relu"), - keras.layers.Dense(latent_dim*4, activation="relu"), - keras.layers.Dense(latent_dim*8, activation="relu"), - keras.layers.Dense(latent_dim*16, activation="relu"), - keras.layers.Dense(latent_dim*32, activation="relu"), - keras.layers.Dense(latent_dim*64, activation="relu"), - keras.layers.Dense(15360, activation="relu"), - keras.layers.Reshape((8, 60, 32)), - keras.layers.Conv2DTranspose(32, (3, 3), activation="relu"), - #keras.layers.UpSampling2D((2, 2)), - keras.layers.Conv2DTranspose(16, (3, 3), activation="relu"), - keras.layers.UpSampling2D((2, 2)), - keras.layers.Conv2DTranspose(1, (3, 3), activation="sigmoid") - ]) - - def call(self, x): - return self.decoder(x) - - def summary(self): - self.decoder.summary() - -class Autoencoder(keras.Model): - def __init__(self, input_size=(26, 130, 1), latent_dim=128, **kwargs): - super(Autoencoder, self).__init__() - self.encoder = Encoder(input_size=input_size, latent_dim=latent_dim) - self.decoder = Decoder(latent_dim=latent_dim) - - def call(self, x): - encoded = self.encoder(x) - decoded = self.decoder(encoded) - return decoded - - def summary(self): - super().summary() - self.encoder.summary() - self.decoder.summary() diff --git a/model/dnn.py b/model/dnn.py deleted file mode 100644 index ec0988e..0000000 --- a/model/dnn.py +++ /dev/null @@ -1,20 +0,0 @@ -#-- coding: utf-8 -*- - -# This code defines a simple Deep Neural Network (DNN) model using Keras. 
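Worth noting about the file removed below: this DNN never honored layer_stack_size — call() only ever applied hidden_layer1 and hidden_layer2, however many hidden layers __init__ created. If a plain MLP head is ever reintroduced, a hypothetical sketch (not part of this series) that actually iterates over the stack would be:

    import keras

    class StackedDNN(keras.Model):
        def __init__(self, width, layer_stack_size, num_classes):
            super().__init__()
            # Keep the hidden layers in a list so call() can loop over all of them.
            self.hidden = [keras.layers.Dense(width, activation="relu")
                           for _ in range(layer_stack_size)]
            self.out = keras.layers.Dense(num_classes, activation="softmax")

        def call(self, inputs):
            x = inputs
            for layer in self.hidden:
                x = layer(x)
            return self.out(x)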
- -import keras - -class DNN(keras.Model): - def __init__(self, input_shape, layer_stack_size, num_classes): - super(DNN, self).__init__() - self.input_layer = keras.layers.InputLayer(input_shape=input_shape) - for i in range(layer_stack_size): - setattr(self, f"hidden_layer{i+1}", - keras.layers.Dense(input_shape, activation="relu")) - self.output_layer = keras.layers.Dense(num_classes, activation="softmax") - - def call(self, inputs): - x = self.input_layer(inputs) - x = self.hidden_layer1(x) - x = self.hidden_layer2(x) - return self.output_layer(x) diff --git a/model/model.py b/model/model.py index d1d7e42..2080402 100644 --- a/model/model.py +++ b/model/model.py @@ -50,7 +50,7 @@ def __init__(self, input_shape, head_size, num_heads, ff_dim, self.cycle_len = 25 self.total_epoch = None self.max_cap = 5.0 - self.beta = 0.01 + self.beta = 0.05 self.encoder = Encoder( input_shape, @@ -146,7 +146,7 @@ def kl_capacity(self, epoch, max_cap, total_epochs): cap = max_cap * epoch / total_epochs return ops.minimum(max_cap, cap) - def kl_weight(self, epoch, step, total_epochs, cycle, max_cap): + def kl_weight(self, epoch, total_epochs, cycle, max_cap): beta = ( self.beta_anneal(epoch, total_epochs) * self.beta_cyclical(epoch, cycle) @@ -156,36 +156,29 @@ def kl_weight(self, epoch, step, total_epochs, cycle, max_cap): """ def train_step(self, data, sample_weight=None): - - with ops.GradientTape() as tape: - outputs, sem_loss, kl_loss, recon_loss = self( - data, training=True - ) - self.recon_weight = ops.cast(1./4., dtype=ops.float32) - epoch = ops.cast( - self.optimizer.iterations // self.steps_per_epoch, - kl_loss.dtype - ) - step = ops.cast(self.optimizer.iterations, kl_loss.dtype) - beta, capacity = self.kl_weight( - epoch=epoch, - step=step, - total_epochs=self.total_epoch, - cycle=self.cycle_len, - max_cap=self.max_cap - ) - kl_term = beta * ops.abs(kl_loss - capacity) - total_loss = sem_loss + kl_term + recon_loss * self.recon_weight + outputs, sem_loss, kl_loss, recon_loss = self(data) + beta, capacity = self.kl_weight( + epoch=self.current_epoch, + total_epochs=self.total_epoch, + cycle=self.cycle_len, + max_cap=self.max_cap + ) + + kl_weighted = beta * ops.abs(kl_loss - capacity) + total_loss = ops.mean( + sem_loss + kl_weighted + recon_loss * self.recon_weight + ) trainable_vars = self.trainable_variables - grads = tape.gradient(total_loss, trainable_vars) - self.optimizer.apply_gradients(zip(grads, trainable_vars)) + grads = ops.gradient(total_loss, trainable_vars) + self.optimizer.apply(grads, trainable_vars) + self.compiled_metrics.update_state(data[0], outputs) return { "loss": total_loss, - "sem_loss": sem_loss, - "kl_loss": kl_loss, - "recon_loss": recon_loss, + "sem_loss": ops.mean(sem_loss), + "kl_loss": ops.mean(kl_loss), + "recon_loss": ops.mean(recon_loss), "beta": beta, "capacity": capacity }""" diff --git a/model/trainingwheel.py b/model/trainingwheel.py deleted file mode 100644 index 2a612db..0000000 --- a/model/trainingwheel.py +++ /dev/null @@ -1,44 +0,0 @@ -# -*- encoding: utf-8 -*- - -# Third party module imports -from keras import Input, Model - -class TrainingWheel(Model): - def __init__(self, sem_input_shapes): - self.inputs = self._semantic_input(sem_input_shapes) - - def _semantic_input(self, sem_input_shapes): - """ - Create a keras Model block that takes in the semantic inputs and just - forwards the input. The whole point of it is so that at autoencoder - build, the semantic input can be easily inserted for training and - removed for inference. 
- - Args: - sem_input_shapes (List[int]): List of integers representing the - shapes of the semantic inputs. - """ - inputs = [Input(shape=(input_shape,)) - for input_shape in sem_input_shapes] - - return Model(inputs, inputs) - - def call(self, inputs): - """ - Calls the model on a batch of inputs. - - Args: - inputs (Tensor): Batch of input data. - - Returns: - (Tensor) Same input data passed through the model. - """ - return self.inputs(inputs) - - def summary(self): - """ - Prints Model summary. - """ - self.inputs.summary() - -# EOF diff --git a/train/decoder_train.py b/train/decoder_train.py deleted file mode 100644 index d10386c..0000000 --- a/train/decoder_train.py +++ /dev/null @@ -1,101 +0,0 @@ -#-*- coding: utf-8 -*- - -import keras -from keras import Model -import numpy as np -import time -import typing -from typing import List - -from model.model import Decoder -from visualize.plot import spectra_plot - -def decoder_workflow(params, train_set, validation_set, test_set, - n_classes, categories, keys, modelpath): - decoder = build_decoder(params, train_set[0].shape[1:], n_classes) - - decoder = train_decoder(decoder, params, train_set, validation_set) - # Test model performance - - metrics = {key: None for key in keys} - metrics, test_predict = test_decoder(decoder, test_set, metrics) - - target = categories["target"][str(np.argmax(test_set[1][1][0]))] - treatment = categories["treatment"][str(np.argmax(test_set[1][0][0]))] - - spectra_plot(test_predict[0], name=f"{target}-{treatment}-predict") - spectra_plot(test_set[0][0], name=f"{target}-{treatment}-true") - -def build_decoder(params, input_shape, semantic_dims): - """ - """ - - params = params["decoder_params"] - decoder = Decoder( - input_shape, - params["head_size"], - params["num_heads"], - params["ff_dim"], - params["num_transformer_blocks"], - params["mlp_units"], - params["dropout"], - params["mlp_dropout"] - ) - - decoder.compile( - optimizer=keras.optimizers.Adam(learning_rate=4e-4), - #loss=params["loss"], - metrics=params["metrics"] - ) - - return decoder - -def train_decoder(decoder, params, train, validation): - """ - """ - - log_level = params["log_level"] - params = params["decoder_params"] - start = time.time() - decoder.fit( - x=train[1], - y=train[0], - validation_data=(validation[1], validation[0]), - batch_size=params["batch_size"], - epochs=params["epochs"], - verbose=log_level - ) - end = time.time() - print("Training time: ", end - start) - - return decoder - -def test_decoder(decoder: Model, test: List, metrics: dict): - """ - """ - - test_eval = decoder.evaluate(test[1], test[0]) - if len(metrics.keys()) == 1: - metrics[metrics.keys()[0]] = test_eval - else: - for i, key in enumerate(metrics.keys()): - metrics[key] = test_eval[i] - - test_predict = decoder.predict(test[1]) - - return metrics, test_predict - -def evaluate_decoder(params, test_predict, test_set, test_loss, - test_accuracy, categories, keys): - params = params["decoder_params"] - - for predict, groundtruth, key in zip(test_predict, test_set[1], keys): - confusion_matrix(predict, groundtruth, categories[key], key) - roc_plot(predict, groundtruth, key) - save_metric(params, test_loss, test_accuracy) - -def save_decoder(decoder: Model): - model.save(path + "decoder.keras") - return - -# EOF diff --git a/train/encoder_train.py b/train/encoder_train.py deleted file mode 100644 index 2013e8d..0000000 --- a/train/encoder_train.py +++ /dev/null @@ -1,125 +0,0 @@ -#-*- coding: utf-8 -*- - -import time - -import os -import keras - -from 
model.model import CompoundModel -from visualize.visualize import confusion_matrix -from visualize.plot import roc_plot - -def encoder_workflow(params, shape, n_classes, - train_set, validation_set, test_set, - categories, keys, modelpath): - model = build_encoder(params, shape, n_classes) - model = train_encoder(params, model, train_set, validation_set) - test_predict, test_loss, test_accuracy = test_encoder( - params, - model, - test_set, - categories, - keys - ) - - evaluate_encoder( - params, - test_predict, - test_set, - test_loss, - test_accuracy, - categories, - keys - ) - - save_encoder(model, path) - -def build_encoder(params, input_shape, semantic_dims): - log_level = params["log_level"] - params = params["encoder_params"] - model = CompoundModel( - input_shape, - params["head_size"], - params["num_heads"], - params["ff_dim"], - params["num_transformer_blocks"], - params["mlp_units"], - semantic_dims, - params["var_dims"], - dropout=params["dropout"], - mlp_dropout=params["mlp_dropout"] - ) - - model.compile( - optimizer=keras.optimizers.Adam(learning_rate=4e-4), - #loss=params["loss"], - metrics=params["metrics"] - ) - if log_level == 1: - model.summary() - - return model - -def train_encoder(params, model, train_set, validation_set): - log_level = params["log_level"] - params = params["encoder_params"] - start = time.time() - model.fit( - x=train_set[0], y=train_set[1], - validation_data=(validation_set[0], validation_set[1]), - batch_size=params["batch_size"], - epochs=params["epochs"], - verbose=log_level - ) - end = time.time() - print("Training time: ", end - start) - return model - -def test_encoder(params, model, test_set, categories, keys): - params = params["encoder_params"] - # Test model performance - test_loss, test_accuracy, _, _, _, _ = model.evaluate( - test_set[0], - test_set[1] - ) - - test_predict = model.predict(test_set[0]) - print(f"Test loss: {test_loss}, test accuracy: {test_accuracy}") - return test_predict, test_loss, test_accuracy - -def evaluate_encoder(params, test_predict, test_set, test_loss, test_accuracy, - categories, keys): - params = params["encoder_params"] - for predict, groundtruth, key in zip(test_predict, test_set[1], keys): - confusion_matrix(predict, groundtruth, categories[key], key) - roc_plot(predict, groundtruth, key) - save_metric(params, test_loss, test_accuracy) - -def save_metric(params, test_loss, test_accuracy): - """ - Save the hyperparameters and metric to csv - """ - - metric = { - "head_size": params["head_size"], - "num_heads": params["num_heads"], - "ff_dim": params["ff_dim"], - "num_transformer_blocks": params["num_transformer_blocks"], - "mlp_units": params["mlp_units"][0], - "dropout": params["dropout"], - "mlp_dropout": params["mlp_dropout"], - "batch_size": params["batch_size"], - "epochs": params["epochs"], - "test_loss": test_loss, - "test_accuracy": test_accuracy - } - if not os.path.exists("/app/workdir/metrics.csv"): - with open("/app/workdir/metrics.csv", "w") as f: - f.write(",".join(metric.keys()) + "\n") - with open("/app/workdir/metrics.csv", "a") as f: - f.write(",".join([str(value) for value in metric.values()]) + "\n") - -def save_encoder(model, path): - model.save(path + "encoder.keras") - -# EOF From 8fbf9dbc32bbb8cf61249abfcb2219dca639ac62 Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:15:40 -0500 Subject: [PATCH 03/11] Enum docstring --- pipe/enumsets.py | 60 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 45 insertions(+), 15 deletions(-) diff --git 
a/pipe/enumsets.py b/pipe/enumsets.py index 136d0c8..3161877 100644 --- a/pipe/enumsets.py +++ b/pipe/enumsets.py @@ -1,23 +1,53 @@ #-*- coding: utf-8 -*- """ -Enumeration of data types called from onekey output files. +Enumeration of data types called from onekey output files to enforce consistent +data handling without worrying about string formats. """ -from enum import Enum, IntEnum +# Built-in module imports +from enum import Enum, IntEnum, auto +from typing import Any, List, Sequence + +class Thresholds(Enum): + NCNT = 2 + BB = 3 + NSD = 0.3 + +class StrEnum(str, Enum): + def __str__(self) -> str: + return self.value class FileType(IntEnum): - HDF5 = 0 - MAT = 1 - SHARD = 2 - -class DataKind(Enum): - BB = {"Full_Name": "Backscatter Brightness"} - FPS = {"Full_Name": "Framerate"} - NCNT = {"Full_Name": "Foreground Pixel Count"} - NSD = {"Full_Name": "Normalized Standard Deviation"} - SGRAM = {"Full_Name": "Spectrogram"} - SPEC = {"Full_Name": "Spectra"} - TREATMENT = {"Full_Name": "Treatment"} - TARGET = {"Full_Name": "Target"} + HDF5 = auto() + MAT = auto() + SHARD = auto() + +class Data(Enum): + _ = "" + +class TimeDependent(Enum): + SPEC = "SP" #{"Full_Name": "Spectra"} + BB = "BB" #{"Full_Name": "Backscatter Brightness"} + NCNT = "NCNT" #{"Full_Name": "Foreground Pixel Count"} + NSD = "NSD" #{"Full_Name": "Normalized Standard Deviation"} + +class TimeIndependent(Enum): + _ = "" + +class Categorical(Enum): + Treatment = "drug" #{"Full_Name": "Treatment"} + Target_Type = "cell" #{"Full_Name": "Target"} + Target_Growth = "grow" #{"Full_Name": "Target"} +class Continuous(Enum): + Low_Freq = "f1" #{"Full_Name": "Low Frequency"} + High_Freq = "f2" #{"Full_Name": "High Frequency"} + Baselines = "t0" + Endpoint = "tf" + DeltaT = "dt" + +TimeIndependent.Categorical = Categorical +TimeIndependent.Continuous = Continuous +Data.TimeDependent = TimeDependent +Data.TimeIndependent = TimeIndependent # EOF From 38de66a94be0b8fbaf708af62cb1b1c5dd64be3a Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:16:02 -0500 Subject: [PATCH 04/11] New latent feature vectors added --- pipe/etl.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pipe/etl.py b/pipe/etl.py index 35085a0..2c0f937 100644 --- a/pipe/etl.py +++ b/pipe/etl.py @@ -6,6 +6,7 @@ the spectrogram data and the labels. """ +# 3rd party module imports import keras import matplotlib.pyplot as plt import numpy as np @@ -14,9 +15,11 @@ from sklearn.metrics import confusion_matrix import tensorflow as tf +# Local module imports +from pipe.enumsets import Data from pipe.extract import extract -from pipe.transform import transform from pipe.load import load +from pipe.transform import transform def etl(spark: SparkSession, split: list=None) -> DataFrame: """ @@ -29,7 +32,7 @@ def etl(spark: SparkSession, split: list=None) -> DataFrame: types.DataFrame: The final processed DataFrame after ETL. 
""" data = extract(spark) - data = transform(spark, data, keys=["treatment", "target"]) + data = transform(spark, data) load(data) match split: case None: @@ -72,14 +75,18 @@ def split_sets(data: DataFrame, split=[0.99, 0.005, 0.005]) -> tuple: """ category_dict = { - key: build_dict(data, key) for key in ["treatment", "target"] + key.name: build_dict(data, key.name) + for key in Data.TimeIndependent.Categorical } splits = data.randomSplit(split, seed=42) trainx, valx, testx = (trim(dset, "timeseries") for dset in splits) trainy, valy, testy = ( [ - np.array(dset.select("treatment").collect()).squeeze(), - np.array(dset.select("target").collect()).squeeze() + *[ + np.array(dset.select(data.name).collect()).squeeze() + for data in Data.TimeIndependent.Categorical + ], + np.array(dset.select("constants").collect()).squeeze() ] for dset in splits ) From 1d2361361cb0ce60606549f85af186da97d36b2d Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:17:07 -0500 Subject: [PATCH 05/11] Temporary weight adjustment on MAE against continuous variables --- model/latent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model/latent.py b/model/latent.py index a5d0721..3fc1a0a 100644 --- a/model/latent.py +++ b/model/latent.py @@ -81,7 +81,7 @@ def _build(self, mlp_dim, semantic_dims): ] # Compute MAE, with normalization by approximate range of values (~4) - errors.append(mean_absolute_error(targets[-1], reg) / 4.) + errors.append(mean_absolute_error(targets[-1], reg) / 40.) error = ops.sum(ops.stack(errors)) # Combine everything into single dense layer From 87bb2d9a98305459a6dcd9f4ef3dccc6ef754071 Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:17:22 -0500 Subject: [PATCH 06/11] Layer regularization added --- model/encoder.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/model/encoder.py b/model/encoder.py index d803f1d..4c5a510 100644 --- a/model/encoder.py +++ b/model/encoder.py @@ -5,7 +5,8 @@ # Third party module imports from keras import Input, Model -from keras.layers import BatchNormalization, Dense, Dropout, GlobalAveragePooling1D +from keras.layers import BatchNormalization, Dense, Dropout, GlobalAveragePooling1D, \ + LayerNormalization # Local module imports from model.transformer import TimeseriesTransformerBuilder as TSTFBuilder @@ -87,6 +88,7 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim, for dim in mlp_units: x = Dense(dim, activation="relu")(x) x = Dropout(mlp_dropout)(x) + x = LayerNormalization()(x) # Two separate latent spaces supported From 5fda371c95d6e9f64f00e03d2f3e9919aa7d7870 Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:17:49 -0500 Subject: [PATCH 07/11] Adjustmenst --- pipe/extract.py | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/pipe/extract.py b/pipe/extract.py index 4631105..660d5f2 100644 --- a/pipe/extract.py +++ b/pipe/extract.py @@ -3,6 +3,7 @@ extract.py """ +# Built-in module imports import json import glob import os @@ -10,6 +11,7 @@ import typing from typing import List +# 3rd party module imports import cv2 as cv import h5py import numpy as np @@ -19,6 +21,7 @@ from pyspark.sql.types import ArrayType, FloatType import scipy as sp +# Local module imports from pipe.enumsets import Data, FileType, Thresholds from pipe.parse_utils import get_field, strip_array @@ -41,26 +44,6 @@ def extract(spark: SparkSession) -> DataFrame: #return reader.read_file(path, labels) return spark.createDataFrame(rdd) -def image_pipe(spark: 
SparkSession, imagepath: Path, namepattern: str, - stacksize: int) -> np.ndarray: - """ - Loads a stack of images from a path based on the given name pattern. - - Args: - imagepath (Path): Path to the image files. - namepattern (str): Name pattern for the image files. - stacksize (int): Number of images in the stack. - - Returns: - images: 3D numpy array of stacked images. - """ - - images = np.zeros((stacksize, 800, 800)) - for i in range(stacksize): - images[i,:,:] = cv.imread(imagepath/namepattern.format(i), -1) - - return images - class FileReader: """ Class to read spectrograms and metadata from different file formats based @@ -224,8 +207,8 @@ def read_matfiles(self, specpath: Path, axis=0, kind="linear" ) - timeseries_array[:nloops-endpoint,:] = baseline_stretched( - np.arange(0,baseline,len(timeseries_array)-endpoint) + timeseries_array[:nloops-diff,:] = baseline_stretched( + np.linspace(0,baseline-1,(nloops-diff)) ) row["timeseries"] = timeseries_array del timeseries_array From d64197da3682fde1d0631e2ec2ff02e676c7a54c Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:18:18 -0500 Subject: [PATCH 08/11] Namespace changes to match new enumsets --- pipe/transform.py | 52 ++++++++--------------------------------------- 1 file changed, 9 insertions(+), 43 deletions(-) diff --git a/pipe/transform.py b/pipe/transform.py index 80abcd9..047552b 100644 --- a/pipe/transform.py +++ b/pipe/transform.py @@ -3,12 +3,17 @@ transform.py """ +# Built-in module imports import typing +# 3rd party module imports from pyspark.sql import DataFrame, functions, SparkSession, types from pyspark.ml import Pipeline from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler +# Local module imports +from pipe.enumsets import Data + def merge_redundant_treatment_labels(dataframe: DataFrame) -> DataFrame: """ Merge redundant labels in the 'treatment' column of the dataframe. This @@ -29,7 +34,7 @@ def merge_redundant_treatment_labels(dataframe: DataFrame) -> DataFrame: .replace("dld", "pbs").distinct() return dataframe -def onehot(dataframe: DataFrame, keys: list) -> DataFrame: +def onehot(dataframe: DataFrame) -> DataFrame: """ One-hot encode the specified categorical columns in the dataframe. The column names to be encoded this way are provided in the 'keys' list. @@ -43,45 +48,7 @@ def onehot(dataframe: DataFrame, keys: list) -> DataFrame: pyspark.sql.DataFrame: New DataFrame with one-hot encoded column(s). 
""" - """ OLD BLOCK - indexers = [] - encoders = [] - indexed_cols = [] - - for key in keys: - if key not in dataframe.columns: - raise ValueError(f"Column \"{key}\" cannot be found in DataFrame.") - - indexed_column = f"{key}_indexed" - encoded_column = f"{key}_encoded" - indexed_cols.append(indexed_column) - - indexers.append( - StringIndexer( - inputCol=key, - outputCol=indexed_column, - handleInvalid="keep" - ) - ) - - encoders.append( - OneHotEncoder( - inputCol=indexed_column, - outputCol=encoded_column, - dropLast=False - ) - ) - - pipeline = Pipeline(stages=indexers + encoders) - model = pipeline.fit(dataframe) - result = model.transform(dataframe) - - result = result.drop(*indexed_cols[:]) - for column_name in keys: - result = result.withColumnRenamed(column_name, f"{column_name}_str") - result = result.withColumnRenamed(f"{column_name}_encoded", column_name) - """ - + keys = [data.name for data in Data.TimeIndependent.Categorical] indexer = StringIndexer( inputCols=keys, outputCols=[f"{c}_idx" for c in keys], @@ -110,8 +77,7 @@ def onehot(dataframe: DataFrame, keys: list) -> DataFrame: return result -def transform(spark: SparkSession, dataframe: DataFrame, keys: list) \ - -> DataFrame: +def transform(spark: SparkSession, dataframe: DataFrame) -> DataFrame: """ """ dataframe = merge_redundant_treatment_labels(dataframe) @@ -119,6 +85,6 @@ def transform(spark: SparkSession, dataframe: DataFrame, keys: list) \ "index", functions.monotonically_increasing_id() ) - dataframe = onehot(dataframe, keys) + dataframe = onehot(dataframe) return dataframe From 684cb8a6dd4cfe74a764b31fddcc055fdd6be429 Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:18:35 -0500 Subject: [PATCH 09/11] Revised workflow for autoencoder training --- train/autoencoder_train.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/train/autoencoder_train.py b/train/autoencoder_train.py index e00a6a8..fb64c2f 100644 --- a/train/autoencoder_train.py +++ b/train/autoencoder_train.py @@ -1,10 +1,12 @@ #-*- coding: utf-8 -*- +# Built-in module imports from datetime import datetime import time import typing from typing import List +# 3rd party module imports import numpy as np import os import keras @@ -13,6 +15,8 @@ from keras.callbacks import ModelCheckpoint, CSVLogger import matplotlib.pyplot as plt +# Local module imports +from model.callbacks import EpochTracker from model.model import CompoundModel from model.metrics import MutualInformation, mutual_information from visualize.visualize import confusion_matrix @@ -75,6 +79,7 @@ def train_autoencoder(params, model, train_set, validation_set, path): timestamp = params["timestamp"] params = params["autoencoder_params"] callbacks = [ + EpochTracker(), ModelCheckpoint( filepath=path / timestamp / f"{timestamp}_checkpoint.keras", monitor = "val_loss", @@ -87,9 +92,9 @@ def train_autoencoder(params, model, train_set, validation_set, path): start = time.time() model.fit( - x=[train_set[0], train_set[1][0], train_set[1][1]], y=train_set[0], + x=[train_set[0], *train_set[1]], y=train_set[0], validation_data=( - [validation_set[0], validation_set[1][0], validation_set[1][1]], + [validation_set[0], *validation_set[1]], validation_set[0]), batch_size=params["batch_size"], epochs=params["epochs"], @@ -104,7 +109,7 @@ def test_autoencoder(model: Model, test: List, metrics: dict): """ """ - test = [test[0], test[1][0], test[1][1]] + test = [test[0], *test[1]] test_eval = model.evaluate(test, test) if len(metrics.keys()) == 1: 
metrics[metrics.keys()[0]] = test_eval From d370ba2099b1abbf52050f8df156f6bd882dd4b4 Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:18:49 -0500 Subject: [PATCH 10/11] Removed obsolete training functions --- train_model.py | 34 ++++++---------------------------- 1 file changed, 6 insertions(+), 28 deletions(-) diff --git a/train_model.py b/train_model.py index 549f492..8f812fc 100644 --- a/train_model.py +++ b/train_model.py @@ -24,8 +24,6 @@ # Local module imports from pipe.etl import etl, read -from train.encoder_train import encoder_workflow -from train.decoder_train import decoder_workflow from train.autoencoder_train import autoencoder_workflow def parameters(): @@ -68,7 +66,12 @@ def data_parallel(): def main(): # jax mesh setup params = parameters() - spark = SparkSession.builder.appName("train").getOrCreate() + spark = SparkSession.builder \ + .appName("train") \ + .config("spark.executor.memory", "16g") \ + .config("spark.driver.memory", "8g") \ + .config("spark.executor.memoryOverhead", "4g") \ + .getOrCreate() keys = ["treatment", "target"] #parallel = data_parallel() #keras.distribution.set_distribution(parallel) @@ -91,31 +94,6 @@ def main(): os.mkdir(path/timestamp) sh.copy("parameters.json", path/timestamp/"parameters.json") - if params["train_encoder"]: - encoder_workflow( - params, - shape, - n_classes, - train_set, - validation_set, - test_set, - categories, - keys, - path - ) - - if params["train_decoder"]: - decoder_workflow( - params, - train_set, - validation_set, - test_set, - n_classes, - categories, - keys, - path - ) - if params["train_autoencoder"]: autoencoder_workflow( params, From 8da0e14364bd035b89e695a241ddb09574496bde Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 8 Mar 2026 10:20:51 -0500 Subject: [PATCH 11/11] Removed workflows since we don't have a runner --- .github/workflows/docker-image.yml | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 .github/workflows/docker-image.yml diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml deleted file mode 100644 index ef38e0f..0000000 --- a/.github/workflows/docker-image.yml +++ /dev/null @@ -1,18 +0,0 @@ -name: Docker Image CI - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - -jobs: - - build: - - runs-on: [ ubuntu-latest ] - - steps: - - uses: actions/checkout@v4 - - name: Build the Docker image - run: docker build setup/. --file Dockerfile --tag geonosis:$(date +%s)
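One dependency taken on faith above: patch 09 wires an EpochTracker callback from model/callbacks.py into model.fit(), but that module is not part of this series. A minimal sketch of what it presumably does — assuming its only job is to expose the running epoch index so CompoundModel.kl_weight() can anneal beta against self.current_epoch rather than optimizer.iterations — would be:

    import keras

    class EpochTracker(keras.callbacks.Callback):
        """Mirror the current epoch index onto the model being trained."""

        def on_epoch_begin(self, epoch, logs=None):
            # Assumed contract: the model reads this as self.current_epoch
            # inside its KL-weight schedule.
            self.model.current_epoch = epoch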