
I need to perform the operation below on DataFrames using the window functions lag and lead.

For each KEY, I need to apply the following insert and update rules to produce the final output.

Insert Condition:
1. By default, the record with LAYER_NO=0 needs to be written to the output.
2. If there is any change in the value of COL1, COL2 or COL3 with respect to the previous record, then that record needs to be written to the output.

Example: key_1 with layer_no=2, there is a change of value from 400 to 600 in COL3

Update Condition:
1. If there were NO changes in the value of COL1, COL2 or COL3 with respect to the previous record, but there is a change in the DEPART column, the new DEPART value needs to be updated in the output.

Example: key_1 with layer_no=3, there were NO changes in COL1, COL2 or COL3, but the DEPART column changed to "xyz", so this needs to be updated in the output.
2. The LAYER_NO should also be renumbered sequentially, after inserting the record with layer_no=0.

    val inputDF = values.toDF("KEY","LAYER_NO","COl1","COl2","COl3","DEPART")

    inputDF.show()   
    +-----+--------+----+----+----+------+
    |  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
    +-----+--------+----+----+----+------+
    |key_1|       0| 200| 300| 400|   abc|->default write
    |key_1|       1| 200| 300| 400|   abc|
    |key_1|       2| 200| 300| 600|   uil|--->change in col3,so write
    |key_1|       2| 200| 300| 600|   uil|
    |key_1|       3| 200| 300| 600|   xyz|--->change in DEPART,so update
    |key_2|       0| 500| 700| 900|   prq|->default write
    |key_2|       1| 888| 555| 900|   tep|--->change in col1 & col 2,so write
    |key_3|       0| 111| 222| 333|   lgh|->default write
    |key_3|       1| 084| 222| 333|   lgh|--->change in col1,so write
    |key_3|       2| 084| 222| 333|   rrr|--->change in DEPART,so update
    +-----+--------+----+----+----+------+

Expected Output:

outputDF.show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+
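
To make the insert and update rules concrete, here is a minimal sketch of one reading of them on plain Scala collections (no Spark). The `Rec` case class, the `collapse` function and the choice of `String` for COL1 to COL3 are assumptions made for illustration only; they are not part of the original code.

```scala
// Hypothetical row type mirroring the columns in the question.
final case class Rec(key: String, layerNo: Int,
                     col1: String, col2: String, col3: String, depart: String)

// Applies the insert/update rules per key and renumbers LAYER_NO from 0.
def collapse(rows: Seq[Rec]): Seq[Rec] =
  rows.groupBy(_.key).toSeq.sortBy(_._1).flatMap { case (_, recs) =>
    val merged = recs.sortBy(_.layerNo).foldLeft(Vector.empty[Rec]) { (out, r) =>
      out.lastOption match {
        // Update: COL1..COL3 unchanged, so only carry the newer DEPART forward.
        case Some(prev) if (prev.col1, prev.col2, prev.col3) == (r.col1, r.col2, r.col3) =>
          out.init :+ prev.copy(depart = r.depart)
        // Insert: first record of the key, or a change in COL1..COL3.
        case _ => out :+ r
      }
    }
    // Renumber LAYER_NO sequentially within the key.
    merged.zipWithIndex.map { case (r, i) => r.copy(layerNo = i) }
  }

// Sample rows copied from the input table above.
val input = Seq(
  Rec("key_1", 0, "200", "300", "400", "abc"),
  Rec("key_1", 1, "200", "300", "400", "abc"),
  Rec("key_1", 2, "200", "300", "600", "uil"),
  Rec("key_1", 2, "200", "300", "600", "uil"),
  Rec("key_1", 3, "200", "300", "600", "xyz"),
  Rec("key_2", 0, "500", "700", "900", "prq"),
  Rec("key_2", 1, "888", "555", "900", "tep"),
  Rec("key_3", 0, "111", "222", "333", "lgh"),
  Rec("key_3", 1, "084", "222", "333", "lgh"),
  Rec("key_3", 2, "084", "222", "333", "rrr")
)

val result = collapse(input)
result.foreach(println)
```

On the sample rows this yields two records per key, matching the expected output table. Note that it treats only consecutive records as "previous", which is how the examples in the rules read.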
Why didn't |key_1| 2| 200| 300| 600| uil| appear in the output? - Ramesh Maharjan
|key_1| 1| 200| 300| 600| uil| is written to the output, but the next record has a value change in the DEPART column, so "uil" is updated to "xyz". The final record is therefore |key_1| 1| 200| 300| 600| xyz|. - RaAm
So how come the layer_no is 1? Isn't it supposed to be 3? - Ramesh Maharjan
Sorry, that information was missing. I have now updated the question: the LAYER_NO should also be renumbered sequentially, after inserting the record with layer_no=0. - RaAm

1 Answer


We need to define two windows to arrive at your expected output: one (w_col) to pick up the latest DEPART value among rows that share the same KEY, COL1, COL2 and COL3, and a second (w_key) to compare the sum of COL1 to COL3 with that of the previous row of the same KEY.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
                  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")

Then we replace the values in the DEPART column with the last value in each w_col partition, and keep only the rows where the lagged sum differs from the current sum of the columns (plus the rows where LAYER_NO === 0). Lastly, we replace LAYER_NO with rank minus 1, so that it runs sequentially from 0 within each KEY.

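inputDF.withColumn("DEPART", last("DEPART").over(w_col))       // latest DEPART per (KEY, COL1, COL2, COL3)
   .withColumn("row_sum", $"COL1" + $"COL2" + $"COL3")         // sum of the tracked columns
   .withColumn("lag_sum", lag($"row_sum", 1).over(w_key))      // sum on the previous row of the same KEY
   .filter($"LAYER_NO" === 0 || not($"row_sum" === $"lag_sum")) // keep layer 0 and rows whose sum changed
   .withColumn("LAYER_NO", rank().over(w_key) - 1)             // renumber layers from 0 within each KEY
   .drop("row_sum", "lag_sum").show()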
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+
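
One caveat: comparing sums only detects changes that alter the total, so a change such as 200, 300, 400 to 300, 200, 400 would be missed. A minimal sketch of a variant that compares each column with its lagged value instead is given below. It assumes COL1 to COL3 are strings (suggested by the value 084), builds a local SparkSession so that it is self-contained, and uses the hypothetical names `keepChanges` and `changed`. It also swaps in `row_number` for `rank`, which avoids gaps if two kept rows share a LAYER_NO.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Local session for the sketch; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("layer-sketch").getOrCreate()
import spark.implicits._

// Sample rows from the question; column types are an assumption.
val inputDF = Seq(
  ("key_1", 0, "200", "300", "400", "abc"),
  ("key_1", 1, "200", "300", "400", "abc"),
  ("key_1", 2, "200", "300", "600", "uil"),
  ("key_1", 2, "200", "300", "600", "uil"),
  ("key_1", 3, "200", "300", "600", "xyz"),
  ("key_2", 0, "500", "700", "900", "prq"),
  ("key_2", 1, "888", "555", "900", "tep"),
  ("key_3", 0, "111", "222", "333", "lgh"),
  ("key_3", 1, "084", "222", "333", "lgh"),
  ("key_3", 2, "084", "222", "333", "rrr")
).toDF("KEY", "LAYER_NO", "COL1", "COL2", "COL3", "DEPART")

def keepChanges(df: DataFrame): DataFrame = {
  val wKey = Window.partitionBy("KEY").orderBy("LAYER_NO")
  val wCol = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

  // True when any tracked column differs from the previous row of the same KEY.
  // <=> is null-safe equality, so the first row of a key (lag is null) counts as changed.
  val changed = Seq("COL1", "COL2", "COL3")
    .map(c => not(col(c) <=> lag(col(c), 1).over(wKey)))
    .reduce(_ || _)

  df.withColumn("DEPART", last("DEPART").over(wCol))      // latest DEPART per group of equal columns
    .withColumn("changed", changed)                       // window functions must be computed before filter
    .filter(col("LAYER_NO") === 0 || col("changed"))
    .withColumn("LAYER_NO", row_number().over(wKey) - 1)  // renumber from 0 within each KEY
    .drop("changed")
}

keepChanges(inputDF).orderBy("KEY", "LAYER_NO").show()
```

On the sample data this should keep the same six rows as the output above.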