
There's a DataFrame in PySpark with data as below:

Original data:

Shop Customer date        retrive_days
A    C1       15/06/2019  2
A    C1       16/06/2019  0
A    C1       17/06/2019  0
A    C1       18/06/2019  0
B    C2       20/07/2019  5
B    C2       21/07/2019  0
B    C2       23/07/2019  0
B    C2       30/07/2019  0
B    C2       01/08/2019  6
B    C2       02/08/2019  0
B    C2       03/08/2019  0
B    C2       09/08/2019  0
B    C2       10/08/2019  1
B    C2       11/08/2019  0
B    C2       13/08/2019  0

Each customer has dates on which he/she visited the shop, and each customer also has a retrive_days value; data for that many days has to be fetched into the output.

I am trying to get an output in PySpark that looks like the one below, filtered based on the retrive_days value for each customer.

Expected Output:

Shop Customer date        retrive_days
A    C1       15/06/2019  2
A    C1       16/06/2019  0
B    C2       20/07/2019  5
B    C2       21/07/2019  0
B    C2       23/07/2019  0
B    C2       01/08/2019  6
B    C2       02/08/2019  0
B    C2       03/08/2019  0
B    C2       10/08/2019  1
B    C2       11/08/2019  0
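To make the rule concrete (an illustrative check with Python's datetime, not part of the original question): a row with a non-zero retrive_days starts a window of that many days, and subsequent rows for the same shop/customer are kept while they fall inside it.

```python
from datetime import datetime, timedelta

# First window for customer C1: starts 15/06/2019 with retrive_days = 2.
start = datetime.strptime("15/06/2019", "%d/%m/%Y")
cutoff = start + timedelta(days=2)

# 15/06 and 16/06 fall before the cutoff and appear in the expected output;
# 17/06 and 18/06 do not.
print(cutoff.strftime("%d/%m/%Y"))  # 17/06/2019
```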
Can you post what you tried and what problem you are facing? - Raghu
What do you mean by retrieve days? The question is not clear. - dsk

1 Answer


Try this with window functions.

In your example output, the last row (11/08/2019) should be omitted: by the same logic as your 2, 5, and 6 cases, a row is kept only when its date is strictly before max_date (the window's start date plus retrive_days). If you do want to keep the boundary date, use filter('date1<=max_date') instead.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Running window: rows ordered by date within each (Shop, Customer) group.
w1 = Window.partitionBy("Shop", "Customer").orderBy("date1")
# Segment window: all rows sharing the same running segment id.
w2 = Window.partitionBy("Shop", "Customer", "partitions")

df.withColumn("date1", F.to_date("date", "dd/MM/yyyy"))\
  .withColumn("partitions", F.sum(F.expr("IF(retrive_days != 0, 1, 0)")).over(w1))\
  .withColumn("max_date", F.max(F.expr("IF(retrive_days != 0, date_add(date1, retrive_days), null)")).over(w2))\
  .filter('date1 < max_date')\
  .drop("date1", "max_date", "partitions").show()

#+----+--------+----------+------------+
#|Shop|Customer|      date|retrive_days|
#+----+--------+----------+------------+
#|   A|      C1|15/06/2019|           2|
#|   A|      C1|16/06/2019|           0|
#|   B|      C2|20/07/2019|           5|
#|   B|      C2|21/07/2019|           0|
#|   B|      C2|23/07/2019|           0|
#|   B|      C2|01/08/2019|           6|
#|   B|      C2|02/08/2019|           0|
#|   B|      C2|03/08/2019|           0|
#|   B|      C2|10/08/2019|           1|
#+----+--------+----------+------------+
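As a plain-Python sanity check (no Spark required; the row data is copied from the question), the same segment-and-cutoff logic can be sketched like this — each non-zero retrive_days row opens a window, and a row is kept while its date is strictly before the window's start plus retrive_days:

```python
from datetime import datetime, timedelta

# Sample rows from the question: (Shop, Customer, date, retrive_days),
# already sorted by date within each (Shop, Customer) group.
rows = [
    ("A", "C1", "15/06/2019", 2), ("A", "C1", "16/06/2019", 0),
    ("A", "C1", "17/06/2019", 0), ("A", "C1", "18/06/2019", 0),
    ("B", "C2", "20/07/2019", 5), ("B", "C2", "21/07/2019", 0),
    ("B", "C2", "23/07/2019", 0), ("B", "C2", "30/07/2019", 0),
    ("B", "C2", "01/08/2019", 6), ("B", "C2", "02/08/2019", 0),
    ("B", "C2", "03/08/2019", 0), ("B", "C2", "09/08/2019", 0),
    ("B", "C2", "10/08/2019", 1), ("B", "C2", "11/08/2019", 0),
    ("B", "C2", "13/08/2019", 0),
]

def filter_retrieve_windows(rows):
    kept, cutoffs = [], {}
    for shop, cust, date_str, days in rows:
        d = datetime.strptime(date_str, "%d/%m/%Y")
        if days != 0:
            # A non-zero retrive_days starts a new window for this customer.
            cutoffs[(shop, cust)] = d + timedelta(days=days)
        cutoff = cutoffs.get((shop, cust))
        # Keep the row only while strictly before the window's cutoff,
        # mirroring filter('date1 < max_date') in the Spark version.
        if cutoff is not None and d < cutoff:
            kept.append((shop, cust, date_str, days))
    return kept

result = filter_retrieve_windows(rows)
for row in result:
    print(row)
```

This reproduces the nine rows shown above, including the exclusion of the 11/08/2019 boundary row.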