diff --git a/evaluate/evaluate_autoencoder.py b/evaluate/evaluate_autoencoder.py new file mode 100644 index 0000000..085270b --- /dev/null +++ b/evaluate/evaluate_autoencoder.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- + +# Built-in module imports + +# 3rd party module imports + +# Local module imports + +if __name__ == "__main__": + pass + +# EOF diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..40d3977 --- /dev/null +++ b/logger.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- + +# Built-in module import +import logging +import subprocess + +def init_logger(path, pid, name="logger"): + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + filehandler = logging.FileHandler(path/pid/"model.log", mode="a+") + filehandler.setLevel(logging.INFO) + formatter = logging.Formatter( + "%(asctime)s - %(levelname)s %(name)s - %(message)s" + ) + filehandler.setFormatter(formatter) + logger.addHandler(filehandler) + git_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]) \ + .decode("ascii").strip() + logger.info(f"Git hash: {git_hash}") + return logger diff --git a/model/decoder.py b/model/decoder.py index 506a12b..69907bc 100644 --- a/model/decoder.py +++ b/model/decoder.py @@ -68,7 +68,7 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim, inputs = Input(shape=(mlp_units[-1],), name="decoder_input") full_dimension = input_shape[0] * input_shape[1] - x = Dense(full_dimension, activation="relu", name="dec_dense1")(inputs) + x = Dense(full_dimension, activation="gelu", name="dec_dense1")(inputs) x = Reshape((input_shape[0], input_shape[1]), name="dec_reshape")(x) for i in range(num_Transformer_blocks): diff --git a/model/encoder.py b/model/encoder.py index 4c5a510..236c155 100644 --- a/model/encoder.py +++ b/model/encoder.py @@ -86,7 +86,7 @@ def _modelstack(self, input_shape, head_size, num_heads, ff_dim, # Pooling and simple DNN block x = GlobalAveragePooling1D(data_format="channels_first")(x) for dim in mlp_units: - x = 
Dense(dim, activation="relu")(x) + x = Dense(dim, activation="gelu")(x) x = Dropout(mlp_dropout)(x) x = LayerNormalization()(x) diff --git a/model/latent.py b/model/latent.py index a5d0721..456e903 100644 --- a/model/latent.py +++ b/model/latent.py @@ -25,7 +25,7 @@ def _build(self, mlp_dim, semantic_dims, var_dim): sem_inputs = [Input(shape=(dim,)) for dim in semantic_dims] s, s_err = self.semantic([inputs] + sem_inputs) v, kl_loss = self.variational(inputs) - x = Dense(mlp_dim, activation="relu")(ops.concatenate([s, v], axis=-1)) + x = Dense(mlp_dim, activation="gelu")(ops.concatenate([s, v], axis=-1)) return Model([inputs, *sem_inputs], [x, s_err, kl_loss], name="latent") @@ -58,7 +58,7 @@ def _build(self, mlp_dim, semantic_dims): # Compute inverse log of dimensions to get weights class_counts = ops.array(semantic_dims[:-1], dtype="float32") - weights = 1/ops.log(class_counts) + weights = 1/(2*ops.log(class_counts)) inputs = Input(shape=(mlp_dim,)) targets = [Input(shape=(dim,)) for dim in semantic_dims] @@ -81,12 +81,12 @@ def _build(self, mlp_dim, semantic_dims): ] # Compute MAE, with normalization by approximate range of values (~4) - errors.append(mean_absolute_error(targets[-1], reg) / 4.) + errors.append(mean_absolute_error(targets[-1], reg) / 8.) 
error = ops.sum(ops.stack(errors)) # Combine everything into single dense layer concat = keras.layers.concatenate(one_hots + [reg], axis=-1) - output = Dense(mlp_dim, activation="relu")(concat) + output = Dense(mlp_dim, activation="gelu")(concat) return Model([inputs, *targets], [output, error], name="semantic") @@ -104,7 +104,7 @@ def __init__(self, dim, var_dim): def _build(self, dim, var_dim): inputs = Input(shape=(dim,)) - x = Dense(var_dim, activation="relu")(inputs) + x = Dense(var_dim, activation="gelu")(inputs) z_mean = Dense(var_dim, activation=None)(x) z_log_var = Dense(var_dim, activation=None)(x) z = Sampling()([z_mean, z_log_var]) diff --git a/model/model.py b/model/model.py index 2080402..2ee60e1 100644 --- a/model/model.py +++ b/model/model.py @@ -50,7 +50,7 @@ def __init__(self, input_shape, head_size, num_heads, ff_dim, self.cycle_len = 25 self.total_epoch = None self.max_cap = 5.0 - self.beta = 0.05 + self.beta = 0.1 self.encoder = Encoder( input_shape, @@ -117,7 +117,7 @@ def call(self, inputs, training: Optional[bool] = None): training=training ) - self.recon_weight = ops.cast(1./4., dtype=recon_loss.dtype) + self.recon_weight = ops.cast(1., dtype=recon_loss.dtype) self.add_loss(sem_loss) self.add_loss(self.beta*kl_loss) self.add_loss(recon_loss * self.recon_weight) diff --git a/model/transformer.py b/model/transformer.py index e3de135..4f9b34e 100644 --- a/model/transformer.py +++ b/model/transformer.py @@ -67,7 +67,7 @@ def build_transformerblock(self, inputs, head_size, num_heads, x = LayerNormalization(epsilon=1e-6)(x) res = x + inputs - x = Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res) + x = Conv1D(filters=ff_dim, kernel_size=1, activation="gelu")(res) x = Dropout(dropout)(x) x = Conv1D(filters=inputs.shape[-1], kernel_size=1)(x) outputs = Dropout(dropout)(x) + res diff --git a/pipe/enumsets.py b/pipe/enumsets.py index 3161877..552bab1 100644 --- a/pipe/enumsets.py +++ b/pipe/enumsets.py @@ -9,9 +9,9 @@ from typing import 
Any, List, Sequence class Thresholds(Enum): - NCNT = 2 + NCNT = 2.5 BB = 3 - NSD = 0.3 + NSD = 0.35 class StrEnum(str, Enum): def __str__(self) -> str: diff --git a/pipe/etl.py b/pipe/etl.py index 2c0f937..7576a5c 100644 --- a/pipe/etl.py +++ b/pipe/etl.py @@ -21,7 +21,7 @@ from pipe.load import load from pipe.transform import transform -def etl(spark: SparkSession, split: list=None) -> DataFrame: +def etl(spark: SparkSession, split: list=None, dsetsize=None) -> DataFrame: """ Performs the ETL process in series and returns the final DataFrame. @@ -31,7 +31,7 @@ def etl(spark: SparkSession, split: list=None) -> DataFrame: Returns: types.DataFrame: The final processed DataFrame after ETL. """ - data = extract(spark) + data = extract(spark, dsetsize=dsetsize) data = transform(spark, data) load(data) match split: diff --git a/pipe/extract.py b/pipe/extract.py index 660d5f2..b69efb8 100644 --- a/pipe/extract.py +++ b/pipe/extract.py @@ -4,10 +4,12 @@ """ # Built-in module imports +from datetime import datetime import json import glob import os from pathlib import Path +import shutil as sh import typing from typing import List @@ -22,10 +24,11 @@ import scipy as sp # Local module imports +from logger import init_logger from pipe.enumsets import Data, FileType, Thresholds from pipe.parse_utils import get_field, strip_array -def extract(spark: SparkSession) -> DataFrame: +def extract(spark: SparkSession, dsetsize=500) -> DataFrame: """ First step of the ETL pipeline. It reads the list of .mat files from a CSV list, opens and pulls the spectrogram from each respective file. 
@@ -35,10 +38,20 @@ def extract(spark: SparkSession) -> DataFrame: """ path = Path("/app/workdir") - - reader = FileReader(spark, filetype=FileType.MAT) + strfmt = "%Y%m%d_%H%M%S" + timestamp = datetime.now().strftime(strfmt) + name = f"dataset_{dsetsize}_{timestamp}" + try: + os.mkdir(path/name) + except FileExistsError: + print("File Exists") + exit() + logger = init_logger(path, name, "data_logger") + logger.info(f"Number of files to process: {dsetsize}") + + reader = FileReader(spark, logger, filetype=FileType.MAT) rdd = spark.sparkContext.parallelize( - reader.read_file(path), + reader.read_file(path, name, dsetsize=dsetsize), numSlices=200 ) #return reader.read_file(path, labels) @@ -55,8 +68,9 @@ class FileReader: 'shards', and 'matfiles'. """ - def __init__(self, spark: SparkSession, filetype: FileType): + def __init__(self, spark: SparkSession, logger, filetype: FileType): self.spark = spark + self.logger = logger match filetype: case FileType.HDF5: self.read_file = self.read_hdf5 @@ -89,7 +103,7 @@ def metadata_read(self, metapath: Path, labels:list, return metadata - def parse_matfile(self, matdata: dict): + def parse_matfile(self, matdata: dict, label: str): """ Parses the data from a .mat file and returns it as a dictionary. 
@@ -138,15 +152,23 @@ def parse_matfile(self, matdata: dict): nsd_max = np.max(data_dict["timeseries"][:,-1]) if np.isnan(data_dict["timeseries"]).any(): print("Skipping file due to NaN values in timeseries.") + sh.move(f"/app/workdir/matfiles/{label}", + f"/app/workdir/matfiles/bad/nan/{label}") return {} if bb_max < Thresholds.BB.value: print(f"Skipping file due to low BB: {bb_max}.") + sh.move(f"/app/workdir/matfiles/{label}", + f"/app/workdir/matfiles/bad/bb/{label}") return {} if ncnt_max < Thresholds.NCNT.value: print(f"Skipping file due to low NCNT: {ncnt_max}.") + sh.move(f"/app/workdir/matfiles/{label}", + f"/app/workdir/matfiles/bad/ncnt/{label}") return {} if nsd_max < Thresholds.NSD.value: print(f"Skipping file due to low NSD: {nsd_max}.") + sh.move(f"/app/workdir/matfiles/{label}", + f"/app/workdir/matfiles/bad/nsd/{label}") return {} if data_dict["constants"].shape != (5,): print("Unexpected shape for constants:", @@ -160,6 +182,8 @@ def parse_matfile(self, matdata: dict): def read_matfiles(self, specpath: Path, + name: str, + dsetsize: int = 0, default_size: tuple = (36, 130), pad_value: float = 0.) -> DataFrame: """ @@ -177,11 +201,12 @@ def read_matfiles(self, specpath: Path, DataFrame: Spark DataFrame containing the requested data. """ labels = glob.glob(str(specpath/"matfiles"/"*.mat")) + if dsetsize > 0: + labels = np.random.choice(labels, size=dsetsize, replace=False) + np.savetxt(specpath/name/f"{name}_manifest.csv", labels, fmt="%s") nloops = default_size[0] nfreq = default_size[1] - mean = 0 - maxv = 0 - minv = 0 + spec_meanshift = 0. spec_scale = 4. bb_meanshift = 10. @@ -190,9 +215,11 @@ def read_matfiles(self, specpath: Path, ncnt_scale = 5. 
for label in labels: - matdata = sp.io.loadmat(specpath/"matfiles"/label) - row = self.parse_matfile(matdata) + matdata = sp.io.loadmat(label) + label = label.split("/")[-1] + row = self.parse_matfile(matdata, label) if row == {}: + print(label) continue timeseries_array = np.zeros((nloops, nfreq+3), dtype="float16") baseline = int(row["constants"][2]) @@ -213,9 +240,6 @@ def read_matfiles(self, specpath: Path, row["timeseries"] = timeseries_array del timeseries_array - mean = np.mean(row["timeseries"][:,:-3]) - maxv = max(row["timeseries"][:,:-3].max(), maxv) - minv = min(row["timeseries"][:,:-3].min(), minv) row["timeseries"][:,:-3] = ( row["timeseries"][:,:-3] - spec_meanshift ) / spec_scale @@ -230,16 +254,42 @@ def read_matfiles(self, specpath: Path, row["timeseries"][:, -1] = row["timeseries"][:, -1] - nsd_meanshift - timeseries_array = Vectors.dense(row["timeseries"].flatten()) - row["timeseries"] = timeseries_array - # Rescale the constants row["constants"][0] = np.log10(row["constants"][0]) / 5. row["constants"][1] = np.log10(row["constants"][1]) / 5. - row["constants"][2] = np.log10(row["constants"][2]) / nloops - 0.5 - row["constants"][3] = np.log10(row["constants"][3]) / nloops - 0.5 - row["constants"][4] = (np.log10(row["constants"][4]) / 60.) - 0.5 + row["constants"][2] = row["constants"][2] / nloops - 0.5 + row["constants"][3] = row["constants"][3] / nloops - 0.5 + row["constants"][4] = (row["constants"][4] / 60.) 
- 0.5 + + # Final NaN check + if np.isnan(row["timeseries"]).any(): + print(label) + print("Skipping file due to NaN.") + sh.move(f"/app/workdir/matfiles/{label}", + f"/app/workdir/matfiles/bad/nan/{label}") + continue + if np.isinf(row["timeseries"]).any(): + print(label) + print("Skipping file due to INF.") + sh.move(f"/app/workdir/matfiles/{label}", + f"/app/workdir/matfiles/bad/inf/{label}") + continue + if np.isnan(row["constants"]).any(): + print(label) + print(row["constants"]) + print("Skipping file due to NaN in constants.") + continue + if np.isinf(row["constants"]).any(): + print(label) + print(row["constants"]) + print("Skipping file due to INF in constants.") + continue + + + timeseries_array = Vectors.dense(row["timeseries"].flatten()) + row["timeseries"] = timeseries_array row["constants"] = Vectors.dense(row["constants"]) + yield row #return self.spark.createDataFrame(data) diff --git a/train/autoencoder_train.py b/train/autoencoder_train.py index fb64c2f..51eb7a5 100644 --- a/train/autoencoder_train.py +++ b/train/autoencoder_train.py @@ -24,10 +24,13 @@ def autoencoder_workflow(params, shape, n_classes, train_set, validation_set, test_set, - categories, keys, path): + categories, keys, path, logger): model = build_autoencoder(params, shape, n_classes) - model = train_autoencoder(params, model, train_set, validation_set, path) + model, runtime = train_autoencoder( + params, model, train_set, validation_set, path + ) + logger.info(f"Autoencoder training completed in {runtime:.2f} seconds.") m = {key: None for key in keys} m, test_predict = test_autoencoder( @@ -40,7 +43,7 @@ def autoencoder_workflow(params, shape, n_classes, evaluate_autoencoder( params, test_predict, - test_set[0], + test_set, categories, keys, @@ -68,7 +71,7 @@ def build_autoencoder(params, shape, semantic_dims): model.total_epoch = params["epochs"] model.build(shape) model.compile( - optimizer=keras.optimizers.Adam(learning_rate=4e-4), +
optimizer=keras.optimizers.Adam(learning_rate=5e-5), metrics=params["metrics"] ) @@ -78,6 +81,11 @@ def train_autoencoder(params, model, train_set, validation_set, path): log_level = params["log_level"] timestamp = params["timestamp"] params = params["autoencoder_params"] + print("NAN", np.isnan(train_set[0]).any()) + print("INF", np.isinf(train_set[0]).any()) + for dset in train_set[1]: + print("NAN", np.isnan(dset).any()) + print("INF", np.isinf(dset).any()) callbacks = [ EpochTracker(), ModelCheckpoint( @@ -102,8 +110,9 @@ def train_autoencoder(params, model, train_set, validation_set, path): callbacks=callbacks ) end = time.time() - print("Training time: ", end - start) - return model + runtime = end - start + print("Training time: ", runtime) + return model, runtime def test_autoencoder(model: Model, test: List, metrics: dict): """ @@ -122,11 +131,32 @@ def test_autoencoder(model: Model, test: List, metrics: dict): return metrics, test_predict def evaluate_autoencoder(params, test_predict, test_set, categories, keys, path): - plt.pcolor(test_set[0]) + tf = test_set[1][-1][0][3] + t0 = test_set[1][-1][0][2] + test_set = test_set[0] + diff = int(36*(tf - t0)) + baseline = 36 - diff + og = test_set[0] + np.savetxt(path / params["timestamp"] / "original.csv", og, delimiter=",") + np.savetxt(path / params["timestamp"] / "reconstruction.csv", test_predict, + delimiter=",") + # Render the original spectra and spectrogram + plt.pcolor(og, cmap="bwr", vmin=-1, vmax=1) + plt.title("Original") plt.savefig(path / params["timestamp"] / "original.png") plt.close() - plt.pcolor(test_predict) - plt.savefig(path / params["timestamp"] / "reproduction.png") + plt.pcolor(og - np.mean(og[:baseline], axis=0), cmap="bwr", vmin=-1, vmax=1) + plt.title("Original_Sgram") + plt.savefig(path / params["timestamp"] / "original_sgram.png") + plt.close() + plt.pcolor(test_predict, cmap="bwr", vmin=-1, vmax=1) + plt.title("Reconstruction") + plt.savefig(path / params["timestamp"] / 
"reconstruction.png") + plt.close() + plt.pcolor(test_predict - np.mean(test_predict[:baseline], axis=0), + cmap="bwr", vmin=-1, vmax=1) + plt.title("Reconstruction_Sgram") + plt.savefig(path / params["timestamp"] / "reconstruction_sgram.png") plt.close() return diff --git a/train_model.py b/train_model.py index 5cc3035..0db731c 100644 --- a/train_model.py +++ b/train_model.py @@ -7,7 +7,6 @@ # Built-in module imports from datetime import datetime -import logging import os from pathlib import Path import shutil as sh @@ -24,6 +23,7 @@ import matplotlib.pyplot as plt # Local module imports +from logger import init_logger from pipe.etl import etl, read from train.autoencoder_train import autoencoder_workflow @@ -64,16 +64,15 @@ def data_parallel(): return data_parallel -def logger(): - logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(levelname)s %(name)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" - ) - return logging.getLogger(__name__) - def main(): - logger = logger() + strfmt = "%Y%m%d_%H%M%S" + timestamp = datetime.now().strftime(strfmt) + path = Path("/app/workdir/model") + try: + os.mkdir(path/timestamp) + except FileExistsError: + exit() + logger = init_logger(path, timestamp, "model_logger") params = parameters() spark = SparkSession.builder \ @@ -87,7 +86,8 @@ def main(): #keras.distribution.set_distribution(parallel) if params["load_from_scratch"]: - data = etl(spark, split=params["autoencoder_params"]["split"]) + data = etl(spark, split=params["autoencoder_params"]["split"], + dsetsize=params["dsetsize"]) else: data = read(spark, split=params["autoencoder_params"]["split"], parquet=params["dataset"]) @@ -96,12 +96,8 @@ def main(): n_classes = [dset.shape[1] for dset in train_set[1]] shape = train_set[0].shape[1:] - path = Path("/app/workdir/model") - strfmt = "%Y%m%d_%H%M%S" - timestamp = datetime.now().strftime(strfmt) params["timestamp"] = timestamp - os.mkdir(path/timestamp) sh.copy("parameters.json", 
path/timestamp/"parameters.json") if params["train_autoencoder"]: @@ -114,7 +110,8 @@ def main(): test_set, categories, keys, - path + path, + logger ) return