diff --git a/model/model.py b/model/model.py
index 5c65ab7..2ee911c 100644
--- a/model/model.py
+++ b/model/model.py
@@ -1,6 +1,9 @@
+# -*- coding: utf-8 -*-
 """
 model.py
 
+Contains the core model architecture for geonosis.
+
 """
 
 from keras import Input, Model
@@ -8,7 +11,9 @@
                           GlobalAveragePooling1D, LayerNormalization, Masking, \
                           MultiHeadAttention
 
-class TimeSeriesTransformer(Model):
+from model.transformer import TimeseriesTransformerBuilder as TSTFBuilder
+
+class CompoundModel(Model):
 
     def __init__(self, input_shape, head_size, num_heads, ff_dim,
                  num_Transformer_blocks, mlp_units, n_classes,
@@ -35,7 +40,9 @@ def __init__(self, input_shape, head_size, num_heads, ff_dim,
             timeseriestransformer: Keras model, the TimeSeriesTransformer
                 model.
         """
-        super(TimeSeriesTransformer, self).__init__()
+        super(CompoundModel, self).__init__()
+
+        self.tstfbuilder = TSTFBuilder()
         self.timeseriestransformer = self._modelstack(
             input_shape, head_size, num_heads, ff_dim,
             num_Transformer_blocks, mlp_units, n_classes,
@@ -67,10 +74,16 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim,
         inputs = Input(shape=input_shape)
         #x = inputs
+        #inputs = Masking(mask_value=pad_value)(inputs)
         x = BatchNormalization()(inputs)
         for _ in range(num_Transformer_blocks):
-            x = self._transformerblocks(x, head_size, num_heads, ff_dim,
-                                        dropout)
+            x = self.tstfbuilder.build_transformerblock(
+                x,
+                head_size,
+                num_heads,
+                ff_dim,
+                dropout
+            )
 
         x = GlobalAveragePooling1D(data_format="channels_first")(x)
         for dim in mlp_units:
             x = Dense(dim, activation="relu")(x)
@@ -98,8 +111,6 @@ def _transformerblocks(self, inputs, head_size, num_heads,
         Returns:
             A model layer.
         """
-        #pad_value = -1
-        #inputs = Masking(mask_value=pad_value)(inputs)
         x = MultiHeadAttention(
             key_dim=head_size, num_heads=num_heads,
             dropout=dropout)(inputs, inputs)
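Reviewer note (not part of the patch): the `super().__init__()` call is moved ahead of the `self.tstfbuilder` assignment above, since Keras raises a RuntimeError when attributes are set on a `Model` subclass before its initializer runs. A minimal construction sketch of the renamed model follows; the hyperparameter values are illustrative placeholders, not the project's `parameters.json` settings:

```python
import numpy as np
from model.model import CompoundModel

# Placeholder hyperparameters, for illustration only.
model = CompoundModel(
    input_shape=(32, 133),      # (nloops, nfreq + 3), matching pipe/etl.py's trim()
    head_size=64, num_heads=4, ff_dim=4,
    num_Transformer_blocks=2, mlp_units=[128],
    n_classes=2, dropout=0.1, mlp_dropout=0.2)

batch = np.zeros((8, 32, 133), dtype="float32")
scores = model(batch)           # forward pass; expected shape (8, n_classes)
```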
diff --git a/model/transformer.py b/model/transformer.py
new file mode 100644
index 0000000..44e0a64
--- /dev/null
+++ b/model/transformer.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+"""
+transformer.py
+
+Contains the Transformer-block builder for geonosis.
+
+"""
+
+from keras.layers import Conv1D, Dropout, LayerNormalization, \
+                         MultiHeadAttention
+
+class TimeseriesTransformerBuilder:
+
+    def __init__(self):
+        """
+        Initializes the TimeseriesTransformerBuilder class. The builder
+        is stateless; it constructs the Transformer blocks that
+        CompoundModel stacks in _modelstack().
+        """
+
+    def build_transformerblock(self, inputs, head_size, num_heads,
+                               ff_dim, dropout):
+        """
+        Constructs a Transformer block. This consists of multi-head
+        attention, dropout, layer normalization, a residual connection,
+        a feedforward neural network, and another residual connection.
+
+        Args:
+            inputs: Tensor, batch of input data.
+            head_size: int, the number of features in each attention head.
+            num_heads: int, the number of attention heads.
+            ff_dim: int, the number of neurons in the feedforward neural
+                network.
+            dropout: float, dropout rate.
+
+        Returns:
+            A model layer.
+        """
+        x = MultiHeadAttention(
+            key_dim=head_size, num_heads=num_heads,
+            dropout=dropout)(inputs, inputs)
+        x = Dropout(dropout)(x)
+        x = LayerNormalization(epsilon=1e-6)(x)
+        res = x + inputs
+
+        x = Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
+        x = Dropout(dropout)(x)
+        x = Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
+        outputs = Dropout(dropout)(x) + res
+
+        return outputs
+
+# EOF
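Reviewer note (not part of the patch): the new file's module docstring said "model.py", its `__init__` docstring was copied from the old `TimeSeriesTransformer`, and the copied `call()`/`summary()` methods referenced a `self.timeseriestransformer` attribute the builder never defines; the version above drops them and trims the unused imports, with the hunk count updated to match. A quick sanity check of the extracted builder, with illustrative values; the block is shape-preserving, so blocks can be chained freely:

```python
from keras import Input
from model.transformer import TimeseriesTransformerBuilder

inputs = Input(shape=(32, 133))
block = TimeseriesTransformerBuilder().build_transformerblock(
    inputs, head_size=64, num_heads=4, ff_dim=4, dropout=0.1)
print(block.shape)  # (None, 32, 133) -- same shape as the input
```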
diff --git a/pipe/etl.py b/pipe/etl.py
index 4525775..d83e4df 100644
--- a/pipe/etl.py
+++ b/pipe/etl.py
@@ -72,7 +72,7 @@ def split_sets(data: DataFrame, split=[0.99, 0.005, 0.005]) -> tuple:
         key: build_dict(data, key) for key in ["treatment", "target"]
     }
     splits = data.randomSplit(split, seed=42)
-    trainx, valx, testx = (trim(dset, "spectra") for dset in splits)
+    trainx, valx, testx = (trim(dset, "timeseries") for dset in splits)
     trainy, valy, testy = (
         [
             np.array(dset.select("treatment").collect()).squeeze(),
@@ -85,7 +85,7 @@ def split_sets(data: DataFrame, split=[0.99, 0.005, 0.005]) -> tuple:
 
 
 def trim(dataframe, column):
     ndarray = np.array(dataframe.select(column).collect()) \
-        .reshape(-1, 32, 130)
+        .reshape(-1, 32, 133)
 
     return ndarray
diff --git a/pipe/extract.py b/pipe/extract.py
index 42b674c..4d2c1e6 100644
--- a/pipe/extract.py
+++ b/pipe/extract.py
@@ -124,39 +124,59 @@ def read_matfiles(self, specpath: Path,
         data = []
         row = {}
         labels = glob.glob(str(specpath/"matfiles"/"*.mat"))
+        nloops = default_size[0]
+        nfreq = default_size[1]
+        mean = 0
+        maxv = -np.inf
+        minv = np.inf
+        spec_meanshift = 0.
+        spec_scale = 4.
+        bb_meanshift = 10.
+        bb_scale = 5.
+        nsd_meanshift = 0.5
+        ncnt_scale = 5.
         for label in labels:
             matdata = sp.io.loadmat(specpath/"matfiles"/label)
+            header = matdata["header"]
+            constants_array = np.zeros((1,5), dtype="float16")
+            constants_array[0,0] = header["f1"][0][0][0]
+            constants_array[0,1] = header["f2"][0][0][0]
+            constants_array[0,2] = header["t0"][0][0][0]
+            constants_array[0,4] = header["dt"][0][0][0]
+            timeseries_array = np.zeros((nloops, nfreq+3), dtype="float16")
+            spec = matdata["SP"]
+            for _ in range(len(spec.shape)-2):
+                spec = spec[0]
+            spec[np.abs(spec) == np.inf] = pad_value
+            spec[np.isnan(spec)] = pad_value
+            try:
+                constants_array[0,3] = header["tf"][0][0][0]
+            except ValueError:
+                constants_array[0,3] = float(spec.shape[-2])
+            mean += np.mean(spec)
+            maxv = max(np.max(spec), maxv)
+            minv = min(np.min(spec), minv)
+            time_offset = int(nloops-constants_array[0,3])
+            timeseries_array[time_offset:, :130] = \
+                (spec - spec_meanshift) / spec_scale
+            timeseries_array[:time_offset, :130] = \
+                timeseries_array[time_offset, :130]
+            timeseries_array[time_offset:, 130] = \
+                (np.log10(matdata["BB"][0]) - bb_meanshift) / bb_scale
+            timeseries_array[time_offset:, 131] = \
+                (matdata["NSD"][0] - nsd_meanshift)
+            timeseries_array[time_offset:, 132] = \
+                np.log10(matdata["NCNT"][0]) / ncnt_scale
+            row["timeseries"] = timeseries_array.tolist()
+
             if DataKind.TREATMENT in datakinds:
-                row["treatment"] = matdata["header"][0][0][4][0].lower()
+                row["treatment"] = header["drug"][0][0][0].lower()
             if DataKind.TARGET in datakinds:
                 try:
-                    row["target"] = matdata["header"][0][0][2][0].lower()
+                    row["target"] = header["cell"][0][0][0].lower()
                 except:
                     row["target"] = "unknown"
-            if DataKind.FPS in datakinds:
-                row["fps"] = 2*float(matdata["header"][0][0][15][0])
-            if DataKind.BB in datakinds:
-                row["bb"] = matdata["BB"].tolist()
-            if DataKind.NSD in datakinds:
-                row["nsd"] = matdata["NSD"].tolist()
-            if DataKind.NCNT in datakinds:
-                row["ncnt"] = matdata["NCNT"].astype(float).tolist()
-            if DataKind.SPEC in datakinds:
-                spectra = np.array(matdata["SP"][0])
-                if len(spectra.shape) == 3:
-                    spectra = spectra[0]
-                if spectra.shape[0] > default_size[0]:
-                    spectra = spectra[:default_size[0], :]
-                spectra = np.pad(
-                    spectra,
-                    ((default_size[0] - spectra.shape[0], 0),
-                     (default_size[1] - spectra.shape[1], 0)),
-                    mode="constant", constant_values=pad_value)
-                spectra[np.isnan(spectra)] = 0.
-                spectra[np.abs(spectra) == np.inf] = 0.
-                spectra = spectra / np.sum(spectra)
-                row["spectra"] = spectra.tolist()
             data.append(Row(**row))
         return self.spark.createDataFrame(data)
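Reviewer note (not part of the patch): the running extrema were accumulating with `+=` (`maxv += max(...)`), which sums instead of tracking a maximum; the version above uses plain assignment and seeds `maxv`/`minv` at minus/plus infinity. The reuse of the introduced `header` variable in the treatment/target lines is a small simplification of the same lookups. The new per-loop layout also explains etl.py's reshape to `(-1, 32, 133)`: 130 scaled spectrum bins plus three scalar channels. A standalone numpy sketch of the front-fill behavior for a recording shorter than `nloops` (sizes illustrative):

```python
import numpy as np

nloops, nfreq = 32, 130                  # default_size, as in read_matfiles()
spec = np.random.rand(20, nfreq)         # a recording lasting tf = 20 loops

timeseries = np.zeros((nloops, nfreq + 3), dtype="float16")
time_offset = nloops - spec.shape[0]     # 12 leading rows with no data

timeseries[time_offset:, :nfreq] = spec  # real loops sit at the end
# rows before time_offset repeat the first real loop instead of a pad constant
timeseries[:time_offset, :nfreq] = timeseries[time_offset, :nfreq]
# columns 130, 131, 132 hold the scalar channels: scaled log10(BB), NSD, log10(NCNT)
```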
diff --git a/pipe/transform.py b/pipe/transform.py
index a66461b..0c4637e 100644
--- a/pipe/transform.py
+++ b/pipe/transform.py
@@ -82,6 +82,7 @@ def onehot(dataframe: DataFrame, keys: list) -> DataFrame:
 
     return result
 
+
 def transform(spark: SparkSession, dataframe: DataFrame, keys: list) \
         -> DataFrame:
     """
diff --git a/train.py b/train.py
index 55f0b69..60ca8c8 100644
--- a/train.py
+++ b/train.py
@@ -23,7 +23,7 @@ from sklearn.metrics import confusion_matrix
 from sklearn.preprocessing import OneHotEncoder
 
-from model.model import TimeSeriesTransformer as TSTF
+from model.model import CompoundModel
 from pipe.etl import etl, read
 
 with open("parameters.json", "r") as file:
@@ -55,7 +55,7 @@ def trim(dataframe, column):
     return ndarray
 
 def get_model(input_shape, n_classes):
-    model = TSTF(input_shape, HEAD_SIZE, NUM_HEADS, FF_DIM,
-                 NUM_TRANSFORMER_BLOCKS, MLP_UNITS, n_classes,
-                 dropout=DROPOUT, mlp_dropout=MLP_DROPOUT)
+    model = CompoundModel(input_shape, HEAD_SIZE, NUM_HEADS, FF_DIM,
+                          NUM_TRANSFORMER_BLOCKS, MLP_UNITS, n_classes,
+                          dropout=DROPOUT, mlp_dropout=MLP_DROPOUT)
     return model
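Reviewer note (not part of the patch): the continuation lines of the `get_model` call are realigned above, since the rename from `TSTF` to `CompoundModel` shifts the opening parenthesis. End to end, the renamed entry point should now accept the 133-feature timeseries shape produced by `split_sets()`. A smoke-test sketch; the shapes and class count are illustrative, and it assumes `parameters.json` is present because train.py reads it at import time:

```python
import numpy as np
from train import get_model  # train.py loads parameters.json at import

model = get_model(input_shape=(32, 133), n_classes=3)
dummy = np.zeros((4, 32, 133), dtype="float32")
print(model(dummy).shape)    # expected: (4, 3)
```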