
I have the PySpark df below, which can be recreated with this code:

df = spark.createDataFrame(
    [(1, "John Doe", "2020-11-30"), (2, "John Doe", "2020-11-27"), (3, "John Doe", "2020-11-29")],
    ("id", "name", "date"),
)

+---+--------+----------+
| id|    name|      date|
+---+--------+----------+
|  1|John Doe|2020-11-30|
|  2|John Doe|2020-11-27|
|  3|John Doe|2020-11-29|
+---+--------+----------+

I am looking to create a UDF to calculate the difference between two rows of dates (using the lag function), excluding weekends, since PySpark 2.2.0 does not have a built-in function for this. E.g. the difference between 2020-11-30 and 2020-11-27 should give 1, as they are a Monday and a Friday respectively.
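For reference, np.busday_count (used in the linked answer below) gives exactly the behavior I want on plain date strings, since it skips Saturdays and Sundays and treats the end date as exclusive:

import numpy as np

# Fri 2020-11-27 -> Mon 2020-11-30: only the Friday is counted,
# because the weekend is skipped and the end date is exclusive
print(np.busday_count("2020-11-27", "2020-11-30"))  # 1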

I have tried to create the function below, with help from calculate difference between two dates excluding weekends in python:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def workdays(z):
    date1 = df.select(F.col('date')).collect()[1][0]
    date2 = df.select(F.col('date')).collect()[0][0]
    date_diff = np.busday_count(date1, date2)
    return date_diff

workdaysUDF = udf(lambda z: workdays(z), IntegerType())

df.withColumn("date_dif", workdaysUDF(F.col("date"))).show(truncate=False)

But I get the below error

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Any help on how I can make this work on each row of my DataFrame would be really helpful.

PS: My date1 and date2 variables need to be dynamic, depending on the value of the date the function is being applied to. Also, due to the size of the DataFrame, I cannot use pandas, for which I did find multiple solutions.

Thank you in advance.


1 Answer


You can't call collect inside a UDF. You can only pass columns to a UDF, so pass in both the date column and the lagged date column, as shown below:

import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

df = spark.createDataFrame([
    (1, "John Doe", "2020-11-30"),
    (2, "John Doe", "2020-11-27"),
    (3, "John Doe", "2020-11-29")],
    ("id", "name", "date")
) 

# Business-day difference between the current row's date and the previous
# row's date; returns null when there is no previous row in the partition
workdaysUDF = F.udf(
    lambda date1, date2: int(np.busday_count(date2, date1))
    if (date1 is not None and date2 is not None) else None,
    IntegerType()
)

w = Window.partitionBy('name').orderBy('id')
df = df.withColumn("date_dif", workdaysUDF(F.col('date'), F.lag(F.col('date')).over(w)))
df.show()

+---+--------+----------+--------+
| id|    name|      date|date_dif|
+---+--------+----------+--------+
|  1|John Doe|2020-11-30|    null|
|  2|John Doe|2020-11-27|      -1|
|  3|John Doe|2020-11-29|       1|
+---+--------+----------+--------+
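Note that np.busday_count treats the end date as exclusive, and the window above orders by id, so the row with 2020-11-27 gets -1: it falls one business day before the previous row's 2020-11-30. If you instead want each row compared to the chronologically previous date (so the differences come out non-negative here), a small variant of the same code, changing only the window ordering, would be:

# Variant: order the lag window by date rather than id, so each row is
# compared to the chronologically previous date for that name
w = Window.partitionBy('name').orderBy('date')
df.withColumn("date_dif", workdaysUDF(F.col('date'), F.lag(F.col('date')).over(w))).show()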