I have weekly time series data and am trying to use PySpark SQL to calculate, for several columns, each week's trailing 8-week sum. I have tried using PySpark window functions; specifically:
F.sum(df[valueCol]).over(Window.partitionBy(df[idCol]).orderBy(df[timeCol]).rangeBetween(-7, 0))
But this code runs very slowly (30-60 seconds per column for 1000 unique IDs and 170 time steps). I understand from other StackOverflow questions that partitions and shuffles can cause performance issues, so to understand those issues better I am manually building 8 columns, one per lag, holding each week's 8 most recent weekly values; I will then add those columns together to get the trailing 8-week sum.
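Spelled out in full with the imports it needs (a sketch; idCol, timeCol and valueCol stand in for the real column names), the window call above looks roughly like this:

from pyspark.sql import functions as F, Window

# idCol, timeCol and valueCol are placeholders for the actual column names.
w = (Window.partitionBy(idCol)
           .orderBy(timeCol)
           .rangeBetween(-7, 0))  # the current week plus the 7 preceding weekly steps
df = df.withColumn('trailing_8wk_sum', F.sum(valueCol).over(w))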
Here is a simplified dataset I created:
import pandas as pd

idCount = 2
tCount = 10
df = pd.DataFrame({'customerId': [x for x in range(idCount) for y in range(tCount)],
                   't': [y for x in range(idCount) for y in range(tCount)],
                   'vals': range(idCount * tCount)})[['customerId', 't', 'vals']]
which creates this data frame:
Input Dataframe
customerId t vals
0 0 0 0
1 0 1 1
2 0 2 2
3 0 3 3
4 0 4 4
5 0 5 5
6 0 6 6
7 0 7 7
8 0 8 8
9 0 9 9
10 1 0 10
11 1 1 11
12 1 2 12
13 1 3 13
14 1 4 14
15 1 5 15
16 1 6 16
17 1 7 17
18 1 8 18
19 1 9 19
My goal output is the 8 weekly lagged "vals" columns, including vals_0 as the current week's value, with NaNs where data is unavailable:
Goal Output Dataframe
customerId t vals_0 vals_1 vals_2 vals_3 vals_4 vals_5 vals_6 vals_7
0 0 0 0 NaN NaN NaN NaN NaN NaN NaN
1 0 1 1 0.0 NaN NaN NaN NaN NaN NaN
2 0 2 2 1.0 0.0 NaN NaN NaN NaN NaN
3 0 3 3 2.0 1.0 0.0 NaN NaN NaN NaN
4 0 4 4 3.0 2.0 1.0 0.0 NaN NaN NaN
5 0 5 5 4.0 3.0 2.0 1.0 0.0 NaN NaN
6 0 6 6 5.0 4.0 3.0 2.0 1.0 0.0 NaN
7 0 7 7 6.0 5.0 4.0 3.0 2.0 1.0 0.0
8 0 8 8 7.0 6.0 5.0 4.0 3.0 2.0 1.0
9 0 9 9 8.0 7.0 6.0 5.0 4.0 3.0 2.0
10 1 0 10 NaN NaN NaN NaN NaN NaN NaN
11 1 1 11 10.0 NaN NaN NaN NaN NaN NaN
12 1 2 12 11.0 10.0 NaN NaN NaN NaN NaN
13 1 3 13 12.0 11.0 10.0 NaN NaN NaN NaN
14 1 4 14 13.0 12.0 11.0 10.0 NaN NaN NaN
15 1 5 15 14.0 13.0 12.0 11.0 10.0 NaN NaN
16 1 6 16 15.0 14.0 13.0 12.0 11.0 10.0 NaN
17 1 7 17 16.0 15.0 14.0 13.0 12.0 11.0 10.0
18 1 8 18 17.0 16.0 15.0 14.0 13.0 12.0 11.0
19 1 9 19 18.0 17.0 16.0 15.0 14.0 13.0 12.0
The following Pandas function creates the goal output dataframe:
def get_lag_cols_pandas(df, partCol, timeCol, lagCol, numLags):
    newdf = df[[partCol, timeCol, lagCol]]
    for x in range(numLags):
        newCol = '{}_{}'.format(lagCol, x)
        # Shift each row's time index forward by x, so that merging on
        # (partCol, timeCol) pulls in the value from x weeks earlier.
        joindf = newdf[[partCol, timeCol, lagCol]].copy()
        joindf[timeCol] = newdf[timeCol] + x
        joindf = joindf.rename(columns = {lagCol: newCol})
        newdf = newdf.merge(joindf, how = 'left', on = [partCol, timeCol])
    return newdf.drop(lagCol, axis = 1)
and runs in roughly 500ms:
>>> %timeit print('pandas result: \n{}\n\n'.format(get_lag_cols_pandas(df, 'customerId', 't', 'vals', 8)))
1 loop, best of 3: 501 ms per loop
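(As an aside, the same lag columns can also be built without the repeated merges. Here is a sketch using groupby().shift(), assuming rows are sorted by t within each customerId and that no weeks are missing, as in the toy data:)

def get_lag_cols_pandas_shift(df, partCol, timeCol, lagCol, numLags):
    # Assumes rows are sorted by timeCol within each partCol group and that no
    # weeks are missing, so shifting by rows is the same as shifting by weeks.
    newdf = df[[partCol, timeCol]].copy()
    for x in range(numLags):
        newdf['{}_{}'.format(lagCol, x)] = df.groupby(partCol)[lagCol].shift(x)
    return newdf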
I can also accomplish this in Dask using map_partitions()
and get the same results in ~900 ms (presumably worse than Pandas due to the overhead from spinning up a thread):
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(df, npartitions = 1)
>>> %timeit print('dask result: \n{}\n\n'.format(ddf.map_partitions(lambda df: get_lag_cols_pandas(df, \
'customerId', 't', 'vals', 8)).compute(scheduler = 'threads')))
1 loop, best of 3: 893 ms per loop
I can also accomplish this in PySpark (note: for both Dask and Spark I use only one partition, to make a fairer comparison with Pandas):
>>> from pyspark.sql import SparkSession
>>> sparkType = SparkSession.builder.master('local[1]')
>>> spark = sparkType.getOrCreate()
>>> sdf = spark.createDataFrame(df)
>>> sdf.show()
+----------+---+----+
|customerId| t|vals|
+----------+---+----+
| 0| 0| 0|
| 0| 1| 1|
| 0| 2| 2|
| 0| 3| 3|
| 0| 4| 4|
| 0| 5| 5|
| 0| 6| 6|
| 0| 7| 7|
| 0| 8| 8|
| 0| 9| 9|
| 1| 0| 10|
| 1| 1| 11|
| 1| 2| 12|
| 1| 3| 13|
| 1| 4| 14|
| 1| 5| 15|
| 1| 6| 16|
| 1| 7| 17|
| 1| 8| 18|
| 1| 9| 19|
+----------+---+----+
>>> sdf.rdd.getNumPartitions()
1
with the following code:
def get_lag_cols_spark(df, partCol, timeCol, lagCol, numLags):
    newdf = df.select(df[partCol], df[timeCol], df[lagCol])
    for x in range(numLags):
        newCol = '{}_{}'.format(lagCol, x)
        # Shift the time index forward by x and rename the value column, so the
        # left join below attaches the value from x weeks earlier as <lagCol>_x.
        joindf = newdf.withColumn('newIdx', newdf[timeCol] + x) \
                      .drop(timeCol).withColumnRenamed('newIdx', timeCol) \
                      .withColumnRenamed(lagCol, newCol)
        newdf = newdf.join(joindf.select(joindf[partCol], joindf[timeCol], joindf[newCol]),
                           [partCol, timeCol], how = 'left')
    newdf = newdf.drop(lagCol)
    return newdf
I get the correct results back (although shuffled):
+----------+---+------+------+------+------+------+------+------+------+
|customerId| t|vals_0|vals_1|vals_2|vals_3|vals_4|vals_5|vals_6|vals_7|
+----------+---+------+------+------+------+------+------+------+------+
| 1| 3| 13| 12| 11| 10| null| null| null| null|
| 1| 0| 10| null| null| null| null| null| null| null|
| 1| 1| 11| 10| null| null| null| null| null| null|
| 0| 9| 9| 8| 7| 6| 5| 4| 3| 2|
| 0| 1| 1| 0| null| null| null| null| null| null|
| 1| 4| 14| 13| 12| 11| 10| null| null| null|
| 0| 4| 4| 3| 2| 1| 0| null| null| null|
| 0| 3| 3| 2| 1| 0| null| null| null| null|
| 0| 7| 7| 6| 5| 4| 3| 2| 1| 0|
| 1| 5| 15| 14| 13| 12| 11| 10| null| null|
| 1| 6| 16| 15| 14| 13| 12| 11| 10| null|
| 0| 6| 6| 5| 4| 3| 2| 1| 0| null|
| 1| 7| 17| 16| 15| 14| 13| 12| 11| 10|
| 0| 8| 8| 7| 6| 5| 4| 3| 2| 1|
| 0| 0| 0| null| null| null| null| null| null| null|
| 0| 2| 2| 1| 0| null| null| null| null| null|
| 1| 2| 12| 11| 10| null| null| null| null| null|
| 1| 9| 19| 18| 17| 16| 15| 14| 13| 12|
| 0| 5| 5| 4| 3| 2| 1| 0| null| null|
| 1| 8| 18| 17| 16| 15| 14| 13| 12| 11|
+----------+---+------+------+------+------+------+------+------+------+
but the PySpark version takes significantly longer to run (34 seconds):
>>> %timeit get_lag_cols_spark(sdf, 'customerId', 't', 'vals', 8).show()
1 loop, best of 3: 34 s per loop
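(To see where the time presumably goes, the physical plan for the chain of 8 left joins can be printed; every pass through the loop adds another join with its own sort/exchange steps:)

# Print the physical plan for the chained joins (output omitted here).
get_lag_cols_spark(sdf, 'customerId', 't', 'vals', 8).explain()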
I kept this example small and simple (only 20 data rows, only 1 partition for both Dask and Spark), so I would not expect memory and CPU usage to drive significant performance differences.
My question is: Is there any way to better configure PySpark, or to optimize its execution of this specific task, so that PySpark gets closer to Pandas and Dask speed-wise (i.e., 0.5-1.0 seconds)?
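For example, one knob I assume is relevant here (an assumption on my part, not something I have verified to help) is spark.sql.shuffle.partitions, since with a single input partition the default of 200 post-join partitions looks like pure overhead:

# Assumption: with only one input partition, pinning the number of shuffle
# partitions to 1 should remove most of the post-join repartitioning work.
spark = (SparkSession.builder
         .master('local[1]')
         .config('spark.sql.shuffle.partitions', 1)
         .getOrCreate())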