
I already have another solution, but I would prefer to do it with PySpark 2.3.

I have a two-dimensional PySpark data frame like this:

Date       | ID
---------- | ----
08/31/2018 | 10
09/31/2018 | 10
09/01/2018 | null
09/01/2018 | null
09/01/2018 | 12
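
For completeness, this is roughly how the sample frame can be built (a minimal sketch, assuming an existing SparkSession called spark; the Date column is kept as a plain string here):

from pyspark.sql import types as T

# schema matching the table above: Date as a string, ID as a nullable integer
schema = T.StructType([
    T.StructField('Date', T.StringType(), True),
    T.StructField('ID', T.IntegerType(), True),
])

df = spark.createDataFrame(
    [('08/31/2018', 10),
     ('09/31/2018', 10),
     ('09/01/2018', None),
     ('09/01/2018', None),
     ('09/01/2018', 12)],
    schema=schema,
)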

I want to replace the null ID values by looking for the closest non-null value in the past, or, if that is also null, by looking forward (and if that is null too, set a default value).

I thought of adding a new column with .withColumn and using a UDF that queries the data frame itself.

Something like this in pseudo code (not perfect, but it is the main idea):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def return_value(value, date):
    # keep the original value if it is already filled in
    if value is not None:
        return value

    # otherwise look for a value in the past ...
    value1 = df.filter(df['date'] <= date).select(df['value']).collect()
    if value1[0][0] is not None:
        return value1[0][0]

    # ... and finally look forward
    value2 = df.filter(df['date'] >= date).select(df['value']).collect()
    return value2[0][0]


value_udf = udf(return_value, StringType())
new_df = df.withColumn("new_value", value_udf(df.value, df.date))

But it does not work. Am I completely on the wrong track? Is it even possible to query a Spark data frame inside a UDF? Did I miss an easier solution?

In your example you have 3 rows with the same date, 2 of which have nulls. What is the expected result you are trying to get in this exact case? Do you want to get 10 from the 09/31/2018 row for both nulls, or do you want to get it only for the first null and 12 (from the last row) for the second null record? Looking at your pandas code I assume the former. - Tetlanesh

1 Answer


Create a new dataframe that has one column - a unique list of all dates:

datesDF = yourDF.select('Date').distinct()

Create another one that consists of dates and IDs, but only the rows with no nulls. Also, let's keep only the first (whichever happens to be first) occurrence of ID for each date (judging from your example you can have multiple rows per date):

noNullsDF = yourDF.dropna().dropDuplicates(subset=['Date'])

Let's now join those two so that we have the list of all dates together with whatever value we have for each (or null):

joinedDF = datesDF.join(noNullsDF, 'Date', 'left')

Now, for every date, get the value of ID from the previous date and the next date using window functions, and also rename our ID column so that there will be fewer problems with the join later:

from pyspark.sql.window import Window
from pyspark.sql import functions as f
w = Window.orderBy('Date')

# note: an un-partitioned window like this pulls all rows into a single partition
joinedDF = (joinedDF
            .withColumn('previousID', f.lag('ID').over(w))
            .withColumn('nextID', f.lead('ID').over(w))
            .withColumnRenamed('ID', 'newID'))

Now let's join it back to our original dataframe by date:

yourDF = yourDF.join(joinedDF, 'Date', 'left')

Now our dataframe has 4 ID columns:

  1. original ID
  2. newID - ID of some non-null row for the given date, if there is one, otherwise null
  3. previousID - ID from the previous date (non-null if there is one, otherwise null)
  4. nextID - ID from the next date (non-null if there is one, otherwise null)
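
As a quick sanity check (using only the column names introduced above), you can look at those four columns side by side:

yourDF.select('Date', 'ID', 'newID', 'previousID', 'nextID').orderBy('Date').show()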

Now we need to combine them into finalID in this order:

  1. the original value if it is not null
  2. the value for the current date if any non-null one is present (this is slightly different from your question, but your pandas code suggests you check the date with <=)
  3. the value for the previous date if it is not null
  4. the value for the next date if it is not null
  5. some default value

We do this simply by coalescing:

default = 0
finalDF = yourDF.select('Date', 
                        'ID',
                        f.coalesce('ID',
                                   'newID',
                                   'previousID',
                                   'nextID',
                                   f.lit(default)).alias('finalID')
                       )
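
To check the result on the sample data and keep only the two columns you actually need, something like this should do (cleanDF is just an illustrative name):

finalDF.orderBy('Date').show()

# keep only Date and the filled-in ID, renaming finalID back to ID
cleanDF = finalDF.select('Date', f.col('finalID').alias('ID'))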