I am trying to write a Python utility function that accepts an object of a locally defined class and uses one of that class's methods as a user-defined function (UDF) in a PySpark DataFrame withColumn call. The utility function signature is:
def spark_analyze(lp: LogProcessor):
In the LogProcessor class, I have a method that I'd like to use as a UDF. The method definition is:
schema = StructType([
    StructField("total", IntegerType(), False),
    StructField("other", IntegerType(), False)
])

def ProcessLog(self, log_file):
    self.PrepareForLog()
    for event in pyspark_utils.spark_events_from_file(log_file):
        self.ProcessEvent(event)
    return [total, other]
In spark_analyze, I do the following, where lp is the passed-in object of type LogProcessor:
@udf(lp.schema)
def lpf(lcm_file):
    lp.ProcessLog(lcm_file)
return (df.withColumn('results', lpf(col('logfile_dir')))
...
This produces a long Python stack trace, which starts like this:
/home/david/libs.zip/pyspark_utils.py in spark_analyze(lp)
    132     def lpf(lcm_file):
    133         lp.ProcessLog(lcm_file)
--> 134     return (df.withColumn('results', lpf(col('logfile_dir')))
    135         .withColumn('log name', spark_get_dataset_name(col('logfile_dir')))
    136         .select('log name', 'results.*')

/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/functions.py in wrapper(*args)
   1955     @functools.wraps(f)
   1956     def wrapper(*args):
-> 1957         return udf_obj(*args)
   1958
   1959     wrapper.func = udf_obj.func
I did some testing and found that things work fine if I define my UDF right above the place where I apply it to col. I also tried redefining ProcessLog to just return [0,0], and the problem did not go away. So the problem seems to be that I'm using a method of a passed-in class object as a UDF. Is there another way to have a UDF be a method in a class? Thanks for any help here!
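One possible cause worth checking, which the trace excerpt above neither confirms nor rules out: PySpark has to serialize a UDF together with everything it references before sending it to the executors, and a UDF that calls a method on lp drags the whole lp object along. If that object holds state that cannot be pickled, serialization fails no matter what the method returns. The sketch below reproduces that situation in plain Python, without Spark. The LogProcessor here is a hypothetical stand-in, and the lock attribute, the threshold parameter, and the helper names can_pickle and process_with_fresh_instance are all assumptions made for illustration.

```python
import pickle
import threading


class LogProcessor:
    """Hypothetical stand-in for the class in the question."""

    def __init__(self, threshold=0):
        self.threshold = threshold
        # Assumption: the real object holds some unpicklable state. A lock is
        # used here; a SparkContext or an open file would behave similarly.
        self._lock = threading.Lock()

    def process_log(self, log_file):
        # Placeholder logic standing in for the real event processing.
        total = len(log_file)
        other = self.threshold
        return (total, other)


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


def process_with_fresh_instance(log_file, threshold=0):
    # Build the processor inside the function, so only plain arguments
    # (not a pre-built instance) have to travel with it.
    return LogProcessor(threshold).process_log(log_file)


lp = LogProcessor(threshold=3)
print(can_pickle(lp))                 # False: the lock cannot be pickled
print(can_pickle({"threshold": 3}))   # True: plain settings pickle fine
print(process_with_fresh_instance("a.log", threshold=3))  # (5, 3)
```

If this turns out to be the cause, the same idea would carry over to the UDF: have the decorated function create the LogProcessor itself (or call a static method) and pass in only picklable settings, rather than closing over an existing instance. It may also be worth checking that the wrapper function returns the value of the method call, since a UDF that returns nothing yields null for every row.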