diff --git a/pipe/etl.py b/pipe/etl.py index d355981..807d427 100644 --- a/pipe/etl.py +++ b/pipe/etl.py @@ -27,25 +27,6 @@ def etl(spark): data = load(data) return data -def build_dict(df, key): - """ - Takes a dataframe as input and returns a dictionary of unique values - in the column corresponding to the key. - """ - - df = df.select(key, f"{key}_str").distinct() - - return df.rdd.map( - lambda row: (str(np.argmax(row[key])), row[f"{key}_str"]) - ).collectAsMap() - -def trim(dataframe, column): - - ndarray = np.array(dataframe.select(column).collect()) \ - .reshape(-1, 32, 130) - - return ndarray - def visualize_data_distribution(data): for category in ["treatment", "target"]: select = data.select(category) \ diff --git a/pipe/extract.py b/pipe/extract.py index 218fc55..9415ee9 100644 --- a/pipe/extract.py +++ b/pipe/extract.py @@ -29,7 +29,7 @@ def extract(spark): reader = SpectrogramReader(spark, filetype="matfiles") - return spectrogram_read(path, labels) + return reader.spectrogram_read(path, labels) def image_pipe(spark: SparkSession, imagepath: Path, namepattern: str, stacksize: int) -> np.ndarray: diff --git a/pipe/load.py b/pipe/load.py index cf7a9ad..5d12919 100644 --- a/pipe/load.py +++ b/pipe/load.py @@ -8,6 +8,26 @@ import numpy as np from pyspark.sql import DataFrame + +def build_dict(df, key): + """ + Takes a dataframe as input and returns a dictionary of unique values + in the column corresponding to the key. + """ + + df = df.select(key, f"{key}_str").distinct() + + return df.rdd.map( + lambda row: (str(np.argmax(row[key])), row[f"{key}_str"]) + ).collectAsMap() + +def trim(dataframe, column): + + ndarray = np.array(dataframe.select(column).collect()) \ + .reshape(-1, 32, 130) + + return ndarray + def load(data: DataFrame, split=[0.99, 0.005, 0.005]): category_dict = { key: build_dict(data, key) for key in ["treatment", "target"] @@ -23,3 +43,4 @@ def load(data: DataFrame, split=[0.99, 0.005, 0.005]): return ((trainx, trainy), (valx, valy), (testx, testy), category_dict) +# EOF