2
votes

I have weekly timeseries data and am trying to use Pyspark SQL to calculate, for several columns, the weekly sum of the trailing 8 weeks. I have tried using Pyspark window functions; specifically:

sum(df[valueCol]).over(partitionBy(df[idCol]).orderBy(df[timeCol]).rangeBetween(-7, 0))

But this code runs very slowly (30-60 seconds per column for 1000 unique IDs and 170 time steps). I understand from other StackOverflow questions that partitions and shuffles can cause performance issues, so to understand those issues better I am manually calculating the 8 most recent weekly values for each week in 8 columns, and will then add those columns to get the trailing 8 weeks' sum.

Here is a simplified dataset I created:

idCount = 2
tCount = 10

df = pd.DataFrame({'customerId': [x for x in range(idCount) for y in range(tCount)],
               't': [y for x in range(idCount) for y in range(tCount)],
               'vals': range(idCount * tCount)})[['customerId', 't', 'vals']]

which creates this data frame:

Input Dataframe

   customerId   t   vals
0           0   0   0
1           0   1   1
2           0   2   2
3           0   3   3
4           0   4   4
5           0   5   5
6           0   6   6
7           0   7   7
8           0   8   8
9           0   9   9
10          1   0   10
11          1   1   11
12          1   2   12
13          1   3   13
14          1   4   14
15          1   5   15
16          1   6   16
17          1   7   17
18          1   8   18
19          1   9   19

My goal output is the 8 weekly lagged "vals" columns, including vals_0 as the current week's value, with NaNs where data is unavailable:

Goal Output Dataframe

    customerId  t  vals_0  vals_1  vals_2  vals_3  vals_4  vals_5  vals_6  vals_7
0            0  0       0     NaN     NaN     NaN     NaN     NaN     NaN     NaN
1            0  1       1     0.0     NaN     NaN     NaN     NaN     NaN     NaN
2            0  2       2     1.0     0.0     NaN     NaN     NaN     NaN     NaN
3            0  3       3     2.0     1.0     0.0     NaN     NaN     NaN     NaN
4            0  4       4     3.0     2.0     1.0     0.0     NaN     NaN     NaN
5            0  5       5     4.0     3.0     2.0     1.0     0.0     NaN     NaN
6            0  6       6     5.0     4.0     3.0     2.0     1.0     0.0     NaN
7            0  7       7     6.0     5.0     4.0     3.0     2.0     1.0     0.0
8            0  8       8     7.0     6.0     5.0     4.0     3.0     2.0     1.0
9            0  9       9     8.0     7.0     6.0     5.0     4.0     3.0     2.0
10           1  0      10     NaN     NaN     NaN     NaN     NaN     NaN     NaN
11           1  1      11    10.0     NaN     NaN     NaN     NaN     NaN     NaN
12           1  2      12    11.0    10.0     NaN     NaN     NaN     NaN     NaN
13           1  3      13    12.0    11.0    10.0     NaN     NaN     NaN     NaN
14           1  4      14    13.0    12.0    11.0    10.0     NaN     NaN     NaN
15           1  5      15    14.0    13.0    12.0    11.0    10.0     NaN     NaN
16           1  6      16    15.0    14.0    13.0    12.0    11.0    10.0     NaN
17           1  7      17    16.0    15.0    14.0    13.0    12.0    11.0    10.0
18           1  8      18    17.0    16.0    15.0    14.0    13.0    12.0    11.0
19           1  9      19    18.0    17.0    16.0    15.0    14.0    13.0    12.0

The following Pandas function creates the goal output dataframe:

def get_lag_cols_pandas(df, partCol, timeCol, lagCol, numLags):
    newdf = df[[partCol, timeCol, lagCol]]
    for x in range(numLags):
        newCol = '{}_{}'.format(lagCol, x)
        joindf = newdf[[partCol, timeCol, lagCol]]
        joindf[timeCol] = newdf[timeCol] + x
        joindf = joindf.rename(columns = {lagCol: newCol})
        newdf = newdf.merge(joindf, how = 'left', on = [partCol, timeCol])
    return newdf.drop(lagCol, axis = 1)

and runs in roughly 500ms:

>>> %timeit print('pandas result: \n{}\n\n'.format(get_lag_cols_pandas(df, 'customerId', 't', 'vals', 8)))
1 loop, best of 3: 501 ms per loop

I can also accomplish this in Dask using map_partitions() and get the same results in ~900 ms (presumably worse than Pandas due to the overhead from spinning up a thread):

>>> ddf = dd.from_pandas(df, npartitions = 1)
>>> %timeit print('dask result: \n{}\n\n'.format(ddf.map_partitions(lambda df: get_lag_cols_pandas(df, \
                                                    'customerId', 't', 'vals', 8)).compute(scheduler = 'threads')))
1 loop, best of 3: 893 ms per loop

I can also accomplish this in Pyspark (Note: For both Dask and Spark I have only one partition, to make a more fair comparison with Pandas):

>>> sparkType = SparkSession.builder.master('local[1]')
>>> spark = sparkType.getOrCreate()
>>> sdf = spark.createDataFrame(df)
>>> sdf.show()
+----------+---+----+
|customerId|  t|vals|
+----------+---+----+
|         0|  0|   0|
|         0|  1|   1|
|         0|  2|   2|
|         0|  3|   3|
|         0|  4|   4|
|         0|  5|   5|
|         0|  6|   6|
|         0|  7|   7|
|         0|  8|   8|
|         0|  9|   9|
|         1|  0|  10|
|         1|  1|  11|
|         1|  2|  12|
|         1|  3|  13|
|         1|  4|  14|
|         1|  5|  15|
|         1|  6|  16|
|         1|  7|  17|
|         1|  8|  18|
|         1|  9|  19|
+----------+---+----+
>>> sdf.rdd.getNumPartitions()
1

with the following code:

def get_lag_cols_spark(df, partCol, timeCol, lagCol, numLags):
    newdf = df.select(df[partCol], df[timeCol], df[lagCol])
    for x in range(numLags):
        newCol = '{}_{}'.format(lagCol, x)
        joindf = newdf.withColumn('newIdx', newdf[timeCol] + x) \
                                     .drop(timeCol).withColumnRenamed('newIdx', timeCol) \
                                     .withColumnRenamed(lagCol, newCol)
        newdf = newdf.join(joindf.select(joindf[partCol], joindf[timeCol], joindf[newCol]), [partCol, timeCol], how = 'left')
    newdf = newdf.drop(lagCol)
    return newdf

I get the correct results back (although shuffled):

+----------+---+------+------+------+------+------+------+------+------+
|customerId|  t|vals_0|vals_1|vals_2|vals_3|vals_4|vals_5|vals_6|vals_7|
+----------+---+------+------+------+------+------+------+------+------+
|         1|  3|    13|    12|    11|    10|  null|  null|  null|  null|
|         1|  0|    10|  null|  null|  null|  null|  null|  null|  null|
|         1|  1|    11|    10|  null|  null|  null|  null|  null|  null|
|         0|  9|     9|     8|     7|     6|     5|     4|     3|     2|
|         0|  1|     1|     0|  null|  null|  null|  null|  null|  null|
|         1|  4|    14|    13|    12|    11|    10|  null|  null|  null|
|         0|  4|     4|     3|     2|     1|     0|  null|  null|  null|
|         0|  3|     3|     2|     1|     0|  null|  null|  null|  null|
|         0|  7|     7|     6|     5|     4|     3|     2|     1|     0|
|         1|  5|    15|    14|    13|    12|    11|    10|  null|  null|
|         1|  6|    16|    15|    14|    13|    12|    11|    10|  null|
|         0|  6|     6|     5|     4|     3|     2|     1|     0|  null|
|         1|  7|    17|    16|    15|    14|    13|    12|    11|    10|
|         0|  8|     8|     7|     6|     5|     4|     3|     2|     1|
|         0|  0|     0|  null|  null|  null|  null|  null|  null|  null|
|         0|  2|     2|     1|     0|  null|  null|  null|  null|  null|
|         1|  2|    12|    11|    10|  null|  null|  null|  null|  null|
|         1|  9|    19|    18|    17|    16|    15|    14|    13|    12|
|         0|  5|     5|     4|     3|     2|     1|     0|  null|  null|
|         1|  8|    18|    17|    16|    15|    14|    13|    12|    11|
+----------+---+------+------+------+------+------+------+------+------+

but the Pyspark version takes significantly longer to run (34 seconds):

>>> %timeit get_lag_cols_spark(sdf, 'customerId', 't', 'vals', 8).show()
1 loop, best of 3: 34 s per loop

I kept this example small and simple (only 20 data rows, only 1 partition for both Dask and Spark), so I would not expect memory and CPU usage to drive significant performance differences.

My question is: Is there any way to better configure Pyspark or optimize Pyspark execution on this specific task to bring Pyspark closer to Pandas and Dask speed-wise (i.e., 0.5-1.0 seconds)?

1

1 Answers

0
votes

pyspark is slow by definition as Spark itself is written in Scala and any pyspark program involves running at least 1 JVM (usually 1 driver and multiple workers) and python programs (1 per worker) and communications between them. The amount of interprocess communication between the java and python side depends on the python code you use.

Even without all the inter-language hoopla, spark has a lot of overhead that is geared toward handling big data distributed processing - this means that Spark programs tend to be slower than any non-distributed solutions... as long as the scale is small. Spark and pyspark are purposely built for large-scale and that's where it shines