3
votes

This answer nicely explains how to use pyspark's groupby and pandas_udf to do custom aggregations. However, I cannot possibly declare my schema manually as shown in this part of the example:

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

since I will be returning 100+ columns with automatically generated names. Is there any way to tell PySpark to just implicitly use the schema returned by my function and assume it's going to be the same for all worker nodes? This schema will also change between runs, since I will have to play around with the predictors I want to use, so an automated process for schema generation might be an option...

3
Maybe this example approach can help you. - Sanxofon

3 Answers

2
votes

Based on Sanxofon's comment, I got an idea of how to implement this myself:

from pyspark.sql.types import *

mapping = {"float64": DoubleType,
           "object": StringType,
           "int64": LongType}  # pandas int64 is 64-bit, so LongType rather than IntegerType
# Incomplete - extend with your types.

def createUDFSchemaFromPandas(dfp):
  column_types  = [StructField(key, mapping[str(dfp.dtypes[key])]()) for key in dfp.columns]
  schema = StructType(column_types)
  return schema

What I do is get a sample pandas df, pass it to the function, and see what it returns:

dfp = df_total.limit(100).toPandas()
df_return = my_UDF_function(dfp)
schema = createUDFSchemaFromPandas(df_return)

This seems to work for me. The catch is that it is somewhat circular: you need the function to get the schema, but you need the schema to declare the function as a UDF. I solved this by creating a "wrapper" UDF that simply passes the dataframe to it.
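A variation on the same idea, as a minimal sketch: pandas_udf also accepts the return schema as a DDL-formatted string, so the schema can be built as plain text from the sample output without touching the Spark type classes. The names `DDL_TYPES` and `ddl_schema_from_pandas` are made up for this illustration, the dtype mapping is an assumption that you would need to extend, and the column names are assumed to be plain identifiers that need no quoting.

```python
import pandas as pd

# Assumed mapping from pandas dtype names to Spark SQL DDL type names.
# Incomplete on purpose: extend it with the dtypes your function returns.
DDL_TYPES = {
    "float64": "double",
    "int64": "bigint",
    "bool": "boolean",
    "object": "string",
    "string": "string",
    "str": "string",
}


def ddl_schema_from_pandas(dfp: pd.DataFrame) -> str:
    """Build a DDL schema string such as 'key string, avg_min double'."""
    fields = []
    for name, dtype in dfp.dtypes.items():
        # A KeyError here means the dtype is missing from DDL_TYPES.
        fields.append(f"{name} {DDL_TYPES[str(dtype)]}")
    return ", ".join(fields)


# Stand-in for the pandas DataFrame returned by the function on a small sample.
sample = pd.DataFrame({"key": ["a", "b"], "avg_min": [1.5, 2.0], "n": [3, 4]})
schema_ddl = ddl_schema_from_pandas(sample)
print(schema_ddl)  # key string, avg_min double, n bigint

# Hypothetical use with Spark (not run here):
# @pandas_udf(schema_ddl, PandasUDFType.GROUPED_MAP)
# def my_udf(df): ...
```

As with the StructType version, the sample has to be representative: a column that is all integers in the sample but contains floats in another group would make the declared schema wrong.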

1
votes

Unfortunately there is no such option. The schema must be known statically before any component is evaluated, so any form of inference based on actual data is simply not on the table.

If your internal process is somehow based on code generation, your best option is to integrate both logic and schema generation. For example:

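from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType

def describe(cols, fun):
    # Build the schema from the same column list the UDF body uses.
    schema = StructType([StructField(c, DoubleType()) for c in cols])
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def _(df):
        return df[cols].agg([fun])
    return _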

df = spark.createDataFrame([(1, 2.0, 1.0, 3.0), (1, 4.0, 2.0, 5.0)], ("id", "x", "y", "z"))

df.groupBy("id").apply(describe(["x"], "mean")).show()
# +---+
# |  x|
# +---+
# |3.0|
# +---+


df.groupBy("id").apply(describe(["x", "y"], "mean")).show()
# +---+---+
# |  x|  y|
# +---+---+
# |3.0|1.5|
# +---+---+
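
To see why the schema and the logic have to be generated from the same column list, it can help to run the UDF body on a single group in plain pandas, without Spark. This is only a sketch: `make_body` is a hypothetical helper that mirrors the inner function of describe() minus the decorator, and the data is the group for id 1 from the example above.

```python
import pandas as pd


def make_body(cols, fun):
    # Same body as the function returned by describe(), without the Spark decorator.
    def body(df):
        return df[cols].agg([fun])
    return body


# The single group (id == 1) from the example DataFrame.
group = pd.DataFrame({"id": [1, 1], "x": [2.0, 4.0], "y": [1.0, 2.0], "z": [3.0, 5.0]})

out = make_body(["x", "y"], "mean")(group)
print(list(out.columns))     # ['x', 'y']
print(out.iloc[0].tolist())  # [3.0, 1.5]
```

The returned frame has exactly the columns in `cols`, which is what the StructType built from `cols` declares, so the two cannot drift apart.
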
0
votes

Slightly modifying @Thomas's answer, I did the following. Since df.dtypes returns a list of tuples (at least in the latest pandas version) and not a dictionary, I replaced str(dfp.dtypes[key]) with dict(df.dtypes)[key]:

def udf_schema_from_pandas(df):
  # str() keeps the lookup key a plain string, matching the keys of `mapping`.
  column_types  = [StructField(key, mapping[str(dict(df.dtypes)[key])]()) for key in df.columns]
  schema = StructType(column_types)
  return schema
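
For reference, a Spark DataFrame's dtypes attribute is a list of (column name, type name) tuples, and the type names are Spark's ("bigint", "double", "string"), not pandas dtype names. The sketch below uses a literal list as a stand-in for a real Spark DataFrame, so the values are illustrative assumptions. It shows that dict() works on that shape and that the pairs can be joined straight into a DDL schema string.

```python
# Stand-in for spark_df.dtypes on a Spark DataFrame (illustrative values).
spark_dtypes = [("id", "bigint"), ("x", "double"), ("label", "string")]

# dict() turns the list of pairs into a name -> type-name lookup.
by_name = dict(spark_dtypes)
print(by_name["x"])  # double

# The same pairs, joined into a DDL schema string.
ddl = ", ".join(f"{name} {type_name}" for name, type_name in spark_dtypes)
print(ddl)  # id bigint, x double, label string
```

A mapping keyed by pandas names such as "float64" would not match these strings, so the keys of `mapping` need to follow whichever kind of DataFrame is passed in.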