
Commit

Multiple data streams integrated together in an intermediate fusion pipeline
lim185 committed Oct 19, 2025
1 parent 8f31b15 commit c7cff3c
Showing 6 changed files with 163 additions and 35 deletions.
23 changes: 17 additions & 6 deletions model/model.py
@@ -1,14 +1,19 @@
# -*- coding: utf-8 -*-
"""
model.py
Contains the core model architecture for geonosis.
"""

from keras import Input, Model
from keras.layers import BatchNormalization, Conv1D, Dense, Dropout, Reshape, \
GlobalAveragePooling1D, LayerNormalization, Masking, \
MultiHeadAttention

class TimeSeriesTransformer(Model):
from model.transformer import TimeseriesTransformerBuilder as TSTFBuilder

class CompoundModel(Model):

def __init__(self, input_shape, head_size, num_heads, ff_dim,
num_Transformer_blocks, mlp_units, n_classes,
@@ -35,7 +40,9 @@ def __init__(self, input_shape, head_size, num_heads, ff_dim,
timeseriestransformer: Keras model, the TimeSeriesTransformer
model.
"""
super(TimeSeriesTransformer, self).__init__()
self.tstfbuilder = TSTFBuilder()

super(CompoundModel, self).__init__()
self.timeseriestransformer = self._modelstack(
input_shape, head_size, num_heads, ff_dim,
num_Transformer_blocks, mlp_units, n_classes,
@@ -67,10 +74,16 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim,

inputs = Input(shape=input_shape)
#x = inputs
#inputs = Masking(mask_value=pad_value)(inputs)
x = BatchNormalization()(inputs)
for _ in range(num_Transformer_blocks):
x = self._transformerblocks(x, head_size, num_heads, ff_dim,
dropout)
x = self.tstfbuilder.build_transformerblock(
x,
head_size,
num_heads,
ff_dim,
dropout
)
x = GlobalAveragePooling1D(data_format="channels_first")(x)
for dim in mlp_units:
x = Dense(dim, activation="relu")(x)
@@ -98,8 +111,6 @@ def _transformerblocks(self, inputs, head_size, num_heads,
Returns:
A model layer.
"""
#pad_value = -1
#inputs = Masking(mask_value=pad_value)(inputs)
x = MultiHeadAttention(
key_dim=head_size, num_heads=num_heads,
dropout=dropout)(inputs, inputs)
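For orientation between the two files, here is a minimal sketch of how the refactored CompoundModel might be driven end to end. All hyperparameter values, and n_classes in particular, are illustrative placeholders rather than the values configured in parameters.json:

import numpy as np
from model.model import CompoundModel

# Illustrative hyperparameters only; not the project's configured values.
model = CompoundModel(
    input_shape=(32, 133),  # 32 loops x (130 spectral bins + 3 fused channels)
    head_size=64, num_heads=4, ff_dim=128,
    num_Transformer_blocks=2, mlp_units=[64], n_classes=5,
    dropout=0.1, mlp_dropout=0.2)

dummy = np.zeros((8, 32, 133), dtype="float32")  # batch of 8 padded series
print(model(dummy).shape)  # (8, 5), assuming the stack ends in Dense(n_classes)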
96 changes: 96 additions & 0 deletions model/transformer.py
@@ -0,0 +1,96 @@
# -*- coding: utf-8 -*-
"""
transformer.py
Contains the Transformer block builder for geonosis.
"""

from keras.layers import Conv1D, Dropout, LayerNormalization, \
MultiHeadAttention

class TimeseriesTransformerBuilder:

def __init__(self):
"""
Initializes the TimeseriesTransformerBuilder. The builder is a
factory for the Transformer blocks used by CompoundModel: each
block applies multi-head attention and a pointwise feedforward
network, with dropout, layer normalization, and residual
connections.
Args:
None.
Returns:
None.
"""

def build_transformerblock(self, inputs, head_size, num_heads,
ff_dim, dropout):
"""
Constructs the transformer block. This consists of multi-head
attention, dropout, layer normalization, a residual connection,
a feedforward neural network, and another residual connection.
Args:
inputs: Tensor, batch of input data.
head_size: int, the number of features in each attention head.
num_heads: int, the number of attention heads.
ff_dim: int, the number of neurons in the feedforward neural
network.
dropout: float, dropout rate.
Returns:
A model layer.
"""
x = MultiHeadAttention(
key_dim=head_size, num_heads=num_heads,
dropout=dropout)(inputs, inputs)
x = Dropout(dropout)(x)
x = LayerNormalization(epsilon=1e-6)(x)
res = x + inputs

x = Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
x = Dropout(dropout)(x)
x = Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
outputs = Dropout(dropout)(x) + res

return outputs

# EOF
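A usage sketch for the builder on a standalone Keras graph (shapes and hyperparameters here are illustrative); because both residual connections preserve the channel count, one block maps its input shape to itself:

from keras import Input, Model
from model.transformer import TimeseriesTransformerBuilder

builder = TimeseriesTransformerBuilder()
inputs = Input(shape=(32, 133))  # (timesteps, channels), illustrative
x = builder.build_transformerblock(inputs, head_size=64, num_heads=4,
                                   ff_dim=128, dropout=0.1)
block = Model(inputs, x)
block.summary()  # final output shape: (None, 32, 133), same as the input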
4 changes: 2 additions & 2 deletions pipe/etl.py
@@ -72,7 +72,7 @@ def split_sets(data: DataFrame, split=[0.99, 0.005, 0.005]) -> tuple:
key: build_dict(data, key) for key in ["treatment", "target"]
}
splits = data.randomSplit(split, seed=42)
trainx, valx, testx = (trim(dset, "spectra") for dset in splits)
trainx, valx, testx = (trim(dset, "timeseries") for dset in splits)
trainy, valy, testy = (
[
np.array(dset.select("treatment").collect()).squeeze(),
@@ -85,7 +85,7 @@ def split_sets(data: DataFrame, split=[0.99, 0.005, 0.005]) -> tuple:
def trim(dataframe, column):

ndarray = np.array(dataframe.select(column).collect()) \
.reshape(-1, 32, 130)
.reshape(-1, 32, 133)

return ndarray

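The widened reshape mirrors the fusion performed upstream in extract.py: each of the 32 loops now carries 130 spectral bins plus three appended scalar channels (the BB, NSD, and NCNT streams). A quick shape check with synthetic rows:

import numpy as np

# Each Spark row holds a 32 x 133 nested list; collect() flattens it,
# and trim() restores (rows, loops, channels). Four rows, illustrative.
flat = np.zeros((4, 32 * 133))
ndarray = flat.reshape(-1, 32, 133)
assert ndarray.shape == (4, 32, 133)  # 130 spectral bins + BB, NSD, NCNT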
70 changes: 45 additions & 25 deletions pipe/extract.py
@@ -124,39 +124,59 @@ def read_matfiles(self, specpath: Path,
data = []
row = {}
labels = glob.glob(str(specpath/"matfiles"/"*.mat"))
nloops = default_size[0]
nfreq = default_size[1]
mean = 0
maxv = 0
minv = 0
spec_meanshift = 0.
spec_scale = 4.
bb_meanshift = 10.
bb_scale = 5.
nsd_meanshift = 0.5
ncnt_scale = 5.

for label in labels:
matdata = sp.io.loadmat(specpath/"matfiles"/label)
header = matdata["header"]
constants_array = np.zeros((1,5), dtype="float16")
constants_array[0,0] = header["f1"][0][0][0]
constants_array[0,1] = header["f2"][0][0][0]
constants_array[0,2] = header["t0"][0][0][0]
constants_array[0,4] = header["dt"][0][0][0]
timeseries_array = np.zeros((nloops, nfreq+3), dtype="float16")
spec = matdata["SP"]
for _ in range(len(spec.shape)-2):
spec = spec[0]
spec[np.abs(spec) == np.inf] = pad_value
spec[np.isnan(spec)] = pad_value
try:
constants_array[0,3] = header["tf"][0][0][0]
except ValueError:
constants_array[0,3] = float(spec.shape[-2])
mean += np.mean(spec)
maxv += max(np.max(spec), maxv)
minv += min(np.min(spec), minv)
time_offset = int(nloops-constants_array[0,3])
timeseries_array[time_offset:, :130] = \
(spec - spec_meanshift) / spec_scale
timeseries_array[:time_offset, :130] = \
timeseries_array[time_offset, :130]
timeseries_array[time_offset:, 130] = \
(np.log10(matdata["BB"][0]) - bb_meanshift) / bb_scale
timeseries_array[time_offset:, 131] = \
(matdata["NSD"][0] - nsd_meanshift)
timeseries_array[time_offset:, 132] = \
np.log10(matdata["NCNT"][0]) / ncnt_scale
row["timeseries"] = timeseries_array.tolist()

if DataKind.TREATMENT in datakinds:
row["treatment"] = matdata["header"][0][0][4][0].lower()
row["treatment"] = matdata["header"]["drug"][0][0][0].lower()
if DataKind.TARGET in datakinds:
try:
row["target"] = matdata["header"][0][0][2][0].lower()
row["target"] = matdata["header"]["cell"][0][0][0].lower()
except:
row["target"] = "unknown"
if DataKind.FPS in datakinds:
row["fps"] = 2*float(matdata["header"][0][0][15][0])
if DataKind.BB in datakinds:
row["bb"] = matdata["BB"].tolist()
if DataKind.NSD in datakinds:
row["nsd"] = matdata["NSD"].tolist()
if DataKind.NCNT in datakinds:
row["ncnt"] = matdata["NCNT"].astype(float).tolist()
if DataKind.SPEC in datakinds:
spectra = np.array(matdata["SP"][0])
if len(spectra.shape) == 3:
spectra = spectra[0]
if spectra.shape[0] > default_size[0]:
spectra = spectra[:default_size[0], :]
spectra = np.pad(
spectra,
((default_size[0] - spectra.shape[0], 0),
(default_size[1] - spectra.shape[1], 0)),
mode="constant", constant_values=pad_value)
spectra[np.isnan(spectra)] = 0.
spectra[np.abs(spectra) == np.inf] = 0.
spectra = spectra / np.sum(spectra)
row["spectra"] = spectra.tolist()
data.append(Row(**row))

return self.spark.createDataFrame(data)
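To make the intermediate-fusion packing concrete, here is a self-contained sketch with synthetic arrays standing in for the .mat contents; the shift and scale constants mirror the ones defined at the top of read_matfiles, and everything else is illustrative:

import numpy as np

# Synthetic stand-ins for the .mat contents; shapes and ranges illustrative.
nloops, nfreq = 32, 130
tf_loops = 28                               # loops actually recorded
spec = np.random.rand(tf_loops, nfreq)      # spectrogram, one row per loop
bb = np.random.rand(tf_loops) * 1e10 + 1e9  # BB stream (log-scaled below)
nsd = np.random.rand(tf_loops)              # NSD stream
ncnt = np.random.randint(1, 100, tf_loops)  # NCNT stream

ts = np.zeros((nloops, nfreq + 3), dtype="float16")
offset = nloops - tf_loops                    # front-pad short recordings
ts[offset:, :130] = (spec - 0.) / 4.          # spec_meanshift, spec_scale
ts[:offset, :130] = ts[offset, :130]          # repeat earliest valid loop
ts[offset:, 130] = (np.log10(bb) - 10.) / 5.  # bb_meanshift, bb_scale
ts[offset:, 131] = nsd - 0.5                  # nsd_meanshift
ts[offset:, 132] = np.log10(ncnt) / 5.        # ncnt_scale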
1 change: 1 addition & 0 deletions pipe/transform.py
@@ -82,6 +82,7 @@ def onehot(dataframe: DataFrame, keys: list) -> DataFrame:

return result


def transform(spark: SparkSession, dataframe: DataFrame, keys: list) \
-> DataFrame:
"""
4 changes: 2 additions & 2 deletions train.py
@@ -23,7 +23,7 @@
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import OneHotEncoder

from model.model import TimeSeriesTransformer as TSTF
from model.model import CompoundModel
from pipe.etl import etl, read

with open("parameters.json", "r") as file:
@@ -55,7 +55,7 @@ def trim(dataframe, column):
return ndarray

def get_model(input_shape, n_classes):
model = TSTF(input_shape, HEAD_SIZE, NUM_HEADS, FF_DIM,
model = CompoundModel(input_shape, HEAD_SIZE, NUM_HEADS, FF_DIM,
NUM_TRANSFORMER_BLOCKS, MLP_UNITS, n_classes,
dropout=DROPOUT, mlp_dropout=MLP_DROPOUT)
return model
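A minimal usage sketch for the updated get_model (n_classes and the compile settings are illustrative assumptions, not taken from the training script):

model = get_model(input_shape=(32, 133), n_classes=5)  # placeholder n_classes
model.compile(optimizer="adam", loss="categorical_crossentropy",
              metrics=["accuracy"])  # illustrative compile settings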
