Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions evaluate/evaluate_autoencoder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-

# Built-in module imports

# 3rd party module imports

# Local module imports

# Script entry point — evaluation logic for the autoencoder is not yet implemented.
if __name__ == "__main__":
    pass

# EOF
20 changes: 20 additions & 0 deletions logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-

# Built-in module import
import logging
import subprocess

def init_logger(path, pid, name="logger"):
    """Create (or fetch) a file-backed logger for a run directory.

    Args:
        path: Base working directory as a ``pathlib.Path``.
        pid: Run sub-directory name under ``path``; records are appended
            to ``path/pid/model.log``.
        name: Name registered with the ``logging`` module.

    Returns:
        logging.Logger: Logger emitting INFO-and-above records to the
        run's ``model.log`` file.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # getLogger returns a process-wide singleton per name: guard against
    # attaching a second FileHandler (= duplicated log lines) when
    # init_logger is called more than once with the same name.
    if not logger.handlers:
        filehandler = logging.FileHandler(path / pid / "model.log", mode="a+")
        filehandler.setLevel(logging.INFO)
        formatter = logging.Formatter(
            "%(asctime)s - %(levelname)s %(name)s - %(message)s"
        )
        filehandler.setFormatter(formatter)
        logger.addHandler(filehandler)
    # Record the current commit for reproducibility, but never crash the
    # run just because git is absent or the cwd is not a repository.
    try:
        git_hash = subprocess.check_output(
            ["git", "rev-parse", "HEAD"]
        ).decode("ascii").strip()
        logger.info("Git hash: %s", git_hash)
    except (OSError, subprocess.CalledProcessError):
        logger.warning("Git hash unavailable")
    return logger
2 changes: 1 addition & 1 deletion model/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim,

inputs = Input(shape=(mlp_units[-1],), name="decoder_input")
full_dimension = input_shape[0] * input_shape[1]
x = Dense(full_dimension, activation="relu", name="dec_dense1")(inputs)
x = Dense(full_dimension, activation="gelu", name="dec_dense1")(inputs)
x = Reshape((input_shape[0], input_shape[1]), name="dec_reshape")(x)

for i in range(num_Transformer_blocks):
Expand Down
2 changes: 1 addition & 1 deletion model/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim,
# Pooling and simple DNN block
x = GlobalAveragePooling1D(data_format="channels_first")(x)
for dim in mlp_units:
x = Dense(dim, activation="relu")(x)
x = Dense(dim, activation="gelu")(x)
x = Dropout(mlp_dropout)(x)
x = LayerNormalization()(x)

Expand Down
10 changes: 5 additions & 5 deletions model/latent.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def _build(self, mlp_dim, semantic_dims, var_dim):
sem_inputs = [Input(shape=(dim,)) for dim in semantic_dims]
s, s_err = self.semantic([inputs] + sem_inputs)
v, kl_loss = self.variational(inputs)
x = Dense(mlp_dim, activation="relu")(ops.concatenate([s, v], axis=-1))
x = Dense(mlp_dim, activation="gelu")(ops.concatenate([s, v], axis=-1))

return Model([inputs, *sem_inputs], [x, s_err, kl_loss], name="latent")

Expand Down Expand Up @@ -58,7 +58,7 @@ def _build(self, mlp_dim, semantic_dims):

# Compute inverse log of dimensions to get weights
class_counts = ops.array(semantic_dims[:-1], dtype="float32")
weights = 1/ops.log(class_counts)
weights = 1/(2*ops.log(class_counts))

inputs = Input(shape=(mlp_dim,))
targets = [Input(shape=(dim,)) for dim in semantic_dims]
Expand All @@ -81,12 +81,12 @@ def _build(self, mlp_dim, semantic_dims):
]

# Compute MAE, with normalization by approximate range of values (~4)
errors.append(mean_absolute_error(targets[-1], reg) / 4.)
errors.append(mean_absolute_error(targets[-1], reg) / 8.)
error = ops.sum(ops.stack(errors))

# Combine everything into single dense layer
concat = keras.layers.concatenate(one_hots + [reg], axis=-1)
output = Dense(mlp_dim, activation="relu")(concat)
output = Dense(mlp_dim, activation="gelu")(concat)

return Model([inputs, *targets], [output, error], name="semantic")

Expand All @@ -104,7 +104,7 @@ def __init__(self, dim, var_dim):

def _build(self, dim, var_dim):
inputs = Input(shape=(dim,))
x = Dense(var_dim, activation="relu")(inputs)
x = Dense(var_dim, activation="gelu")(inputs)
z_mean = Dense(var_dim, activation=None)(x)
z_log_var = Dense(var_dim, activation=None)(x)
z = Sampling()([z_mean, z_log_var])
Expand Down
4 changes: 2 additions & 2 deletions model/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, input_shape, head_size, num_heads, ff_dim,
self.cycle_len = 25
self.total_epoch = None
self.max_cap = 5.0
self.beta = 0.05
self.beta = 0.1

self.encoder = Encoder(
input_shape,
Expand Down Expand Up @@ -117,7 +117,7 @@ def call(self, inputs, training: Optional[bool] = None):
training=training
)

self.recon_weight = ops.cast(1./4., dtype=recon_loss.dtype)
self.recon_weight = ops.cast(1., dtype=recon_loss.dtype)
self.add_loss(sem_loss)
self.add_loss(self.beta*kl_loss)
self.add_loss(recon_loss * self.recon_weight)
Expand Down
2 changes: 1 addition & 1 deletion model/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def build_transformerblock(self, inputs, head_size, num_heads,
x = LayerNormalization(epsilon=1e-6)(x)
res = x + inputs

x = Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
x = Conv1D(filters=ff_dim, kernel_size=1, activation="gelu")(res)
x = Dropout(dropout)(x)
x = Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
outputs = Dropout(dropout)(x) + res
Expand Down
4 changes: 2 additions & 2 deletions pipe/enumsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
from typing import Any, List, Sequence

class Thresholds(Enum):
    """Minimum quality-control cutoffs used when filtering input .mat files.

    Files whose per-channel maxima fall below these values are skipped
    during extraction (see the BB/NCNT/NSD checks in pipe/extract.py).
    """
    # NOTE(review): the pasted diff carried both old (2, 0.3) and new
    # (2.5, 0.35) member lines; duplicate Enum member names raise
    # TypeError, so only the updated values are kept here.
    NCNT = 2.5
    BB = 3
    NSD = 0.35

class StrEnum(str, Enum):
def __str__(self) -> str:
Expand Down
4 changes: 2 additions & 2 deletions pipe/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pipe.load import load
from pipe.transform import transform

def etl(spark: SparkSession, split: list=None) -> DataFrame:
def etl(spark: SparkSession, split: list=None, dsetsize=None) -> DataFrame:
"""
Performs the ETL process in series and returns the final DataFrame.
Expand All @@ -31,7 +31,7 @@ def etl(spark: SparkSession, split: list=None) -> DataFrame:
Returns:
types.DataFrame: The final processed DataFrame after ETL.
"""
data = extract(spark)
data = extract(spark, dsetsize=dsetsize)
data = transform(spark, data)
load(data)
match split:
Expand Down
90 changes: 70 additions & 20 deletions pipe/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
"""

# Built-in module imports
from datetime import datetime
import json
import glob
import os
from pathlib import Path
import shutil as sh
import typing
from typing import List

Expand All @@ -22,10 +24,11 @@
import scipy as sp

# Local module imports
from logger import init_logger
from pipe.enumsets import Data, FileType, Thresholds
from pipe.parse_utils import get_field, strip_array

def extract(spark: SparkSession) -> DataFrame:
def extract(spark: SparkSession, dsetsize=500) -> DataFrame:
"""
First step of the ETL pipeline. It reads the list of .mat files from
a CSV list, opens and pulls the spectrogram from each respective file.
Expand All @@ -35,10 +38,20 @@ def extract(spark: SparkSession) -> DataFrame:
"""

path = Path("/app/workdir")

reader = FileReader(spark, filetype=FileType.MAT)
strfmt = "%Y%m%d_%H%M%S"
timestamp = datetime.now().strftime(strfmt)
name = f"dataset_{dsetsize}_{timestamp}"
try:
os.mkdir(path/name)
except FileExistsError:
print("File Exists")
exit()
logger = init_logger(path, name, "data_logger")
logger.info(f"Number of files to process: {dsetsize}")

reader = FileReader(spark, logger, filetype=FileType.MAT)
rdd = spark.sparkContext.parallelize(
reader.read_file(path),
reader.read_file(path, name, dsetsize=dsetsize),
numSlices=200
)
#return reader.read_file(path, labels)
Expand All @@ -55,8 +68,9 @@ class FileReader:
'shards', and 'matfiles'.
"""

def __init__(self, spark: SparkSession, filetype: FileType):
def __init__(self, spark: SparkSession, logger, filetype: FileType):
self.spark = spark
self.logger = logger
match filetype:
case FileType.HDF5:
self.read_file = self.read_hdf5
Expand Down Expand Up @@ -89,7 +103,7 @@ def metadata_read(self, metapath: Path, labels:list,

return metadata

def parse_matfile(self, matdata: dict):
def parse_matfile(self, matdata: dict, label: str):
"""
Parses the data from a .mat file and returns it as a dictionary.
Expand Down Expand Up @@ -138,15 +152,23 @@ def parse_matfile(self, matdata: dict):
nsd_max = np.max(data_dict["timeseries"][:,-1])
if np.isnan(data_dict["timeseries"]).any():
print("Skipping file due to NaN values in timeseries.")
sh.move(f"/app/workdir/matfiles/{label}",
f"/app/workdir/matfiles/bad/nan/{label}")
return {}
if bb_max < Thresholds.BB.value:
print(f"Skipping file due to low BB: {bb_max}.")
sh.move(f"/app/workdir/matfiles/{label}",
f"/app/workdir/matfiles/bad/bb/{label}")
return {}
if ncnt_max < Thresholds.NCNT.value:
print(f"Skipping file due to low NCNT: {ncnt_max}.")
sh.move(f"/app/workdir/matfiles/{label}",
f"/app/workdir/matfiles/bad/ncnt/{label}")
return {}
if nsd_max < Thresholds.NSD.value:
print(f"Skipping file due to low NSD: {nsd_max}.")
sh.move(f"/app/workdir/matfiles/{label}",
f"/app/workdir/matfiles/bad/nsd/{label}")
return {}
if data_dict["constants"].shape != (5,):
print("Unexpected shape for constants:",
Expand All @@ -160,6 +182,8 @@ def parse_matfile(self, matdata: dict):


def read_matfiles(self, specpath: Path,
name: str,
dsetsize: int = 0,
default_size: tuple = (36, 130),
pad_value: float = 0.) -> DataFrame:
"""
Expand All @@ -177,11 +201,12 @@ def read_matfiles(self, specpath: Path,
DataFrame: Spark DataFrame containing the requested data.
"""
labels = glob.glob(str(specpath/"matfiles"/"*.mat"))
if dsetsize > 0:
labels = np.random.choice(labels, size=dsetsize, replace=False)
np.savetxt(specpath/name/f"{name}_manifest.csv", labels, fmt="%s")
nloops = default_size[0]
nfreq = default_size[1]
mean = 0
maxv = 0
minv = 0

spec_meanshift = 0.
spec_scale = 4.
bb_meanshift = 10.
Expand All @@ -190,9 +215,11 @@ def read_matfiles(self, specpath: Path,
ncnt_scale = 5.

for label in labels:
matdata = sp.io.loadmat(specpath/"matfiles"/label)
row = self.parse_matfile(matdata)
matdata = sp.io.loadmat(label)
label = label.split("/")[-1]
row = self.parse_matfile(matdata, label)
if row == {}:
print(label)
continue
timeseries_array = np.zeros((nloops, nfreq+3), dtype="float16")
baseline = int(row["constants"][2])
Expand All @@ -213,9 +240,6 @@ def read_matfiles(self, specpath: Path,
row["timeseries"] = timeseries_array
del timeseries_array

mean = np.mean(row["timeseries"][:,:-3])
maxv = max(row["timeseries"][:,:-3].max(), maxv)
minv = min(row["timeseries"][:,:-3].min(), minv)
row["timeseries"][:,:-3] = (
row["timeseries"][:,:-3] - spec_meanshift
) / spec_scale
Expand All @@ -230,16 +254,42 @@ def read_matfiles(self, specpath: Path,

row["timeseries"][:, -1] = row["timeseries"][:, -1] - nsd_meanshift

timeseries_array = Vectors.dense(row["timeseries"].flatten())
row["timeseries"] = timeseries_array

# Rescale the constants
row["constants"][0] = np.log10(row["constants"][0]) / 5.
row["constants"][1] = np.log10(row["constants"][1]) / 5.
row["constants"][2] = np.log10(row["constants"][2]) / nloops - 0.5
row["constants"][3] = np.log10(row["constants"][3]) / nloops - 0.5
row["constants"][4] = (np.log10(row["constants"][4]) / 60.) - 0.5
row["constants"][2] = row["constants"][2] / nloops - 0.5
row["constants"][3] = row["constants"][3] / nloops - 0.5
row["constants"][4] = (row["constants"][4] / 60.) - 0.5

# Final NaN check
if np.isnan(row["timeseries"]).any():
print(label)
print("Skipping file due to NaN.")
sh.move(f"/app/workdir/matfiles/{label}",
f"/app/workdir/matfiles/bad/nan/{label}")
continue
if np.isinf(row["timeseries"]).any():
print(label)
print("Skipping file due to INF.")
sh.move(f"/app/workdir/matfiles/{label}",
f"/app/workdir/matfiles/bad/inf/{label}")
continue
for const in row["constants"]:
if np.isnan(const):
print(label)
print(row["constants"])
print("Skipping file due to NaN in constants.")
continue
if np.isinf(const):
print(label)
print(row["constants"])
print("Skipping file due to INF in constants.")
continue

timeseries_array = Vectors.dense(row["timeseries"].flatten())
row["timeseries"] = timeseries_array
row["constants"] = Vectors.dense(row["constants"])

yield row

#return self.spark.createDataFrame(data)
Expand Down
Loading