diff --git a/inspect_data.py b/inspect_data.py index 274b138..80b398e 100644 --- a/inspect_data.py +++ b/inspect_data.py @@ -7,7 +7,7 @@ """ import numpy as np -from pyspark.sql.functions import col, udf +from pyspark.sql.functions import col, udf, array, lit from pyspark.sql.types import ArrayType, FloatType import pyspark @@ -64,6 +64,10 @@ def unpack_timeseriesdata(dataframe): return spectra, bb, ncnt, nsd +@udf(FloatType()) +def cos_sim(a, b): + return float(np.dot(a,b) / (np.linalg.norm(a) * np.linalg.norm(b))) + def unpack_constants(dataframe): """ """ @@ -76,6 +80,23 @@ def unpack_constants(dataframe): return t0, tf, dt, f1, f2 +def cosine_similarity(dataframe, column): + dataframe.orderBy(column) + df_cos_sim = dataframe.select([column, "timeseries"]) + df_cos_sim = df_cos_sim.withColumnRenamed("timeseries", "timeseries0") \ + .crossJoin(df_cos_sim) + df_cos_sim = df_cos_sim.withColumn( + "dot_product", + lit( + np.dot(col("timeseries"), col("timeseries0")) + / (np.linalg.norm(col("timeseries")) + * np.linalg.norm(col("timeseries0"))) + ) + ) + print(df_cos_sim.head(0)) + + return + if __name__ == "__main__": spark = pyspark.sql.SparkSession \ .builder \ @@ -84,6 +105,8 @@ def unpack_constants(dataframe): # Load data data = extract(spark) + cosine_similarity(data, "treatment") + exit() labels = ["spectra", "backscatter_brightness", "n_count", "normalized_standard_deviation"] for dataframe, label in zip(unpack_timeseriesdata(data), labels):