diff --git a/pipe/etl.py b/pipe/etl.py
index 4dc3892..d355981 100644
--- a/pipe/etl.py
+++ b/pipe/etl.py
@@ -12,41 +12,20 @@
 from pathlib import Path
 from pyspark.sql import SparkSession, functions, types, Row
 from sklearn.metrics import confusion_matrix
-from sklearn.preprocessing import OneHotEncoder
 import tensorflow as tf
 
-from pipe.pipe import SpectrogramPipe
+from pipe.extract import extract
+from pipe.transform import transform
+from pipe.load import load
 
-def transform(spark, dataframe, keys):
-    dataframe.select("treatment").replace("virus", "cpv") \
-        .replace("cont", "pbs") \
-        .replace("control", "pbs") \
-        .replace("dld", "pbs").distinct()
-
-    dataframe = dataframe.withColumn(
-        "index", functions.monotonically_increasing_id()
-    )
-    bundle = {key: [
-        arr.tolist()
-        for arr in OneHotEncoder(sparse_output=False) \
-            .fit_transform(dataframe.select(key).collect())
-    ] for key in keys
-    }
-
-    bundle = [dict(zip(bundle.keys(), values))
-        for values in zip(*bundle.values())]
-    schema = types.StructType([
-        types.StructField(key, types.ArrayType(types.FloatType()), True)
-        for key in keys
-    ])
-    newframe = spark.createDataFrame(bundle, schema=schema).withColumn(
-        "index", functions.monotonically_increasing_id()
-    )
-    for key in keys:
-        dataframe = dataframe.withColumnRenamed(key, f"{key}_str")
-    dataframe = dataframe.join(newframe, on="index", how="inner")
-
-    return dataframe
+def etl(spark):
+    """
+    Performs the ETL process in series.
+    """
+    data = extract(spark)
+    data = transform(spark, data, keys=["treatment", "target"])
+    data = load(data)
+    return data
 
 def build_dict(df, key):
     """
@@ -67,38 +46,6 @@
 
     return ndarray
 
-def extract(spark):
-    """
-    First step of the ETL pipeline. It reads the list of .mat files from
-    a CSV list, opens and pulls the spectrogram from each respective file.
-
-    """
-
-    path = Path("/app/workdir")
-    labels = []
-    with open(path / "train.csv", "r") as file:
-        for line in file:
-            labels.append(line.strip().split(",")[0])
-
-    pipe = SpectrogramPipe(spark, filetype="matfiles")
-
-    return pipe.spectrogram_pipe(path, labels)
-
-def load(data, split=[0.99, 0.005, 0.005]):
-    category_dict = {
-        key: build_dict(data, key) for key in ["treatment", "target"]
-    }
-    splits = data.randomSplit(split, seed=42)
-    trainx, valx, testx = (trim(dset, "spectrogram") for dset in splits)
-    trainy, valy, testy = (
-        [
-            np.array(dset.select("treatment").collect()).squeeze(),
-            np.array(dset.select("target").collect()).squeeze()
-        ] for dset in splits
-    )
-
-    return ((trainx, trainy), (valx, valy), (testx, testy), category_dict)
-
 def visualize_data_distribution(data):
     for category in ["treatment", "target"]:
         select = data.select(category) \
diff --git a/pipe/pipe.py b/pipe/extract.py
similarity index 82%
rename from pipe/pipe.py
rename to pipe/extract.py
index fb24f0a..218fc55 100644
--- a/pipe/pipe.py
+++ b/pipe/extract.py
@@ -1,5 +1,6 @@
+#-*- coding: utf-8 -*-
 """
-pipe.py
+extract.py
 """
 
 import json
@@ -13,6 +14,23 @@
 from pyspark.sql import SparkSession, Row, DataFrame
 import scipy as sp
 
+def extract(spark):
+    """
+    First step of the ETL pipeline. It reads the list of .mat files from
+    a CSV list, opens and pulls the spectrogram from each respective file.
+
+    """
+
+    path = Path("/app/workdir")
+    labels = []
+    with open(path / "train.csv", "r") as file:
+        for line in file:
+            labels.append(line.strip().split(",")[0])
+
+    reader = SpectrogramReader(spark, filetype="matfiles")
+
+    return reader.spectrogram_read(path, labels)
+
 def image_pipe(spark: SparkSession, imagepath: Path, namepattern: str,
                stacksize: int) -> np.ndarray:
     images = np.zeros((stacksize, 800,800))
@@ -21,20 +39,20 @@
     return images
 
 
-class SpectrogramPipe:
+class SpectrogramReader:
     def __init__(self, spark: SparkSession, filetype: str = "hdf5"):
         self.spark = spark
 
         if filetype == "hdf5":
-            self.spectrogram_pipe = self.spectrogram_pipe_hdf5
+            self.spectrogram_read = self.spectrogram_read_hdf5
         elif filetype == "shards":
-            self.spectrogram_pipe = self.spectrogram_pipe_shards
+            self.spectrogram_read = self.spectrogram_read_shards
         elif filetype == "matfiles":
-            self.spectrogram_pipe = self.spectrogram_pipe_matfiles
+            self.spectrogram_read = self.spectrogram_read_matfiles
         else:
             raise ValueError
 
-    def metadata_pipe(self, metapath: Path, labels:list,
+    def metadata_read(self, metapath: Path, labels:list,
                       namepattern: str="metadata{}.json") -> dict:
         """
         Loads metadata for each target label from a set of json files and
@@ -56,7 +74,7 @@
 
         return metadata
 
-    def spectrogram_pipe_matfiles(self, specpath: Path, labels:list,
+    def spectrogram_read_matfiles(self, specpath: Path, labels:list,
                                   default_size: tuple = (32, 130),
                                   pad_value: float = 0.) \
             -> DataFrame:
@@ -98,7 +116,7 @@
 
         return self.spark.createDataFrame(spectrograms)
 
-    def spectrogram_pipe_hdf5(self, specpath: Path, labels: list,
+    def spectrogram_read_hdf5(self, specpath: Path, labels: list,
                               namepattern:str="averaged_spectrogram{}.hdf5") \
             -> DataFrame:
         """
@@ -123,7 +141,7 @@
 
         return self.spark.createDataFrame(spectrograms)
 
-    def spectrogram_pipe_shards(self, specpath: Path, namepattern: str,
+    def spectrogram_read_shards(self, specpath: Path, namepattern: str,
                                 stacksize: int, freq_samples: int) \
             -> DataFrame:
         """
diff --git a/pipe/load.py b/pipe/load.py
new file mode 100644
index 0000000..cf7a9ad
--- /dev/null
+++ b/pipe/load.py
@@ -0,0 +1,31 @@
+#-*- coding: utf-8 -*-
+"""
+load.py
+"""
+
+import numpy as np
+from pyspark.sql import DataFrame
+
+def load(data: DataFrame, split=[0.99, 0.005, 0.005]):
+    """
+    Third step of the ETL pipeline. Splits the dataframe into train,
+    validation and test sets and collects them into numpy arrays.
+    """
+    # Imported here rather than at module level to avoid a circular import
+    # with pipe.etl, which itself imports load().
+    from pipe.etl import build_dict, trim
+
+    category_dict = {
+        key: build_dict(data, key) for key in ["treatment", "target"]
+    }
+    splits = data.randomSplit(split, seed=42)
+    trainx, valx, testx = (trim(dset, "spectrogram") for dset in splits)
+    trainy, valy, testy = (
+        [
+            np.array(dset.select("treatment").collect()).squeeze(),
+            np.array(dset.select("target").collect()).squeeze()
+        ] for dset in splits
+    )
+
+    return ((trainx, trainy), (valx, valy), (testx, testy), category_dict)
+
diff --git a/pipe/transform.py b/pipe/transform.py
new file mode 100644
index 0000000..8604c60
--- /dev/null
+++ b/pipe/transform.py
@@ -0,0 +1,54 @@
+#-*- coding: utf-8 -*-
+"""
+transform.py
+"""
+
+from pyspark.sql import DataFrame, SparkSession, functions, types
+from sklearn.preprocessing import OneHotEncoder
+
+def merge_redundant_labels(dataframe: DataFrame) -> DataFrame:
+    """
+    Merge redundant labels in the 'treatment' column of the dataframe. This
+    step is inefficient but necessary due to inconsistent naming conventions
+    used in the MATLAB onekey processing.
+    """
+    return dataframe.replace(
+        {"virus": "cpv", "cont": "pbs", "control": "pbs", "dld": "pbs"},
+        subset=["treatment"]
+    )
+
+def transform(spark: SparkSession, dataframe: DataFrame, keys: list) \
+    -> DataFrame:
+    """
+    Second step of the ETL pipeline. One-hot encodes the label columns listed
+    in keys and joins the encoded vectors back onto the dataframe; the
+    original string columns are kept with a _str suffix.
+    """
+    dataframe = merge_redundant_labels(dataframe)
+    # Attach a synthetic index so the encoded labels can be joined back
+    # onto their original rows.
+    dataframe = dataframe.withColumn(
+        "index", functions.monotonically_increasing_id()
+    )
+    bundle = {key: [
+        arr.tolist()
+        for arr in OneHotEncoder(sparse_output=False) \
+            .fit_transform(dataframe.select(key).collect())
+    ] for key in keys
+    }
+
+    bundle = [dict(zip(bundle.keys(), values))
+        for values in zip(*bundle.values())]
+    schema = types.StructType([
+        types.StructField(key, types.ArrayType(types.FloatType()), True)
+        for key in keys
+    ])
+    newframe = spark.createDataFrame(bundle, schema=schema).withColumn(
+        "index", functions.monotonically_increasing_id()
+    )
+
+    for key in keys:
+        dataframe = dataframe.withColumnRenamed(key, f"{key}_str")
+    dataframe = dataframe.join(newframe, on="index", how="inner")
+
+    return dataframe
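diff --git a/examples/run_etl_example.py b/examples/run_etl_example.py
new file mode 100644
--- /dev/null
+++ b/examples/run_etl_example.py
@@ -0,0 +1,27 @@
+#-*- coding: utf-8 -*-
+"""
+run_etl_example.py
+
+Illustrative sketch only, not part of the refactor itself: it shows how the
+new pipe.etl.etl() entry point might be driven end to end. The SparkSession
+settings, the file name and the printouts here are assumptions; the
+/app/workdir layout is whatever extract() expects.
+"""
+
+from pyspark.sql import SparkSession
+
+from pipe.etl import etl
+
+if __name__ == "__main__":
+    # A plain local session is assumed; the real deployment may differ.
+    spark = SparkSession.builder.appName("spectrogram-etl").getOrCreate()
+
+    # etl() returns ((trainx, trainy), (valx, valy), (testx, testy), categories):
+    # the x entries are numpy arrays of trimmed spectrograms, the y entries
+    # hold the one-hot encoded "treatment" and "target" labels.
+    (trainx, trainy), (valx, valy), (testx, testy), categories = etl(spark)
+
+    print("train spectrograms:", trainx.shape)
+    print("category mapping:", categories)
+
+    spark.stop()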