From 8f31b154b379f9e18a4074f014a68362949c2d0b Mon Sep 17 00:00:00 2001 From: Dawith Lim Date: Sun, 19 Oct 2025 02:12:45 -0400 Subject: [PATCH] data distribution probe code added --- inspect_data.py | 93 ++++++++++++++++++++++++++++++++++++++ visualize/distributions.py | 33 ++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 inspect_data.py create mode 100644 visualize/distributions.py diff --git a/inspect_data.py b/inspect_data.py new file mode 100644 index 0000000..274b138 --- /dev/null +++ b/inspect_data.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +""" +inspect.py + +This module provides functionality to inspect the statistical characteristics +of the input data to make better judgments on data transformations. +""" + +import numpy as np +from pyspark.sql.functions import col, udf +from pyspark.sql.types import ArrayType, FloatType +import pyspark + +from pipe.extract import extract +from visualize.distributions import distribution_df + +def spectra_slice(arr): + if arr is None: + return None + return [row[:130] for row in arr[:31]] + +def bb_slice(arr): + if arr is None: + return None + return [row[130] for row in arr[:31]] + +def nsd_slice(arr): + if arr is None: + return None + return [row[131] for row in arr[:31]] + +def ncnt_slice(arr): + if arr is None: + return None + return [row[132] for row in arr[:31]] + +def unpack_timeseriesdata(dataframe): + """ + Takes the raw dataframe and unpacks the timeseries column to individual + elements: spectra, backscatter brightness, n_count, and normalized standard + deviation. These are then given as four separate columnar dataframes. + + Args: + dataframe (pyspark.sql.DataFrame): Input dataframe with a 'timeseries' + column. + + Returns: + tuple: A tuple containing four pyspark.sql.DataFrame objects: + - spectra: Power spectra portion of the data. + - bb: Backscatter brightness portion of the data. + - ncnt: Foreground pixel count portion of the data. + - nsd: Normalized standard deviation portion of the data. + """ + + spectra_slice_udf = udf(spectra_slice, ArrayType(ArrayType(FloatType()))) + bb_slice_udf = udf(bb_slice, ArrayType(FloatType())) + ncnt_slice_udf = udf(ncnt_slice, ArrayType(FloatType())) + nsd_slice_udf = udf(nsd_slice, ArrayType(FloatType())) + spectra = dataframe.select(spectra_slice_udf(col("timeseries")) \ + .alias("spectra")) + bb = dataframe.select(bb_slice_udf(col("timeseries")).alias("bb")) + ncnt = dataframe.select(ncnt_slice_udf(col("timeseries")).alias("ncnt")) + nsd = dataframe.select(nsd_slice_udf(col("timeseries")).alias("nsd")) + + return spectra, bb, ncnt, nsd + +def unpack_constants(dataframe): + """ + """ + + t0 = None + tf = None + dt = None + f1 = None + f2 = None + + return t0, tf, dt, f1, f2 + +if __name__ == "__main__": + spark = pyspark.sql.SparkSession \ + .builder \ + .appName("DataInspection") \ + .getOrCreate() + + # Load data + data = extract(spark) + labels = ["spectra", "backscatter_brightness", + "n_count", "normalized_standard_deviation"] + for dataframe, label in zip(unpack_timeseriesdata(data), labels): + distribution_df(dataframe, label) + t0, tf, dt, f1, f2 = unpack_constants(data) + +# EOF diff --git a/visualize/distributions.py b/visualize/distributions.py new file mode 100644 index 0000000..0180b28 --- /dev/null +++ b/visualize/distributions.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +""" +distribution.py + +This module provides functions to plot distributions of various numerical data. +""" + +import matplotlib.pyplot as plt +import numpy as np +from pyspark.sql.functions import col, explode, explode_outer +import seaborn as sns + +def distribution_df(dataframe, label): + name = dataframe.columns[0] + dataframe = dataframe.limit(300) + try: + while True: + dataframe = dataframe.select(explode(col(name)).alias(name)) + except: + #dataframe = dataframe.select(explode_outer(col(name)).alias(name)) + pass + + print( + "Average value of {} : {}".format( + label, + dataframe.agg({name: "avg"}).show() + ) + ) + sns.histplot(data=dataframe.toPandas(), x=name, stat="density") + plt.savefig(f"/app/workdir/figures/{name}_distribution.png") + plt.close() + +# EOF