Skip to content

Commit

Permalink
Basic pipeline for spectrogram set up
Browse files Browse the repository at this point in the history
  • Loading branch information
lim185 committed Dec 7, 2024
1 parent 971044e commit 75bf3fa
Showing 1 changed file with 57 additions and 18 deletions.
75 changes: 57 additions & 18 deletions pipe/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,72 @@

import cv2 as cv
import numpy as np
import pyspark
from pyspark.sql import SparkSession, Row

def image_pipe(spark: "SparkSession", imagepath: Path, namepattern: str,
               stacksize: int) -> np.ndarray:
    """
    Loads a stack of images from disk into a single array.

    Args:
        spark (SparkSession): Spark session. Currently unused by this
                              function.
        imagepath (Path): Directory containing the image files.
        namepattern (str): Format string for the file names; it is formatted
                           with the stack index (0 .. stacksize - 1).
        stacksize (int): Number of images in the stack.

    Returns:
        images (np.ndarray): Array of shape (stacksize, 800, 800) holding one
                             image per stack index.

    Raises:
        OSError: If an image file cannot be read.
    """
    # The output buffer is allocated up front with a fixed 800x800 frame size.
    images = np.zeros((stacksize, 800, 800))
    for i in range(stacksize):
        filename = imagepath / namepattern.format(i)
        # Pass a plain string: not every OpenCV build accepts Path objects.
        # The -1 flag is cv.IMREAD_UNCHANGED (load the file as stored).
        frame = cv.imread(str(filename), -1)
        # cv.imread reports failure by returning None instead of raising, so
        # check explicitly rather than writing a bad frame into the stack.
        if frame is None:
            raise OSError(f"Could not read image file {filename}.")
        images[i, :, :] = frame

    return images

class SpectrogramPipe:
    """
    Loads spectrogram data and packages it as Spark rows.

    The loader is chosen once, at construction time, from ``filetype`` and
    exposed as ``self.spectrogram_pipe``. Note that the two loaders take
    different arguments, so calls through ``self.spectrogram_pipe`` must match
    the loader that was selected.
    """

    def __init__(self, spark: "SparkSession", filetype: str = "hdf5"):
        """
        Stores the Spark session and selects the loader.

        Args:
            spark (SparkSession): Spark session, stored as ``self.spark``.
            filetype (str): Either "hdf5" or "shards".

        Raises:
            ValueError: If ``filetype`` is not a supported value.
        """
        self.spark = spark
        if filetype == "hdf5":
            self.spectrogram_pipe = self.spectrogram_pipe_hdf5
        elif filetype == "shards":
            self.spectrogram_pipe = self.spectrogram_pipe_shards
        else:
            raise ValueError(f"Invalid filetype {filetype}.")

    def spectrogram_pipe_hdf5(self, specpath: Path, freq_samples: int) -> list:
        """
        Loads the spectrogram from every hdf5 file in a directory and turns
        it into a spark-friendly format.

        Args:
            specpath (Path): Directory containing the spectrogram files. Only
                             entries whose name ends in ".hdf5" are read.
            freq_samples (int): Number of frequency samples in each
                                spectrogram. Currently unused.

        Returns:
            spectrograms (list): One ``Row(label=<file name>,
                                 spectrogram=<'spectrogram' dataset>)`` per
                                 hdf5 file, ordered by file name.
        """

        spectrograms = []
        # os.listdir returns entries in arbitrary order; sort so that the
        # result is deterministic across runs and platforms.
        for filename in sorted(os.listdir(specpath)):
            if not filename.endswith(".hdf5"):
                continue
            with h5py.File(specpath/filename, 'r') as f:
                # [:] reads the dataset into memory before the file closes.
                spectrograms.append(Row(label=filename,
                                        spectrogram=f['spectrogram'][:]))

        # TODO: Turn spectrogram into a spark dataframe.

        return spectrograms

    def spectrogram_pipe_shards(self, specpath: Path, namepattern: str,
                                stacksize: int, freq_samples: int) -> None:
        """
        Loads spectrograms for each stack iteration from a set of shard files,
        and turns it into a spark-friendly format.

        Placeholder: nothing is read yet and all arguments are ignored.

        Args:
            specpath (Path): Path to the spectrogram files.
            namepattern (str): Name pattern for the spectrogram files.
            stacksize (int): Number of spectrograms in the stack.
            freq_samples (int): Number of frequency samples in each
                                spectrogram.

        Returns:
            None, until shard loading is implemented.
        """

        return None

# EOF

0 comments on commit 75bf3fa

Please sign in to comment.