2 votes

I'm having a world of issues performing a rolling join of two dataframes in PySpark (and Python in general). I am looking to join two PySpark dataframes by their ID and the closest date backwards, meaning the date in the second dataframe cannot be greater than the date in the first.

Table_1:

+-----+------------+-------+
| ID  |    Date    | Value |
+-----+------------+-------+
| A1  | 01-15-2020 |     5 |
| A2  | 01-20-2020 |    10 |
| A3  | 02-21-2020 |    12 |
| A1  | 01-21-2020 |     6 |
+-----+------------+-------+

Table_2:

+-----+------------+---------+
| ID  |    Date    | Value 2 |
+-----+------------+---------+
| A1  | 01-10-2020 |       1 |
| A1  | 01-12-2020 |       5 |
| A1  | 01-16-2020 |       3 |
| A2  | 01-25-2020 |      20 |
| A2  | 01-01-2020 |      12 |
| A3  | 01-31-2020 |      14 |
| A3  | 01-30-2020 |      12 |
+-----+------------+---------+

Desired Result:

+-----+------------+-------+---------+
| ID  |    Date    | Value | Value 2 |
+-----+------------+-------+---------+
| A1  | 01-15-2020 |     5 |       5 |
| A2  | 01-20-2020 |    10 |      12 |
| A3  | 02-21-2020 |    12 |      14 |
| A1  | 01-21-2020 |     6 |       3 |
+-----+------------+-------+---------+

In essence, I understand that a SQL query could do the trick, since I can run spark.sql("query"), but I'm open to anything else. I've tried several approaches that don't work in a Spark context. Thanks!
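For reference, this is roughly the shape of SQL I have in mind, assuming the two dataframes are registered as temp views named table_1 and table_2, that Table_2's value column is called Value_2, and that the dates are MM-dd-yyyy strings (the view names, column names, date format, and the assumption that (ID, Date) uniquely identifies a row of Table_1 are all placeholders):

# Sketch only: view/column names, the date format, and the uniqueness of
# (ID, Date) in table_1 are assumptions, not part of my actual data.
table_1.createOrReplaceTempView("table_1")
table_2.createOrReplaceTempView("table_2")

result = spark.sql("""
    SELECT ID, Date, Value, Value_2
    FROM (
        SELECT t1.ID, t1.Date, t1.Value, t2.Value_2,
               ROW_NUMBER() OVER (
                   PARTITION BY t1.ID, t1.Date
                   ORDER BY to_date(t2.Date, 'MM-dd-yyyy') DESC
               ) AS rn
        FROM table_1 t1
        LEFT JOIN table_2 t2
          ON t1.ID = t2.ID
         AND to_date(t2.Date, 'MM-dd-yyyy') <= to_date(t1.Date, 'MM-dd-yyyy')
    ) ranked
    WHERE rn = 1
""")
result.show()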


2 Answers

1 vote

Here is my attempt.

First, I determine the Date_2 that meets your condition. After that, I join the second dataframe again to get Value_2.

from pyspark.sql.functions import monotonically_increasing_id, unix_timestamp, max

# Tag each row of df1 with a unique id, join to df2 on ID, keep only rows where
# Date_2 is not after Date, then take the latest such Date_2 per original row.
df3 = df1.withColumn('newId', monotonically_increasing_id()) \
  .join(df2, 'ID', 'left') \
  .where(unix_timestamp('Date', 'M/dd/yy') >= unix_timestamp('Date_2', 'M/dd/yy')) \
  .groupBy(*df1.columns, 'newId') \
  .agg(max('Date_2').alias('Date_2'))
df3.orderBy('newId').show(20, False)

+---+-------+-----+-----+-------+
|ID |Date   |Value|newId|Date_2 |
+---+-------+-----+-----+-------+
|A1 |1/15/20|5    |0    |1/12/20|
|A2 |1/20/20|10   |1    |1/11/20|
|A3 |2/21/20|12   |2    |1/31/20|
|A1 |1/21/20|6    |3    |1/16/20|
+---+-------+-----+-----+-------+

# Join back to df2 on (ID, Date_2) to pick up the matching Value_2,
# then restore the original row order and drop the helper columns.
df3.join(df2, ['ID', 'Date_2'], 'left') \
  .orderBy('newId') \
  .drop('Date_2', 'newId') \
  .show(20, False)

+---+-------+-----+-------+
|ID |Date   |Value|Value_2|
+---+-------+-----+-------+
|A1 |1/15/20|5    |5      |
|A2 |1/20/20|10   |12     |
|A3 |2/21/20|12   |14     |
|A1 |1/21/20|6    |3      |
+---+-------+-----+-------+
1 vote
from pyspark.sql.functions import datediff, to_date, min

df1 = spark.createDataFrame([('A1', '1/15/2020', 5),
                             ('A2', '1/20/2020', 10),
                             ('A3', '2/21/2020', 12),
                             ('A1', '1/21/2020', 6)],
                            ['ID1', 'Date1', 'Value1'])

df2 = spark.createDataFrame([('A1', '1/10/2020', 1),
                             ('A1', '1/12/2020', 5),
                             ('A1', '1/16/2020', 3),
                             ('A2', '1/25/2020', 20),
                             ('A2', '1/1/2020', 12),
                             ('A3', '1/31/2020', 14),
                             ('A3', '1/30/2020', 12)],
                            ['ID2', 'Date2', 'Value2'])

# Join on ID and compute how many days Date2 lies before Date1; keep only rows
# where Date2 is not after Date1 (distance >= 0).
df2 = df1.join(df2, df1.ID1 == df2.ID2) \
    .withColumn("distance", datediff(to_date(df1.Date1, 'MM/dd/yyyy'),
                                     to_date(df2.Date2, 'MM/dd/yyyy'))) \
    .filter("distance >= 0")

# For each row of df1, keep the candidate with the smallest distance
# (the closest earlier date) and pull in its Value2.
df2.groupBy(df2.ID1, df2.Date1, df2.Value1) \
   .agg(min(df2.distance).alias('distance')).join(df2, ['ID1', 'Date1', 'distance']) \
   .select(df2.ID1, df2.Date1, df2.Value1, df2.Value2).orderBy('ID1', 'Date1').show()