6
votes

How can you do the equivalent of pandas' df.fillna(method='bfill') on a pyspark.sql.DataFrame?

The PySpark dataframe has the pyspark.sql.DataFrame.fillna method, but it does not support a method parameter.


In pandas you can use the following to backfill a time series:

Create data

import pandas as pd

index = pd.date_range('2017-01-01', '2017-01-05')
data = [1, 2, 3, None, 5]

df = pd.DataFrame({'data': data}, index=index)

Giving

Out[1]:
            data
2017-01-01  1.0
2017-01-02  2.0
2017-01-03  3.0
2017-01-04  NaN
2017-01-05  5.0

Backfill the dataframe

df = df.fillna(method='bfill')

Produces the backfilled frame

Out[2]:
            data
2017-01-01  1.0
2017-01-02  2.0
2017-01-03  3.0
2017-01-04  5.0
2017-01-05  5.0
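
On recent pandas versions, the method argument of fillna is deprecated (since pandas 2.1, as far as I know), and DataFrame.bfill() is the direct replacement. A minimal sketch on the same sample data, in case the call above raises a warning or error on your version:

```python
import pandas as pd

index = pd.date_range('2017-01-01', '2017-01-05')
df = pd.DataFrame({'data': [1, 2, 3, None, 5]}, index=index)

# DataFrame.bfill() fills each NaN with the next valid value that follows it.
filled = df.bfill()
print(filled)
```

The result should match the backfilled frame shown above.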

How can the same thing be done for a pyspark.sql.DataFrame?

2
There is good info about forward fill in stackoverflow.com/questions/36019847/… Have a look at it. - Pushkr
I had the same problem. I converted the dataset to a pandas DF, applied the appropriate method (bfill...), then converted back to a Spark DF. Before converting back to Spark, though, I added a step to coerce each column of my pandas DF to the appropriate data type. Spark can be picky about data types, especially if you use a method such as 'interpolate', where you can end up with integers and floats in the same column. Hope this helps. - Yoan B. M.Sc

2 Answers

4
votes

Actually, backfilling a distributed dataset is not as easy as backfilling a pandas (local) dataframe: you cannot be sure that the value to fill with exists in the same partition. I would use crossJoin with windowing. For example, for this DF:

df = spark.createDataFrame([
    ('2017-01-01', None), 
    ('2017-01-02', 'B'), 
    ('2017-01-03', None), 
    ('2017-01-04', None), 
    ('2017-01-05', 'E'), 
    ('2017-01-06', None), 
    ('2017-01-07', 'G')], ['date', 'value'])
df.show()

+----------+-----+
|      date|value|
+----------+-----+
|2017-01-01| null|
|2017-01-02|    B|
|2017-01-03| null|
|2017-01-04| null|
|2017-01-05|    E|
|2017-01-06| null|
|2017-01-07|    G|
+----------+-----+

The code would be:

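from pyspark.sql.functions import coalesce, col, row_number
from pyspark.sql.window import Window

# Pair each row with the rows on the same or a later date that can supply a value,
# then keep only the earliest such pairing per date.
df.alias('a').crossJoin(df.alias('b')) \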
    .where((col('b.date') >= col('a.date')) & (col('a.value').isNotNull() | col('b.value').isNotNull())) \
    .withColumn('rn', row_number().over(Window.partitionBy('a.date').orderBy('b.date'))) \
    .where(col('rn') == 1) \
    .select('a.date', coalesce('a.value', 'b.value').alias('value')) \
    .orderBy('a.date') \
    .show()

+----------+-----+
|      date|value|
+----------+-----+
|2017-01-01|    B|
|2017-01-02|    B|
|2017-01-03|    E|
|2017-01-04|    E|
|2017-01-05|    E|
|2017-01-06|    G|
|2017-01-07|    G|
+----------+-----+

4
votes

The last and first functions, with their ignorenulls=True flags, can be combined with the rowsBetween windowing. If we want to fill backwards, we select the first non-null that is between the current row and the end. If we want to fill forwards, we select the last non-null that is between the beginning and the current row.

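from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

# Assumes df has a 'date' column to order by and a 'data' column to fill.
# Backfill: take the first non-null value between the current row and the end.
# Without partitionBy, Spark moves all rows into a single partition for this window.
df = df.withColumn(
    'data',
    F.first('data', ignorenulls=True).over(
        W.orderBy('date').rowsBetween(W.currentRow, W.unboundedFollowing)
    )
)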

Source on filling in Spark: https://towardsdatascience.com/end-to-end-time-series-interpolation-in-pyspark-filling-the-gap-5ccefc6b7fc9