7
votes

I'm using Spark Structured Streaming to analyze sensor data and need to perform calculations based on a sensor's previous timestamp. My incoming data stream has three columns: sensor_id, timestamp, and temp. I need to add a fourth column holding that sensor's previous timestamp, so that I can then calculate the time between data points for each sensor.

This is easy in traditional batch processing, using a lag function over a window partitioned by sensor_id. What is the best way to approach this in a streaming situation?

So for example if my streaming dataframe looked like this:

+----------+-----------+------+
| SensorId | Timestamp | Temp |
+----------+-----------+------+
|     1800 |        34 |   23 |
|      500 |        36 |   54 |
|     1800 |        45 |   23 |
|      500 |        60 |   54 |
|     1800 |        78 |   23 |
+----------+-----------+------+

I would like something like this:

+----------+-----------+------+---------+
| SensorId | Timestamp | Temp | Prev_ts |
+----------+-----------+------+---------+
|     1800 |        34 |   23 |      21 |
|      500 |        36 |   54 |      27 |
|     1800 |        45 |   23 |      34 |
|      500 |        60 |   54 |      36 |
|     1800 |        78 |   23 |      45 |
+----------+-----------+------+---------+

If I try

test = filteredData.withColumn("prev_ts", lag("ts").over(Window.partitionBy("sensor_id").orderBy("ts")))

I get an AnalysisException: 'Non-time-based windows are not supported on streaming DataFrames/Datasets'.

Could I save the previous timestamp of each sensor in a data structure that I could reference and then update with each new timestamp?
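Conceptually, yes: keeping a small per-sensor state is exactly what Spark's arbitrary stateful operators formalize (flatMapGroupsWithState with a GroupState in Scala/Java, or applyInPandasWithState in recent PySpark releases). A minimal plain-Python sketch of that per-sensor state logic, assuming each sensor's events arrive in timestamp order (the function and variable names here are illustrative, not a Spark API):

```python
def update_prev_ts(state, batch):
    """Process a micro-batch of (sensor_id, timestamp, temp) rows,
    emitting each row extended with that sensor's previous timestamp.

    `state` maps sensor_id -> last timestamp seen and is mutated in
    place, mirroring what a per-key GroupState would hold in Spark."""
    out = []
    for sensor_id, ts, temp in batch:
        # Emit the previous timestamp for this sensor (None if unseen).
        out.append((sensor_id, ts, temp, state.get(sensor_id)))
        # Remember this timestamp for the sensor's next event.
        state[sensor_id] = ts
    return out

state = {}
batch1 = [(1800, 34, 23), (500, 36, 54), (1800, 45, 23)]
batch2 = [(500, 60, 54), (1800, 78, 23)]
r1 = update_prev_ts(state, batch1)  # prev_ts values: None, None, 34
r2 = update_prev_ts(state, batch2)  # prev_ts values: 36, 45
```

The state survives across micro-batches, which is what the windowed lag cannot give you on a stream; in Spark you would let the engine manage this dictionary per key via GroupState, with a timeout to evict sensors that stop reporting.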

Comments

I took the GroupState suggestion and have been working off of this blog post, but have yet to get anything working in a Databricks notebook. Please let me know if you figure something out! – Matt S.

Hi Matt, did you get the solution for this problem? – Nagesh

1 Answer

-4
votes

There is no need to "simulate" anything. Standard window functions can be used with Structured Streaming.

from pyspark.sql import Window
from pyspark.sql.functions import lag

s = (spark.readStream
    ...
    .load())

s.withColumn("prev_ts", lag("Timestamp").over(
    Window.partitionBy("SensorId").orderBy("Timestamp")
))