
I have a dataframe where a label value is associated with each (id, bin, date, hour) combination:

+----------+----+-----+---+-------------------+
|      date|hour|   id|bin|              label|
+----------+----+-----+---+-------------------+
|2019_12_20|   8|    1|  0|  151.7050821002368|
|2019_12_20|   8|    1|  2| 101.13672140015788|
|2019_12_20|   8|    1|  3| 16.856120233359647|
...

I want to append several columns to this dataframe: the label at the same hour on the previous day, the label an hour earlier on the previous day, and so on. I know how to get the first of these with the lag function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val dateWindow = Window.partitionBy($"id", $"bin").orderBy($"hour", $"date")
val expandedDf = data.withColumn("yesterdaySameHour", lag($"label", 1, 0.0).over(dateWindow))

However, I can't figure out how to get the label value at hour - 1 on the previous day. Is there a way to write a conditional lag that filters out rows whose hour is greater than or equal to the current row's hour? If not, what's the proper way to do this?

Many thanks.

move date to partitionBy. – Lamanus
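
As a minimal sketch of what this comment suggests (hypothetical variable name, same dataframe as above): moving date into partitionBy keeps the hour-ordered lag inside a single day.

// Hypothetical sketch of the comment's suggestion: partition by date as well,
// so that lag over hour never crosses a day boundary.
val sameDayWindow = Window.partitionBy($"id", $"bin", $"date").orderBy($"hour")

The answer below uses exactly this window (as hW).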

1 Answer


You have to specify the Window according to your purpose; to reach yesterday's previous hour, you can apply the lag function twice.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

// dW steps one day back at a fixed hour; hW steps one hour back within a fixed day.
val dW = Window.partitionBy("id", "bin", "hour").orderBy("date")
val hW = Window.partitionBy("id", "bin", "date").orderBy("hour")

df.withColumn("yesterdaySameHour", lag("label", 1, 0.0).over(dW))
  .withColumn("todayPreviousHour", lag("label", 1, 0.0).over(hW))
  // The inner lag fetches yesterday's label at each hour; the outer lag then
  // shifts that series back one hour, yielding yesterday's previous hour.
  .withColumn("yesterdayPreviousHour", lag(lag("label", 1, 0.0).over(dW), 1, 0.0).over(hW))
  .orderBy("date", "hour", "bin")
  .show(false)

This will give you the result:

+----------+----+---+---+-----+-----------------+-----------------+---------------------+
|date      |hour|id |bin|label|yesterdaySameHour|todayPreviousHour|yesterdayPreviousHour|
+----------+----+---+---+-----+-----------------+-----------------+---------------------+
|2019_12_19|7   |1  |0  |-1   |0                |0                |0                    |
|2019_12_19|7   |1  |2  |-2   |0                |0                |0                    |
|2019_12_19|7   |1  |3  |-3   |0                |0                |0                    |
|2019_12_19|8   |1  |0  |1    |0                |-1               |0                    |
|2019_12_19|8   |1  |2  |2    |0                |-2               |0                    |
|2019_12_19|8   |1  |3  |3    |0                |-3               |0                    |
|2019_12_20|7   |1  |0  |4    |-1               |0                |0                    |
|2019_12_20|7   |1  |2  |5    |-2               |0                |0                    |
|2019_12_20|7   |1  |3  |6    |-3               |0                |0                    |
|2019_12_20|8   |1  |0  |7    |1                |4                |-1                   |
|2019_12_20|8   |1  |2  |8    |2                |5                |-2                   |
|2019_12_20|8   |1  |3  |9    |3                |6                |-3                   |
+----------+----+---+---+-----+-----------------+-----------------+---------------------+