
I am trying to write a Python utility function that accepts an object of a locally defined class and uses one of that class's methods as a user-defined function (UDF) in a PySpark DataFrame withColumn call. The utility function's signature is:

def spark_analyze(lp: LogProcessor):

In the LogProcessor class, I have a method that I'd like to use as a UDF. The schema and method definitions are:

from pyspark.sql.types import IntegerType, StructField, StructType

schema = StructType([
    StructField("total", IntegerType(), False),
    StructField("other", IntegerType(), False)
])

def ProcessLog(self, log_file):
    self.PrepareForLog()
    for event in pyspark_utils.spark_events_from_file(log_file):
        self.ProcessEvent(event)
    return [total, other]

In spark_analyze, I do the following, where lp is the passed-in LogProcessor object:

@udf(lp.schema)
def lpf(lcm_file):
    return lp.ProcessLog(lcm_file)

return (df.withColumn('results', lpf(col('logfile_dir')))
...

This produces a long Python stack trace, which starts like this:

/home/david/libs.zip/pyspark_utils.py in spark_analyze(lp)
    132     def lpf(lcm_file):
    133         lp.ProcessLog(lcm_file)
--> 134     return (df.withColumn('results', lpf(col('logfile_dir')))
    135             .withColumn('log name', spark_get_dataset_name(col('logfile_dir')))
    136             .select('log name', 'results.*')

/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/functions.py in wrapper(*args)
   1955     @functools.wraps(f)
   1956     def wrapper(*args):
-> 1957         return udf_obj(*args)
   1958
   1959     wrapper.func = udf_obj.func

and ends with:

/home/david/libs.zip/pyspark_utils.py in spark_analyze(lp)
    132     def lpf(lcm_file):
    133         lp.ProcessLog(lcm_file)
--> 134     return (df.withColumn('results', lpf(col('logfile_dir')))
    135             .withColumn('log name', spark_get_dataset_name(col('logfile_dir')))
    136             .select('log name', 'results.*')

/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/sql/functions.py in wrapper(*args)
   1955     @functools.wraps(f)
   1956     def wrapper(*args):
-> 1957         return udf_obj(*args)
   1958
   1959     wrapper.func = udf_obj.func

I did some testing and found that things work fine if I define my UDF right above the place where I use it in withColumn. I also tried redefining ProcessLog to just return [0, 0], and the problem did not go away. So the problem seems to be that I'm using a method of a passed-in class instance as a UDF. Is there another way to have a UDF be a method of a class? Thanks for any help!
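Edit: for reference, here is roughly the inline version that worked in my testing. The body is just a placeholder that returns values matching the schema; the real logic is elided:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StructField, StructType

schema = StructType([
    StructField("total", IntegerType(), False),
    StructField("other", IntegerType(), False)
])

# Defined right above its use, with no reference to a passed-in object
@udf(schema)
def lpf(lcm_file):
    return [0, 0]  # placeholder result matching the schema

df = df.withColumn('results', lpf(col('logfile_dir')))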

Have you registered the UDF defined in the class? Something like `spark.udf.register("ProcessLog", LogProcessor.ProcessLog, "Return Type")`, after which you should be able to invoke it as a SQL expression, e.g. `df.withColumn("Result", expr("ProcessLog(logfile_dir)"))`. Also change the method definition to static (see the sketch after these comments). Try this and see if it works for you; I was able to invoke a UDF like this. – Usman Azhar
Ah, that is a good idea! I ended up switching to a procedural style and just passing the function definition, which worked. I suspect your version would work too! Thank you for the reply. – David Lobron
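For anyone finding this later, here is a minimal sketch of the registration approach from the comment above, assuming ProcessLog can be made a static method; the DataFrame, column names, and placeholder body are illustrative only:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from pyspark.sql.types import IntegerType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

class LogProcessor:
    @staticmethod
    def ProcessLog(log_file):
        # Placeholder body; a static method carries no self to serialize
        return [0, 0]

schema = StructType([
    StructField("total", IntegerType(), False),
    StructField("other", IntegerType(), False)
])

# Register the static method under a SQL-callable name; in Spark 2.1+
# register also returns a UDF usable directly in the DataFrame API
process_log = spark.udf.register("ProcessLog", LogProcessor.ProcessLog, schema)

df = spark.createDataFrame([("logs/run1",)], ["logfile_dir"])
df = df.withColumn("results", expr("ProcessLog(logfile_dir)"))    # SQL form
df = df.withColumn("results2", process_log(col("logfile_dir")))   # direct form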

1 Answer


The approach that Usman Azhar suggested may work. I ended up solving this by simply passing the definition of my UDF as an argument to my library function.
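For concreteness, here is a minimal sketch of that solution; the spark_analyze signature, the placeholder process_log body, and the column names are illustrative, not the exact library code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StructField, StructType

schema = StructType([
    StructField("total", IntegerType(), False),
    StructField("other", IntegerType(), False)
])

# A plain module-level function pickles cleanly: there is no bound
# instance of a locally defined class for Spark to serialize
def process_log(log_file):
    return [0, 0]  # placeholder values matching the schema

# The library function takes the function itself, not an object method
def spark_analyze(df, process_fn, schema):
    lpf = udf(process_fn, schema)
    return (df.withColumn('results', lpf(col('logfile_dir')))
              .select('results.*'))

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("logs/run1",)], ["logfile_dir"])
result = spark_analyze(df, process_log, schema)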