From 2f9298ce2a01d6ea463e47248783c71a101e0fcb Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 1 Mar 2026 20:51:06 -0500 Subject: [PATCH 1/6] Model structure slightly modified --- model/model.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/model/model.py b/model/model.py index d2b5251..540479c 100644 --- a/model/model.py +++ b/model/model.py @@ -11,7 +11,7 @@ GlobalAveragePooling1D, LayerNormalization, Masking, Conv2D, \ MultiHeadAttention, concatenate -from model.activation import sublinear +from model.activation import sublinear, linear from model.transformer import TimeseriesTransformerBuilder as TSTFBuilder class CompoundModel(Model): @@ -77,6 +77,8 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim, #x = inputs #inputs = Masking(mask_value=pad_value)(inputs) x = BatchNormalization()(inputs) + + # Transformer blocks for _ in range(num_Transformer_blocks): x = self.tstfbuilder.build_transformerblock( x, @@ -85,12 +87,18 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim, ff_dim, dropout ) + + # Pooling and simple DNN block x = GlobalAveragePooling1D(data_format="channels_first")(x) for dim in mlp_units: x = Dense(dim, activation="relu")(x) x = Dropout(mlp_dropout)(x) - y = Dense(n_classes[0], activation="softmax")(x) - z = Dense(n_classes[1], activation="softmax")(x) + + # Two separate latent spaces supported + #y = Dense(n_classes[0], activation="softmax")(x) + #z = Dense(n_classes[1], activation="softmax")(x) + y = Dense(n_classes[0], activation="relu")(x) + z = Dense(n_classes[1], activation="relu")(x) return Model(inputs, [y, z]) @@ -189,10 +197,11 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim, x = Dense(full_dimension, activation="relu")(x) x = Reshape((input_shape[0], input_shape[1]))(x) + """ for dim in mlp_units: x = Dense(dim, activation="relu")(x) x = Dropout(mlp_dropout)(x) - + """ for _ in range(num_Transformer_blocks): x = self.tstfbuilder.build_transformerblock( x, @@ -206,7 +215,7 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim, x = Conv1D(filters=input_shape[1], kernel_size=1, padding="valid", - activation=sublinear)(x) + activation=linear)(x) return Model(inputs, x) From 520fbe31e8126d58cb781495d56304228ba30b8b Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 1 Mar 2026 20:51:45 -0500 Subject: [PATCH 2/6] Revised data pipeline to handle large throughput --- pipe/etl.py | 2 +- pipe/extract.py | 42 +++++++++++++++++++++++++++++------------- pipe/transform.py | 29 ++++++++++++++++++++++++++++- 3 files changed, 58 insertions(+), 15 deletions(-) diff --git a/pipe/etl.py b/pipe/etl.py index 069a020..a6499be 100644 --- a/pipe/etl.py +++ b/pipe/etl.py @@ -85,7 +85,7 @@ def split_sets(data: DataFrame, split=[0.99, 0.005, 0.005]) -> tuple: def trim(dataframe, column): ndarray = np.array(dataframe.select(column).collect()) \ - .reshape(-1, 34, 133) + .reshape(-1, 36, 133) return ndarray diff --git a/pipe/extract.py b/pipe/extract.py index d905972..7f0c4d3 100644 --- a/pipe/extract.py +++ b/pipe/extract.py @@ -13,6 +13,8 @@ import cv2 as cv import h5py import numpy as np +from numpy import ndarray +from pyspark.ml.linalg import Vectors from pyspark.sql import SparkSession, Row, DataFrame import scipy as sp @@ -35,8 +37,10 @@ def extract(spark: SparkSession) -> DataFrame: labels.append(line.strip().split(",")[0]) reader = FileReader(spark, filetype=FileType.MAT) - - return reader.read_file(path, labels) + rdd = spark.sparkContext.parallelize(reader.read_file(path, labels), + numSlices=200) + #return reader.read_file(path, labels) + return spark.createDataFrame(rdd) def image_pipe(spark: SparkSession, imagepath: Path, namepattern: str, stacksize: int) -> np.ndarray: @@ -58,6 +62,15 @@ def image_pipe(spark: SparkSession, imagepath: Path, namepattern: str, return images +def strip_array(arr: ndarray) -> ndarray: + dtype = arr.dtype + while dtype == "object": + arr = arr[0] + dtype = arr.dtype + if len(arr) == 0: + return ["unknown"] + return arr + class FileReader: """ Class to read spectrograms and metadata from different file formats based @@ -105,7 +118,7 @@ def metadata_read(self, metapath: Path, labels:list, def read_matfiles(self, specpath: Path, datakinds: List[DataKind], - default_size: tuple = (34, 130), + default_size: tuple = (36, 130), pad_value: float = 0.) -> DataFrame: """ Loads data for each stack iteration from a set of mat files, @@ -121,8 +134,7 @@ def read_matfiles(self, specpath: Path, Returns: DataFrame: Spark DataFrame containing the requested data. """ - data = [] - row = {} + #data = [] labels = glob.glob(str(specpath/"matfiles"/"*.mat")) nloops = default_size[0] nfreq = default_size[1] @@ -137,6 +149,8 @@ def read_matfiles(self, specpath: Path, ncnt_scale = 5. for label in labels: + row = {} + print(label) matdata = sp.io.loadmat(specpath/"matfiles"/label) ncnt = np.log10(matdata["NCNT"][0]) if np.min(ncnt) < 2: @@ -174,18 +188,20 @@ def read_matfiles(self, specpath: Path, (matdata["NSD"][0] - nsd_meanshift) timeseries_array[time_offset:, 132] = \ np.log10(matdata["NCNT"][0]) / ncnt_scale - row["timeseries"] = timeseries_array.tolist() + timeseries_array = Vectors.dense(timeseries_array.flatten()) + row["timeseries"] = timeseries_array if DataKind.TREATMENT in datakinds: - row["treatment"] = matdata["header"]["drug"][0][0][0].lower() + row["treatment"] = strip_array( + matdata["header"]["drug"])[0].lower() if DataKind.TARGET in datakinds: - try: - row["target"] = matdata["header"]["cell"][0][0][0].lower() - except: - row["target"] = "unknown" - data.append(Row(**row)) + row["target"] = strip_array( + matdata["header"]["cell"])[0].lower() + row["target"] = "unknown" + #data.append(Row(**row)) + yield row - return self.spark.createDataFrame(data) + #return self.spark.createDataFrame(data) def read_hdf5(self, specpath: Path, labels: list, namepattern:str="averaged_spectrogram{}.hdf5") \ diff --git a/pipe/transform.py b/pipe/transform.py index 0c4637e..80abcd9 100644 --- a/pipe/transform.py +++ b/pipe/transform.py @@ -7,7 +7,7 @@ from pyspark.sql import DataFrame, functions, SparkSession, types from pyspark.ml import Pipeline -from pyspark.ml.feature import StringIndexer, OneHotEncoder +from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler def merge_redundant_treatment_labels(dataframe: DataFrame) -> DataFrame: """ @@ -43,6 +43,7 @@ def onehot(dataframe: DataFrame, keys: list) -> DataFrame: pyspark.sql.DataFrame: New DataFrame with one-hot encoded column(s). """ + """ OLD BLOCK indexers = [] encoders = [] indexed_cols = [] @@ -79,6 +80,32 @@ def onehot(dataframe: DataFrame, keys: list) -> DataFrame: for column_name in keys: result = result.withColumnRenamed(column_name, f"{column_name}_str") result = result.withColumnRenamed(f"{column_name}_encoded", column_name) + """ + + indexer = StringIndexer( + inputCols=keys, + outputCols=[f"{c}_idx" for c in keys], + handleInvalid="keep" + ) + + encoder = OneHotEncoder( + inputCols=[f"{c}_idx" for c in keys], + outputCols=[f"{c}_vec" for c in keys], + dropLast=False + ) + + assembler = VectorAssembler( + inputCols=[f"{c}_vec" for c in keys], + outputCol="features" + ) + pipeline = Pipeline(stages=[indexer, encoder, assembler]) + model = pipeline.fit(dataframe) + result = model.transform(dataframe) + + for c in keys: + result = result.withColumnRenamed(c, f"{c}_str") \ + .withColumnRenamed(f"{c}_vec", c) + return result From e891126ed886be0a35e5d96f566473db25e9c3b5 Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 1 Mar 2026 20:52:24 -0500 Subject: [PATCH 3/6] Improved workflow for model training, including proper monitoring for autoencoder training --- train/autoencoder_train.py | 120 ++++++++++++++++++++++++++++++++++++- train/decoder_train.py | 17 ++++-- train_model.py | 49 +++++++++++++-- 3 files changed, 177 insertions(+), 9 deletions(-) diff --git a/train/autoencoder_train.py b/train/autoencoder_train.py index 2703748..e9e7402 100644 --- a/train/autoencoder_train.py +++ b/train/autoencoder_train.py @@ -1,3 +1,121 @@ -# -*- coding: utf-8 -*- +#-*- coding: utf-8 -*- + +from datetime import datetime +import time +import typing +from typing import List + +import numpy as np +import os +import keras +from keras.metrics import MeanSquaredError +from keras import Model +from keras.callbacks import ModelCheckpoint, CSVLogger +import matplotlib.pyplot as plt + +from model.model import CompoundModel +from model.metrics import MutualInformation, mutual_information +from visualize.visualize import confusion_matrix +from visualize.plot import roc_plot +from train.encoder_train import build_encoder +from train.decoder_train import build_decoder + +def autoencoder_workflow(params, shape, n_classes, + train_set, validation_set, test_set, + categories, keys, path): + + model = build_autoencoder(params, shape, n_classes) + model = train_autoencoder(params, model, train_set, validation_set, path) + + m = {key: None for key in keys} + m, test_predict = test_autoencoder( + model, + test_set, + m + ) + model_metrics = {metric: value for metric, value in m.items()} + + evaluate_autoencoder( + params, + test_predict, + test_set[0], + categories, + + keys, + path + ) + + save_autoencoder(params, model, path) + +def build_autoencoder(params, shape, n_classes): + autoencoder_params = params["autoencoder_params"] + #mi = MutualInformation() + mse = MeanSquaredError() + + encoder_model = build_encoder(params, shape, n_classes) + decoder_model = build_decoder(params, shape, n_classes) + model = keras.Sequential([encoder_model, decoder_model]) + model.compile( + optimizer=keras.optimizers.Adam(learning_rate=4e-4), + loss=autoencoder_params["loss"], + metrics=[mse]#, mutual_information] + ) + + return model + +def train_autoencoder(params, model, train_set, validation_set, path): + log_level = params["log_level"] + timestamp = params["timestamp"] + params = params["autoencoder_params"] + callbacks = [ + ModelCheckpoint( + filepath=path / timestamp / f"{timestamp}_checkpoint.keras", + monitor = "val_loss", + save_best_only=True, + save_weights_only=False, + verbose=1 + ), + CSVLogger(path / timestamp / f"{timestamp}_log.csv") + ] + + start = time.time() + model.fit( + x=train_set, y=train_set, + validation_data=(validation_set, validation_set), + batch_size=params["batch_size"], + epochs=params["epochs"], + verbose=log_level, + callbacks=callbacks + ) + end = time.time() + print("Training time: ", end - start) + return model + +def test_autoencoder(model: Model, test: List, metrics: dict): + """ + """ + + test_eval = model.evaluate(test, test) + if len(metrics.keys()) == 1: + metrics[metrics.keys()[0]] = test_eval + else: + for i, key in enumerate(metrics.keys()): + metrics[key] = np.mean(test_eval[i]) + + test_predict = model.predict(test)[0] + + return metrics, test_predict + +def evaluate_autoencoder(params, test_predict, test_set, categories, keys, path): + plt.pcolor(test_set) + plt.savefig(path / params["timestamp"] / "original.png") + plt.close() + plt.pcolor(test_predict) + plt.savefig(path / params["timestamp"] / "reproduction.png") + plt.close() + return + +def save_autoencoder(params, model, path): + model.save(path / params["timestamp"] / f"{params['timestamp']}_autoencoder.keras") # EOF diff --git a/train/decoder_train.py b/train/decoder_train.py index 56bd9fa..2e4c6ff 100644 --- a/train/decoder_train.py +++ b/train/decoder_train.py @@ -11,8 +11,8 @@ from visualize.plot import spectra_plot def decoder_workflow(params, train_set, validation_set, test_set, - n_classes, categories, keys): - decoder = load_decoder(params, train_set[0].shape[1:], n_classes) + n_classes, categories, keys, modelpath): + decoder = build_decoder(params, train_set[0].shape[1:], n_classes) decoder = train_decoder(decoder, params, train_set, validation_set) # Test model performance @@ -26,7 +26,7 @@ def decoder_workflow(params, train_set, validation_set, test_set, spectra_plot(test_predict[0], name=f"{target}-{treatment}-predict") spectra_plot(test_set[0][0], name=f"{target}-{treatment}-true") -def load_decoder(params, input_shape, n_classes): +def build_decoder(params, input_shape, n_classes): """ """ @@ -86,8 +86,17 @@ def test_decoder(decoder: Model, test: List, metrics: dict): return metrics, test_predict -def save_decoder(decoder: Model): +def evaluate_decoder(params, test_predict, test_set, test_loss, + test_accuracy, categories, keys): + params = params["decoder_params"] + for predict, groundtruth, key in zip(test_predict, test_set[1], keys): + confusion_matrix(predict, groundtruth, categories[key], key) + roc_plot(predict, groundtruth, key) + save_metric(params, test_loss, test_accuracy) + +def save_decoder(decoder: Model): + model.save(path + "decoder.keras") return # EOF diff --git a/train_model.py b/train_model.py index 979b14f..d7b79e2 100644 --- a/train_model.py +++ b/train_model.py @@ -5,27 +5,35 @@ Launches the training process for the model. """ +# Built-in module imports +from datetime import datetime import os +from pathlib import Path +import shutil as sh +# Environment variables os.environ["KERAS_BACKEND"] = "jax" os.environ["XLA_PYTHON_CLIENT_PREALLOCATE"] = "false" +# 3rd party module imports import jax import json from pyspark.sql import SparkSession import keras import matplotlib.pyplot as plt +# Local module imports from pipe.etl import etl, read from train.encoder_train import encoder_workflow from train.decoder_train import decoder_workflow +from train.autoencoder_train import autoencoder_workflow def parameters(): with open("parameters.json", "r") as file: params = json.load(file) return params -def multi_gpu(): +def model_parallel(): devices = jax.devices("gpu") mesh = keras.distribution.DeviceMesh( shape=(len(devices),), axis_names=["model"], devices=devices @@ -45,14 +53,25 @@ def multi_gpu(): model_parallel = keras.distribution.ModelParallel( layout_map=layout_map ) - keras.distribution.set_distribution(model_parallel) + return model_parallel +def data_parallel(): + devices = jax.devices("gpu") + data_parallel = keras.distribution.DataParallel(devices=devices) + mesh = keras.distribution.DeviceMesh( + shape=(len(devices),), axis_names=["data"], devices=devices + ) + data_parallel = keras.distribution.DataParallel(mesh) + + return data_parallel def main(): # jax mesh setup params = parameters() spark = SparkSession.builder.appName("train").getOrCreate() keys = ["treatment", "target"] + #parallel = data_parallel() + #keras.distribution.set_distribution(parallel) if params["load_from_scratch"]: data = etl(spark, split=params["encoder_params"]["split"]) @@ -63,7 +82,14 @@ def main(): n_classes = [dset.shape[1] for dset in train_set[1]] shape = train_set[0].shape[1:] + path = Path("/app/workdir/model") + strfmt = "%Y%m%d_%H%M%S" + timestamp = datetime.now().strftime(strfmt) + params["timestamp"] = timestamp + os.mkdir(path/timestamp) + sh.copy("parameters.json", path/timestamp/"parameters.json") + if params["train_encoder"]: encoder_workflow( params, @@ -73,7 +99,8 @@ def main(): validation_set, test_set, categories, - keys + keys, + path ) if params["train_decoder"]: @@ -84,7 +111,21 @@ def main(): test_set, n_classes, categories, - keys + keys, + path + ) + + if params["train_autoencoder"]: + autoencoder_workflow( + params, + shape, + n_classes, + train_set[0], + validation_set[0], + test_set[0], + categories, + keys, + path ) return From 9156af505771c0bd1540866239efc73ba002dfa9 Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 1 Mar 2026 20:52:45 -0500 Subject: [PATCH 4/6] Linear works better than the custom sub-linear fnuction --- model/activation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/model/activation.py b/model/activation.py index b710494..c52c233 100644 --- a/model/activation.py +++ b/model/activation.py @@ -5,4 +5,8 @@ def sublinear(x): return x / (1 + K.sqrt(K.sqrt(K.abs(x)))) +def linear(x): + return x + #return x / (1 + K.sqrt(K.sqrt(K.abs(x)))) + # EOF From e4092b27049b4b9b2bcd2d576c9179a220f31026 Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 1 Mar 2026 20:53:01 -0500 Subject: [PATCH 5/6] Fixed encoder training --- train/encoder_train.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/train/encoder_train.py b/train/encoder_train.py index fae134b..ceab077 100644 --- a/train/encoder_train.py +++ b/train/encoder_train.py @@ -11,7 +11,7 @@ def encoder_workflow(params, shape, n_classes, train_set, validation_set, test_set, - categories, keys): + categories, keys, modelpath): model = build_encoder(params, shape, n_classes) model = train_encoder(params, model, train_set, validation_set) test_predict, test_loss, test_accuracy = test_encoder( @@ -31,7 +31,8 @@ def encoder_workflow(params, shape, n_classes, categories, keys ) - + + save_encoder(model, path) def build_encoder(params, input_shape, n_classes): log_level = params["log_level"] @@ -85,7 +86,8 @@ def test_encoder(params, model, test_set, categories, keys): print(f"Test loss: {test_loss}, test accuracy: {test_accuracy}") return test_predict, test_loss, test_accuracy -def evaluate_encoder(params, test_predict, test_set, test_loss, test_accuracy, categories, keys): +def evaluate_encoder(params, test_predict, test_set, test_loss, test_accuracy, + categories, keys): params = params["encoder_params"] for predict, groundtruth, key in zip(test_predict, test_set[1], keys): confusion_matrix(predict, groundtruth, categories[key], key) @@ -116,4 +118,7 @@ def save_metric(params, test_loss, test_accuracy): with open("/app/workdir/metrics.csv", "a") as f: f.write(",".join([str(value) for value in metric.values()]) + "\n") +def save_encoder(model, path): + model.save(path + "encoder.keras") + # EOF From 9a5041970d2f288b529e2892f15b1839543f3d5d Mon Sep 17 00:00:00 2001 From: Dawith Date: Sun, 1 Mar 2026 20:53:22 -0500 Subject: [PATCH 6/6] Launch script made compatible with ps v7+ --- winlaunch.ps1 | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/winlaunch.ps1 b/winlaunch.ps1 index 97f8d93..c9d7e0d 100644 --- a/winlaunch.ps1 +++ b/winlaunch.ps1 @@ -2,18 +2,18 @@ # container with the appropriate volume bind mount. # Load environment variables from .env file -Get-Content .env | foreach { - $name, $value = $_.split('=') - if ([string]::IsNullOrWhiteSpace($name) -or $name.Contains('#')) { - return - } - Set-Content env:\$name $value + +Get-Content .env | ForEach-Object { + if ($_ -match '^\s*$' -or $_ -match '^\s*#') { return } + + $name, $value = $_ -split '=', 2 + Set-Variable -Name $name -Value $value } # Docker environment launch docker run --rm -it ` -v ${PWD}:/app/code ` - -v ${env:DATAPATH}:/app/workdir ` + -v "${DATAPATH}:/app/workdir" ` --gpus all ` --shm-size=4g ` --name geonosis `