5 votes

I have the final records (after joins and filters) in a Spark DataFrame. I need to compare column values of consecutive rows (partitioned by key) and, based on a condition, change the e_date column value. For example:

    sample table
    key1  key2  col1  col2  s_date  e_date
    a     1     cv1   cv2   2014    2099
    a     1     cv3   cv2   2016    2099
    b     2     cv5   cv6   2016    2099
    b     2     cv5   cv6   2016    2099

   the final table should look like:

    key1  key2  col1  col2  s_date  e_date
    a     1     cv1   cv2   2014    2015   (next record's s_date - 1)
    a     1     cv3   cv2   2016    2099
    b     2     cv5   cv6   2016    2099
  1. The table above has a composite key, so key1 and key2 together form the key.

  2. Compare the col1 and col2 values of consecutive rows over a window partitioned by the keys.

  3. If any column has a new value, end the old record with the new record's s_date - 1 (rows 1 and 2 in the final table).

  4. If nothing changed, ignore the new record (row 3 in the final table).

Any pointers in Scala/Spark?


1 Answer

9 votes

lead and lag are already implemented as window functions in Spark SQL:

    import org.apache.spark.sql.functions.{lead, lag}
    import org.apache.spark.sql.expressions.Window

    // the 'colName symbol syntax needs `import spark.implicits._` in scope
    lag('s_date, 1).over(Window.partitionBy('key1, 'key2).orderBy('s_date))

Check Introducing Window Functions in Spark SQL for details.
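
Since the question needs the next record's s_date to close out the old record, lead is the direct fit here. Below is a minimal, self-contained sketch of the whole transformation, assuming the column names from the question and a local SparkSession; the dropDuplicates step for rule 4 is an assumption based on the sample data, where the unchanged new record is an exact duplicate:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, lead, when}
    import org.apache.spark.sql.expressions.Window

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // sample data from the question
    val df = Seq(
      ("a", 1, "cv1", "cv2", 2014, 2099),
      ("a", 1, "cv3", "cv2", 2016, 2099),
      ("b", 2, "cv5", "cv6", 2016, 2099),
      ("b", 2, "cv5", "cv6", 2016, 2099)
    ).toDF("key1", "key2", "col1", "col2", "s_date", "e_date")

    val w = Window.partitionBy("key1", "key2").orderBy("s_date")

    val result = df
      // rule 4: ignore new records where nothing changed (exact duplicates here)
      .dropDuplicates("key1", "key2", "col1", "col2", "s_date")
      // next record's s_date within the same composite key; null for the last record
      .withColumn("next_s_date", lead("s_date", 1).over(w))
      // rule 3: end the old record at the next record's s_date - 1
      .withColumn("e_date",
        when(col("next_s_date").isNotNull, col("next_s_date") - 1)
          .otherwise(col("e_date")))
      .drop("next_s_date")

    result.show()

On the sample data this gives e_date = 2015 for the first a record and leaves the rest at 2099, matching the expected final table.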