From 75bf3fa11feb6a1c6ef55e6b7573193967351c75 Mon Sep 17 00:00:00 2001
From: maelstrom
Date: Fri, 6 Dec 2024 19:21:29 -0500
Subject: [PATCH] Basic pipeline for spectrogram set up

---
 pipe/pipe.py | 87 +++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 69 insertions(+), 18 deletions(-)

diff --git a/pipe/pipe.py b/pipe/pipe.py
index c791901..1840d81 100644
--- a/pipe/pipe.py
+++ b/pipe/pipe.py
@@ -7,33 +7,84 @@
 import cv2 as cv
 import numpy as np
 
-import pyspark
+from pyspark.sql import SparkSession, Row
 
-def image_pipe(imagepath: Path, namepattern: str, stacksize: int)
-    -> np.ndarray:
+def image_pipe(spark: SparkSession, imagepath: Path, namepattern: str,
+               stacksize: int) -> np.ndarray:
     images = np.zeros((stacksize, 800,800))
     for i in range(stacksize):
         images[i,:,:] = cv.imread(imagepath/namepattern.format(i), -1)
     return images
 
-def spectrogram_pipe(specpath: Path, namepattern: str, freq_samples: int)
-    -> np.ndarray:
-    """
-    Loads spectrograms for each stack iteration from an hdf5 data file.
 
-    Args:
-        specpath (Path): Path to the spectrogram files.
-        namepattern (str): Name pattern for the spectrogram files.
-        stacksize (int): Number of spectrograms in the stack.
-        freq_samples (int): Number of frequency samples in each spectrogram.
+class SpectrogramPipe:
+    """
+    Loads spectrograms through a loader selected by file type.
+
+    Args:
+        spark (SparkSession): Session kept on the instance as self.spark.
+        filetype (str): "hdf5" or "shards"; picks the loader bound to
+                        self.spectrogram_pipe.
+
+    Raises:
+        ValueError: If filetype is neither "hdf5" nor "shards".
+    """
 
-    Returns:
-        spectrogram (np.ndarray): Stack of spectrograms (2D array).
-    """
+    def __init__(self, spark: SparkSession, filetype: str = "hdf5"):
+        self.spark = spark
+        if filetype == "hdf5":
+            self.spectrogram_pipe = self.spectrogram_pipe_hdf5
+        elif filetype == "shards":
+            self.spectrogram_pipe = self.spectrogram_pipe_shards
+        else:
+            raise ValueError(f"Invalid filetype {filetype}.")
 
-    with h5py.File(specpath/namepattern.format(i), 'r') as f:
-        spectrogram = f['spectrogram'][:]
+    def spectrogram_pipe_hdf5(self, specpath: Path,
+                              freq_samples: int) -> list:
+        """
+        Loads every ".hdf5" file found directly in specpath and wraps each
+        spectrogram in a spark Row labelled with its file name.
+
+        Args:
+            specpath (Path): Directory containing the spectrogram files.
+            freq_samples (int): Number of frequency samples in each
+                                spectrogram (currently unused).
+
+        Returns:
+            list: One Row(label, spectrogram) per hdf5 file, sorted by path.
+        """
+
+        spectrograms = []
+        # specpath is already a Path, so list it with pathlib (no os needed).
+        for filepath in sorted(specpath.iterdir()):
+            if not filepath.name.endswith(".hdf5"):
+                continue
+            with h5py.File(filepath, 'r') as f:
+                spectrograms.append(Row(label=filepath.name,
+                                        spectrogram=f['spectrogram'][:]))
+
+        # TODO: Turn spectrograms into a spark dataframe.
+
+        return spectrograms
+
+    def spectrogram_pipe_shards(self, specpath: Path, namepattern: str,
+                                stacksize: int, freq_samples: int) -> np.ndarray:
+        """
+        Placeholder for loading spectrograms from a set of shard files and
+        turning them into a spark-friendly format. Not implemented yet.
+
+        Args:
+            specpath (Path): Path to the spectrogram files.
+            namepattern (str): Name pattern for the spectrogram files.
+            stacksize (int): Number of spectrograms in the stack.
+            freq_samples (int): Number of frequency samples in each
+                                spectrogram.
+
+        Returns:
+            None: Nothing is loaded yet; every argument is ignored.
+        """
+
+        return
 
-    return spectrogram
 
 # EOF