
I have very large datasets in Spark DataFrames that are distributed across the nodes. I can compute simple statistics such as mean, stddev, skewness, and kurtosis using the functions in pyspark.sql.functions.

If I want to use more advanced statistical tests such as Jarque-Bera (JB) or Shapiro-Wilk (SW), I use Python libraries like scipy, since the standard Apache PySpark libraries don't provide them. But to do that, I have to convert the Spark DataFrame to pandas, which means forcing all the data onto the master (driver) node, like so:

import scipy.stats as stats

pandas_df = spark_df.toPandas()   # pulls the entire dataset onto the driver
JBtest = stats.jarque_bera(pandas_df)
SWtest = stats.shapiro(pandas_df)

I have multiple features, and each feature ID corresponds to a dataset on which I want to perform the test statistic.

My question is:

Is there a way to apply these Python functions to a Spark DataFrame while the data is still distributed across the nodes, or do I need to write my own JB/SW test-statistic functions in Spark?

Thank you for any valuable insight.


1 Answer


You should be able to define a vectorized user-defined function (a pandas UDF) that wraps the pandas function (https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html), like this:

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import scipy.stats as stats

@pandas_udf('double', PandasUDFType.SCALAR)
def vector_jarque_bera(x):
    # scipy returns (statistic, p-value); a SCALAR pandas UDF must return a
    # Series of the same length as its input, so broadcast the statistic.
    jb_stat, _ = stats.jarque_bera(x)
    return pd.Series(jb_stat, index=x.index)

# test:
spark_df.withColumn('y', vector_jarque_bera(spark_df['x']))

Note that the vectorized function takes a column as its argument and returns a column.

(N.B. The @pandas_udf decorator is what transforms the pandas function defined right below it into a vectorized function. Each element of the returned vector is itself a scalar, which is why PandasUDFType.SCALAR is passed.)
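
Since you want one statistic per feature ID rather than one value per row, a grouped aggregate pandas UDF (available in Spark 2.4+) may be a better fit: each group's values are handed to the UDF as a single pandas Series on an executor, so the data never has to be collected on the driver. A minimal sketch, assuming columns named feature_id and value (placeholder names for your own schema):

from pyspark.sql.functions import pandas_udf, PandasUDFType
import scipy.stats as stats

# Grouped-aggregate pandas UDFs: each call receives all values for one group
# as a pandas Series and returns a single double.
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def jb_statistic(v):
    return float(stats.jarque_bera(v)[0])

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def sw_statistic(v):
    return float(stats.shapiro(v)[0])

result = (spark_df
          .groupBy('feature_id')
          .agg(jb_statistic(spark_df['value']).alias('jb'),
               sw_statistic(spark_df['value']).alias('sw')))

With this approach only a single feature's values need to fit in one worker's memory at a time, which is usually far less restrictive than pulling the whole DataFrame onto the driver with toPandas().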