From 17dcf1ba61893cc37d70f03628c49a1632bdc21e Mon Sep 17 00:00:00 2001
From: maelstrom
Date: Sun, 29 Dec 2024 20:29:25 -0500
Subject: [PATCH] Working version of transformer-based sequence classifier
 implemented for CPU

---
 model/autoencoder_smol.py |  69 ++++++++++++++++++
 model/model.py            | 157 ++++++++++++++++++++++++++++++++++++++
 pipe/pipe.py              |  46 ++++++++++--
 train_cpu.py              | 113 ++++++++++++++++++++++++++
 4 files changed, 380 insertions(+), 5 deletions(-)
 create mode 100644 model/autoencoder_smol.py
 create mode 100644 train_cpu.py

diff --git a/model/autoencoder_smol.py b/model/autoencoder_smol.py
new file mode 100644
index 0000000..be24604
--- /dev/null
+++ b/model/autoencoder_smol.py
@@ -0,0 +1,69 @@
+import keras
+
+class Encoder(keras.Model):
+    def __init__(self, input_size=(26, 130, 1), latent_dim=128):
+        super(Encoder, self).__init__()
+        self.encoder = keras.Sequential([
+            keras.layers.InputLayer(shape=input_size),
+            keras.layers.Conv2D(8, (3, 3), activation="relu"),
+            keras.layers.MaxPooling2D((2, 2)),
+            keras.layers.Conv2D(16, (3, 3), activation="relu"),
+            #keras.layers.MaxPooling2D((2, 2)),
+            keras.layers.Conv2D(32, (3, 3), activation="relu"),
+            keras.layers.Flatten(),
+            keras.layers.Dense(latent_dim*64, activation="relu"),
+            keras.layers.Dense(latent_dim*32, activation="relu"),
+            keras.layers.Dense(latent_dim*16, activation="relu"),
+            keras.layers.Dense(latent_dim*8, activation="relu"),
+            keras.layers.Dense(latent_dim*4, activation="relu"),
+            keras.layers.Dense(latent_dim*2, activation="relu"),
+            keras.layers.Dense(latent_dim, activation="relu")
+        ])
+
+    def call(self, x):
+        return self.encoder(x)
+
+    def summary(self):
+        self.encoder.summary()
+
+class Decoder(keras.Model):
+    def __init__(self, latent_dim=128):
+        super(Decoder, self).__init__()
+        self.decoder = keras.Sequential([
+            keras.layers.InputLayer(shape=(latent_dim,)),
+            keras.layers.Dense(latent_dim*2, activation="relu"),
+            keras.layers.Dense(latent_dim*4, activation="relu"),
+            keras.layers.Dense(latent_dim*8, activation="relu"),
+            keras.layers.Dense(latent_dim*16, activation="relu"),
+            keras.layers.Dense(latent_dim*32, activation="relu"),
+            keras.layers.Dense(latent_dim*64, activation="relu"),
+            # 15360 = 8 * 60 * 32, the encoder's pre-flatten volume
+            keras.layers.Dense(15360, activation="relu"),
+            keras.layers.Reshape((8, 60, 32)),
+            keras.layers.Conv2DTranspose(32, (3, 3), activation="relu"),
+            #keras.layers.UpSampling2D((2, 2)),
+            keras.layers.Conv2DTranspose(16, (3, 3), activation="relu"),
+            keras.layers.UpSampling2D((2, 2)),
+            keras.layers.Conv2DTranspose(1, (3, 3), activation="sigmoid")
+        ])
+
+    def call(self, x):
+        return self.decoder(x)
+
+    def summary(self):
+        self.decoder.summary()
+
+class Autoencoder(keras.Model):
+    def __init__(self, input_size=(26, 130, 1), latent_dim=128, **kwargs):
+        super(Autoencoder, self).__init__(**kwargs)
+        self.encoder = Encoder(input_size=input_size, latent_dim=latent_dim)
+        self.decoder = Decoder(latent_dim=latent_dim)
+
+    def call(self, x):
+        encoded = self.encoder(x)
+        decoded = self.decoder(encoded)
+        return decoded
+
+    def summary(self):
+        # summarize the inner Sequential stacks; calling summary() on the
+        # unbuilt subclassed wrapper itself would raise a ValueError
+        self.encoder.summary()
+        self.decoder.summary()
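
A quick way to review model/autoencoder_smol.py in isolation is a shape round-trip on random data. The sketch below is illustrative only, not part of the patch, and assumes the default input_size and latent_dim; with the commented-out pooling/upsampling pair left disabled, the encoder maps a (26, 130, 1) spectrogram down to a 128-wide latent vector and the decoder mirrors it back exactly.

import numpy as np
from model.autoencoder_smol import Autoencoder

autoencoder = Autoencoder(input_size=(26, 130, 1), latent_dim=128)
batch = np.random.rand(4, 26, 130, 1).astype("float32")  # fake spectrograms

latent = autoencoder.encoder(batch)
assert latent.shape == (4, 128)                # bottleneck is latent_dim wide

reconstructed = autoencoder(batch)
assert reconstructed.shape == (4, 26, 130, 1)  # decoder mirrors the encoder
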
diff --git a/model/model.py b/model/model.py
index 24b595b..477e2cd 100644
--- a/model/model.py
+++ b/model/model.py
@@ -1,4 +1,161 @@
 """
+model.py
+
 """
+
+from keras import Input, Model
+from keras.layers import Conv1D, Dense, Dropout, GlobalAveragePooling1D, \
+    LayerNormalization, Masking, MultiHeadAttention
+
+class TimeSeriesTransformer(Model):
+
+    def __init__(self, input_shape, head_size, num_heads, ff_dim,
+                 num_transformer_blocks, mlp_units, n_classes,
+                 dropout=0, mlp_dropout=0):
+        """
+        Initializes the TimeSeriesTransformer class. This class is a
+        wrapper around a Keras model that consists of a series of
+        Transformer blocks followed by an MLP.
+
+        Args:
+            input_shape: tuple, shape of the input tensor.
+            head_size: int, the number of features in each attention head.
+            num_heads: int, the number of attention heads.
+            ff_dim: int, the number of neurons in the feedforward neural
+                network.
+            num_transformer_blocks: int, the number of Transformer blocks.
+            mlp_units: list of ints, the number of neurons in each layer of
+                the MLP.
+            n_classes: int, the number of output classes.
+            dropout: float, dropout rate.
+            mlp_dropout: float, dropout rate in the MLP.
+
+        Attributes:
+            timeseriestransformer: Keras model, the TimeSeriesTransformer
+                model.
+        """
+        super(TimeSeriesTransformer, self).__init__()
+        self.timeseriestransformer = self._modelstack(
+            input_shape, head_size, num_heads, ff_dim,
+            num_transformer_blocks, mlp_units, n_classes,
+            dropout, mlp_dropout)
+
+    def _modelstack(self, input_shape, head_size, num_heads, ff_dim,
+                    num_transformer_blocks, mlp_units, n_classes,
+                    dropout, mlp_dropout):
+        """
+        Creates a Timeseries Transformer model. This consists of a series
+        of Transformer blocks followed by an MLP.
+
+        Args:
+            input_shape: tuple, shape of the input tensor.
+            head_size: int, the number of features in each attention head.
+            num_heads: int, the number of attention heads.
+            ff_dim: int, the number of neurons in the feedforward neural
+                network.
+            num_transformer_blocks: int, the number of Transformer blocks.
+            mlp_units: list of ints, the number of neurons in each layer of
+                the MLP.
+            n_classes: int, the number of output classes.
+            dropout: float, dropout rate.
+            mlp_dropout: float, dropout rate in the MLP.
+
+        Returns:
+            A Keras model.
+        """
+        inputs = Input(shape=input_shape)
+        # mask the timesteps that the data pipe padded with 32.
+        x = Masking(mask_value=32.)(inputs)
+        for _ in range(num_transformer_blocks):
+            x = self._transformerblocks(x, head_size, num_heads, ff_dim,
+                                        dropout)
+        x = GlobalAveragePooling1D(data_format="channels_first")(x)
+        for dim in mlp_units:
+            x = Dense(dim, activation="relu")(x)
+            x = Dropout(mlp_dropout)(x)
+        outputs = Dense(n_classes, activation="softmax")(x)
+
+        return Model(inputs, outputs)
+
+    def _transformerblocks(self, inputs, head_size, num_heads,
+                           ff_dim, dropout):
+        """
+        Constructs the transformer block. This consists of multi-head
+        attention, dropout, layer normalization, a residual connection,
+        a feedforward neural network, and another residual connection.
+
+        Args:
+            inputs: Tensor, batch of input data.
+            head_size: int, the number of features in each attention head.
+            num_heads: int, the number of attention heads.
+            ff_dim: int, the number of neurons in the feedforward neural
+                network.
+            dropout: float, dropout rate.
+
+        Returns:
+            A model layer.
+        """
+        # attention sublayer with residual connection
+        x = MultiHeadAttention(
+            key_dim=head_size, num_heads=num_heads,
+            dropout=dropout)(inputs, inputs)
+        x = Dropout(dropout)(x)
+        x = LayerNormalization(epsilon=1e-6)(x)
+        res = x + inputs
+
+        # pointwise feedforward sublayer with residual connection
+        x = Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
+        x = Dropout(dropout)(x)
+        x = Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
+        outputs = Dropout(dropout)(x) + res
+
+        return outputs
+
+    def call(self, inputs):
+        """
+        Calls the TimeSeriesTransformer model on a batch of inputs.
+
+        Args:
+            inputs: Tensor, batch of input data.
+
+        Returns:
+            Tensor, resulting output of the TimeSeriesTransformer model.
+        """
+        return self.timeseriestransformer(inputs)
+
+    def summary(self):
+        """
+        Prints a summary of the TimeSeriesTransformer model.
+
+        Args:
+            None.
+
+        Returns:
+            None.
+        """
+        self.timeseriestransformer.summary()
+
+    '''
+    def compile(self, loss="sparse_categorical_crossentropy",
+                optimizer="adam",
+                metrics=["sparse_categorical_accuracy"]):
+        """
+        Compiles the TimeSeriesTransformer model.
+
+        Args:
+            loss: str, loss function.
+            optimizer: str, optimizer.
+            metrics: list of str, evaluation metrics.
+
+        Returns:
+            None.
+        """
+        self.timeseriestransformer.compile(
+            loss=loss, optimizer=optimizer, metrics=metrics)
+
+        return
+    '''
 
 # EOF
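
Because __init__ builds the wrapped Functional model eagerly, the classifier can be exercised on random data before any training pipeline exists. A minimal sketch, not part of the patch, using the hyperparameter values train_cpu.py defines below; the (32, 130) input shape matches the padded spectrograms from the pipe, and n_classes=3 is an arbitrary stand-in:

import numpy as np
from model.model import TimeSeriesTransformer

model = TimeSeriesTransformer(
    input_shape=(32, 130), head_size=256, num_heads=4, ff_dim=4,
    num_transformer_blocks=4, mlp_units=[128], n_classes=3,
    dropout=0.2, mlp_dropout=0.3)

batch = np.random.rand(2, 32, 130).astype("float32")
probs = model(batch)          # one softmax distribution per sample
assert probs.shape == (2, 3)
model.summary()               # delegates to the inner Functional model
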
+ """ + return self.timeseriestransformer(inputs) + + def summary(self): + """ + Prints a summary of the TimeSeriesTransformer model. + + Args: + None. + + Returns: + None. + """ + self.timeseriestransformer.summary() + + ''' + def compile(self, loss="sparse_categorical_crossentropy", + optimizer="adam", + metrics=["sparse_categorical_accuracy"]): + """ + Compiles the TimeSeriesTransformer model. + + Args: + loss: str, loss function. + optimizer: str, optimizer. + metrics: list of str, evaluation metrics. + + Returns: + None. + """ + + super() + self.timeseriestransformer.compile( + loss="sparse_categorical_crossentropy", + optimizer="adam", + metrics=["sparse_categorical_accuracy"]) + + return + ''' + # EOF diff --git a/pipe/pipe.py b/pipe/pipe.py index fe7148c..4ac65ad 100644 --- a/pipe/pipe.py +++ b/pipe/pipe.py @@ -10,7 +10,8 @@ import cv2 as cv import h5py import numpy as np -from pyspark.sql import SparkSession, Row +from pyspark.sql import SparkSession, Row, DataFrame +import scipy as sp def image_pipe(spark: SparkSession, imagepath: Path, namepattern: str, stacksize: int) -> np.ndarray: @@ -28,6 +29,8 @@ def __init__(self, spark: SparkSession, filetype: str = "hdf5"): self.spectrogram_pipe = self.spectrogram_pipe_hdf5 elif filetype == "shards": self.spectrogram_pipe = self.spectrogram_pipe_shards + elif filetype == "matfiles": + self.spectrogram_pipe = self.spectrogram_pipe_matfiles else: raise ValueError @@ -53,9 +56,42 @@ def metadata_pipe(self, metapath: Path, labels:list, return metadata + def spectrogram_pipe_matfiles(self, specpath: Path, labels:list, + default_size: tuple = (32, 130), + pad_value: float = 32.) \ + -> DataFrame: + """ + Loads spectrograms for each stack iteration from a set of mat files, + and turns it into a spark-friendly format. + + Args: + labels (list): List of target labels. + + Returns: + + """ + spectrograms = [] + row = {} + for label in labels: + matdata = sp.io.loadmat(specpath/label) + row["treatment"] = matdata["header"][0][0][4][0].lower() + row["label"] = label + spectrogram = np.array(matdata["SPF"][0]) + if len(spectrogram.shape) == 3: + spectrogram = spectrogram[0] + spectrogram = np.pad( + spectrogram, + ((default_size[0] - spectrogram.shape[0], 0), + (default_size[1] - spectrogram.shape[1], 0)), + mode="constant", constant_values=pad_value) + row["spectrogram"] = spectrogram.tolist() + spectrograms.append(Row(**row)) + + return self.spark.createDataFrame(spectrograms) + def spectrogram_pipe_hdf5(self, specpath: Path, labels: list, - namepattern:str="averaged_spectrogram{}.hdf5" - ) -> np.ndarray: + namepattern:str="averaged_spectrogram{}.hdf5") \ + -> DataFrame: """ Loads spectrograms for each stack iteration from an hdf5 data file, and turns it into a spark-friendly format. @@ -79,7 +115,8 @@ def spectrogram_pipe_hdf5(self, specpath: Path, labels: list, return self.spark.createDataFrame(spectrograms) def spectrogram_pipe_shards(self, specpath: Path, namepattern: str, - stacksize: int, freq_samples: int) -> np.ndarray: + stacksize: int, freq_samples: int) \ + -> DataFrame: """ Loads spectrograms for each stack iteration from a set of shard files, and turns it into a spark-friendly format. @@ -96,5 +133,4 @@ def spectrogram_pipe_shards(self, specpath: Path, namepattern: str, return - # EOF diff --git a/train_cpu.py b/train_cpu.py new file mode 100644 index 0000000..439da4d --- /dev/null +++ b/train_cpu.py @@ -0,0 +1,113 @@ +""" +train.py + +Launches the training process for the model. 
+""" + +import os +from pathlib import Path +import time + +#os.environ["KERAS_BACKEND"] = "jax" +#os.environ["XLA_PYTHON_CLIENT_PREALLOCATE"] = "false" + +import jax +import numpy as np +from pipe.pipe import SpectrogramPipe +from pyspark.ml.feature import StringIndexer +from pyspark.sql import SparkSession +import tensorflow as tf +import keras + +from jax.experimental import mesh_utils +from jax.sharding import Mesh +from jax.sharding import NamedSharding +from jax.sharding import PartitionSpec + +from model.autoencoder_smol import Autoencoder +from model.model import TimeSeriesTransformer as TSTF + +# data parameters +SPLIT = [0.98, 0.015, 0.05] + +# model parameters +HEAD_SIZE = 256 +NUM_HEADS = 4 +FF_DIM = 4 +NUM_TRANSFORMER_BLOCKS = 4 +MLP_UNITS = [128] +DROPOUT = 0.2 +MLP_DROPOUT = 0.3 + +# training parameters +BATCH_SIZE = 8 +EPOCHS = 25 + +def trim(dataframe, column): + + ndarray = np.array(dataframe.select(column).collect()) \ + .reshape(-1, 32, 130) + + return ndarray + +def get_data(spark, split=[0.99, 0.005, 0.005]): + path = Path("/app/workdir") + + labels = [] + with open(path / "train.csv", "r") as file: + for line in file: + labels.append(line.strip().split(",")[0]) + + pipe = SpectrogramPipe(spark, filetype="matfiles") + data = pipe.spectrogram_pipe(path, labels) + + indexer = StringIndexer(inputCol="treatment", outputCol="treatment_index") + indexed = indexer.fit(data).transform(data) + + train_df, validation_df, test_df = indexed.randomSplit(split, seed=42) + + trainx = trim(train_df, "spectrogram") + trainy = np.array(train_df.select("treatment_index").collect()) + + valx = trim(validation_df, "spectrogram") + valy = np.array(validation_df.select("treatment_index").collect()) + + testx = trim(test_df, "spectrogram") + testy = np.array(test_df.select("treatment_index").collect()) + + return ((trainx, trainy), (valx, valy), (testx, testy)) + +def get_model(input_shape, n_classes): + model = TSTF(input_shape, HEAD_SIZE, NUM_HEADS, FF_DIM, + NUM_TRANSFORMER_BLOCKS, MLP_UNITS, n_classes, + dropout=DROPOUT, mlp_dropout=MLP_DROPOUT) + return model + +def main(): + # jax mesh setup + + spark = SparkSession.builder.appName("train").getOrCreate() + + train_set, validation_set, test_set = get_data(spark, split=SPLIT) + + n_classes = len(set(train_set[1].flatten())) + model = get_model(train_set[0].shape[1:], n_classes) + model.compile(optimizer=keras.optimizers.Adam(), + loss="sparse_categorical_crossentropy", + metrics=["sparse_categorical_accuracy"] + ) + model.summary() + + start = time.time() + model.fit(x=train_set[0], y=train_set[1], + validation_data=(validation_set[0], validation_set[1]), + batch_size=BATCH_SIZE, epochs=EPOCHS) + end = time.time() + print("Training time: ", end - start) + + return + +if __name__ == "__main__": + main() + +# EOF