Commit
Showing 2 changed files with 126 additions and 0 deletions.
inspect.py
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
"""
inspect.py

This module provides functionality to inspect the statistical characteristics
of the input data to make better judgments on data transformations.
"""

from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, FloatType
import pyspark

from pipe.extract import extract
from visualize.distributions import distribution_df

def spectra_slice(arr):
    # Power spectra: the first 130 values in each of the first 31 timesteps.
    if arr is None:
        return None
    return [row[:130] for row in arr[:31]]


def bb_slice(arr):
    # Backscatter brightness: element 130 of each of the first 31 timesteps.
    if arr is None:
        return None
    return [row[130] for row in arr[:31]]


def nsd_slice(arr):
    # Normalized standard deviation: element 131 of each of the first 31 timesteps.
    if arr is None:
        return None
    return [row[131] for row in arr[:31]]


def ncnt_slice(arr):
    # Foreground pixel count: element 132 of each of the first 31 timesteps.
    if arr is None:
        return None
    return [row[132] for row in arr[:31]]
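

def _layout_sanity_check():
    # Illustrative sketch, not part of the original pipeline: the slicers
    # above assume each 'timeseries' cell is a 2-D array of at least 31
    # timesteps by 133 values, with spectral bins in positions 0-129 followed
    # by the three scalar diagnostics. The cell below is fabricated purely to
    # demonstrate that assumed layout.
    cell = [[float(t * 1000 + i) for i in range(133)] for t in range(31)]
    assert len(spectra_slice(cell)) == 31            # 31 timesteps kept
    assert len(spectra_slice(cell)[0]) == 130        # 130 spectral bins each
    assert bb_slice(cell) == [row[130] for row in cell]
    assert spectra_slice(None) is None               # None passes through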


def unpack_timeseriesdata(dataframe):
    """
    Takes the raw dataframe and unpacks the 'timeseries' column into its
    individual components: power spectra, backscatter brightness, foreground
    pixel count, and normalized standard deviation. These are returned as
    four separate single-column dataframes.

    Args:
        dataframe (pyspark.sql.DataFrame): Input dataframe with a 'timeseries'
            column.

    Returns:
        tuple: A tuple of four pyspark.sql.DataFrame objects:
            - spectra: Power spectra portion of the data.
            - bb: Backscatter brightness portion of the data.
            - ncnt: Foreground pixel count portion of the data.
            - nsd: Normalized standard deviation portion of the data.
    """

    spectra_slice_udf = udf(spectra_slice, ArrayType(ArrayType(FloatType())))
    bb_slice_udf = udf(bb_slice, ArrayType(FloatType()))
    ncnt_slice_udf = udf(ncnt_slice, ArrayType(FloatType()))
    nsd_slice_udf = udf(nsd_slice, ArrayType(FloatType()))

    spectra = dataframe.select(
        spectra_slice_udf(col("timeseries")).alias("spectra"))
    bb = dataframe.select(bb_slice_udf(col("timeseries")).alias("bb"))
    ncnt = dataframe.select(ncnt_slice_udf(col("timeseries")).alias("ncnt"))
    nsd = dataframe.select(nsd_slice_udf(col("timeseries")).alias("nsd"))

    return spectra, bb, ncnt, nsd
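

# Hypothetical alternative (the function name is invented and this is not in
# the original pipeline): the four selects above each project the source
# dataframe separately; the same unpacking can also be expressed as a single
# projection that keeps all four components as columns of one dataframe.
def unpack_timeseriesdata_wide(dataframe):
    spectra_slice_udf = udf(spectra_slice, ArrayType(ArrayType(FloatType())))
    bb_slice_udf = udf(bb_slice, ArrayType(FloatType()))
    ncnt_slice_udf = udf(ncnt_slice, ArrayType(FloatType()))
    nsd_slice_udf = udf(nsd_slice, ArrayType(FloatType()))
    return dataframe.select(
        spectra_slice_udf(col("timeseries")).alias("spectra"),
        bb_slice_udf(col("timeseries")).alias("bb"),
        ncnt_slice_udf(col("timeseries")).alias("ncnt"),
        nsd_slice_udf(col("timeseries")).alias("nsd"),
    )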


def unpack_constants(dataframe):
    """
    Placeholder for extracting the constants t0, tf, dt, f1, and f2 from the
    input dataframe. Not yet implemented; every value is returned as None.
    """

    t0 = None
    tf = None
    dt = None
    f1 = None
    f2 = None

    return t0, tf, dt, f1, f2
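

# Hypothetical sketch (the column names are guesses, not confirmed by any
# schema shown here): if the constants were stored as ordinary columns, the
# stub above could be filled in with a single first() call along these lines.
def unpack_constants_from_columns(dataframe):
    row = dataframe.select("t0", "tf", "dt", "f1", "f2").first()
    if row is None:
        return None, None, None, None, None
    return row["t0"], row["tf"], row["dt"], row["f1"], row["f2"]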


if __name__ == "__main__":
    spark = pyspark.sql.SparkSession \
        .builder \
        .appName("DataInspection") \
        .getOrCreate()

    # Load the data and plot the distribution of each unpacked component.
    data = extract(spark)
    labels = ["spectra", "backscatter_brightness",
              "n_count", "normalized_standard_deviation"]
    for dataframe, label in zip(unpack_timeseriesdata(data), labels):
        distribution_df(dataframe, label)
    t0, tf, dt, f1, f2 = unpack_constants(data)

# EOF
visualize/distributions.py
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
"""
distributions.py

This module provides functions to plot distributions of various numerical data.
"""

import matplotlib.pyplot as plt
from pyspark.sql.functions import col, explode, explode_outer
from pyspark.sql.utils import AnalysisException
import seaborn as sns


def distribution_df(dataframe, label):
    name = dataframe.columns[0]
    # Cap the sample so the toPandas() conversion below stays manageable.
    dataframe = dataframe.limit(300)
    try:
        # Flatten nested arrays one level per pass; explode() raises an
        # AnalysisException once the column is no longer an array type.
        while True:
            dataframe = dataframe.select(explode(col(name)).alias(name))
    except AnalysisException:
        #dataframe = dataframe.select(explode_outer(col(name)).alias(name))
        pass

    # collect() pulls the aggregated average back to the driver as a scalar;
    # show() would only print the table and return None.
    average = dataframe.agg({name: "avg"}).collect()[0][0]
    print("Average value of {} : {}".format(label, average))

    sns.histplot(data=dataframe.toPandas(), x=name, stat="density")
    plt.savefig(f"/app/workdir/figures/{name}_distribution.png")
    plt.close()
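

# Minimal local usage sketch (the toy dataframe, column name, and app name
# are invented for illustration; the figure path hard-coded in the function
# above must exist for savefig to succeed).
if __name__ == "__main__":
    import pyspark

    spark = pyspark.sql.SparkSession.builder \
        .appName("DistributionDemo") \
        .getOrCreate()

    # A nested-array column: distribution_df flattens it twice, prints the
    # average, and writes a histogram figure.
    demo = spark.createDataFrame(
        [([[1.0, 2.0], [3.0, 4.0]],), ([[2.0, 2.5]],)],
        ["demo_vals"],
    )
    distribution_df(demo, "demo values")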

# EOF