I'm using Spark Structured Streaming to analyze sensor data and need to perform calculations based on a sensor's previous timestamp. My incoming data stream has three columns: sensor_id, timestamp, and temp. I need to add a fourth column holding that sensor's previous timestamp, so that I can then calculate the time between data points for each sensor.
This is easy in traditional batch processing: use a lag function over a window partitioned by sensor_id. What is the best way to approach this in a streaming situation?
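For context, the batch version looks roughly like this (a sketch, with batchData standing in for a batch DataFrame of sensor_id, timestamp, temp):

from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# For each sensor, pull the previous row's timestamp into a new column
w = Window.partitionBy("sensor_id").orderBy("timestamp")
with_prev = batchData.withColumn("prev_ts", lag("timestamp").over(w))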
So for example if my streaming dataframe looked like this:
+----------+-----------+------+
| SensorId | Timestamp | Temp |
+----------+-----------+------+
|     1800 |        34 |   23 |
|      500 |        36 |   54 |
|     1800 |        45 |   23 |
|      500 |        60 |   54 |
|     1800 |        78 |   23 |
+----------+-----------+------+
I would like something like this:
+----------+-----------+------+---------+
| SensorId | Timestamp | Temp | Prev_ts |
+----------+-----------+------+---------+
|     1800 |        34 |   23 |      21 |
|      500 |        36 |   54 |      27 |
|     1800 |        45 |   23 |      34 |
|      500 |        60 |   54 |      36 |
|     1800 |        78 |   23 |      45 |
+----------+-----------+------+---------+
If I try:

from pyspark.sql.functions import lag
from pyspark.sql.window import Window

test = filteredData.withColumn("prev_ts", lag("ts").over(Window.partitionBy("sensor_id").orderBy("ts")))

I get an AnalysisException: 'Non-time-based windows are not supported on streaming DataFrames/Datasets'.
Could I save the previous timestamp of each sensor in a data structure that I could reference and then update with each new timestamp?
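Something along those lines is what I'm picturing, sketched here with applyInPandasWithState (available from Spark 3.4 on; attach_prev_ts and the schema strings are just my own naming, and I'm not sure this is the right approach):

from pyspark.sql.streaming.state import GroupStateTimeout

def attach_prev_ts(key, pdf_iter, state):
    # state carries the last timestamp seen for this sensor across micro-batches
    last_ts = state.get[0] if state.exists else None
    for pdf in pdf_iter:
        pdf = pdf.sort_values("ts").reset_index(drop=True)
        # within a batch, prev_ts is simply the previous row's ts (nullable int)
        pdf["prev_ts"] = pdf["ts"].shift(1).astype("Int64")
        if len(pdf) > 0:
            # the first row falls back to the timestamp carried over in state
            if last_ts is not None:
                pdf.loc[0, "prev_ts"] = last_ts
            last_ts = int(pdf["ts"].iloc[-1])
        yield pdf
    if last_ts is not None:
        state.update((last_ts,))

result = (filteredData
          .groupBy("sensor_id")
          .applyInPandasWithState(
              attach_prev_ts,
              outputStructType="sensor_id long, ts long, temp long, prev_ts long",
              stateStructType="last_ts long",
              outputMode="append",
              timeoutConf=GroupStateTimeout.NoTimeout))

I realise event-time ordering across micro-batches isn't guaranteed, so this assumes each sensor's records arrive roughly in order.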