The incoming data is a stream like the one below, consisting of 3 columns:
[
system -> deviceId,
time -> eventTime
value -> some metric
]
+-------+-------------------+-----+
|system |time |value|
+-------+-------------------+-----+
|system1|2019-08-20 07:13:10|1.5 |
|system2|2019-08-20 07:11:10|1.9 |
|system3|2019-08-20 07:13:15|1.3 |
|system1|2019-08-20 07:13:20|1.8 |
|system2|2019-08-20 07:11:20|1.6 |
|system3|2019-08-20 07:13:25|1.4 |
|system1|2019-08-20 07:13:30|1.2 |
|system2|2019-08-20 07:11:30|1.1 |
|system3|2019-08-20 07:13:35|1.5 |
+-------+-------------------+-----+
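For context, the stream is parsed into these columns roughly as follows. This is a minimal sketch assuming a Kafka source with JSON payloads; the broker, topic name, and JSON layout are placeholders, not my actual setup:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, to_timestamp}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("device-metrics").getOrCreate()

// Hypothetical payload: {"system":"system1","time":"2019-08-20 07:13:10","value":1.5}
val schema = new StructType()
  .add("system", StringType)
  .add("time", StringType)
  .add("value", DoubleType)

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")   // placeholder broker
  .option("subscribe", "device-metrics")            // placeholder topic
  .load()
  .select(from_json(col("value").cast("string"), schema).as("data"))
  .select("data.*")
  .withColumn("time", to_timestamp(col("time")))    // "yyyy-MM-dd HH:mm:ss"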
Each device produces data at a fixed interval of, say, [10 seconds].
I have a Spark Structured Streaming app which calculates the max of value with:
Window duration = 30 Seconds
Sliding duration = 30 Seconds
import org.apache.spark.sql.functions.{col, max, window}

df.withWatermark("time", "30 seconds")
  .groupBy(
    window(col("time"), "30 seconds", "30 seconds"),
    col("system")
  )
  .agg(max("value"))
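For completeness, the query is started roughly like this (the sink and trigger are placeholders; late data older than the watermark is dropped regardless of output mode):

import org.apache.spark.sql.streaming.Trigger

val aggregated = df.withWatermark("time", "30 seconds")
  .groupBy(window(col("time"), "30 seconds", "30 seconds"), col("system"))
  .agg(max("value").as("max_value"))

val query = aggregated.writeStream
  .outputMode("update")                            // "append" drops late data the same way
  .format("console")                               // placeholder sink
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()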
Problem: As each device is independent, their clocks are also independent. A device can get choked and delay sending its data for various reasons, e.g. [network issues, high load on the device, etc.].
Now, since a single job processes data from all devices, the watermark is global: it advances with the event time of the fastest device, so the job starts dropping the choked device's late data based on that watermark, and we lose it.
Is there any way or workaround to tie the watermark to the deviceId, so that the job maintains a watermark per [deviceId, eventTime] and does not drop one device's data just because other devices are ahead?
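For illustration, the behaviour I am after would look roughly like the sketch below if I dropped the built-in watermark and kept per-device state with flatMapGroupsWithState instead (assuming the parsed df and spark session from above). This is untested, and the window bookkeeping (Reading, DeviceState, WindowMax, the 30-second constants) is entirely my own, not something Spark provides out of the box:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

case class Reading(system: String, time: Timestamp, value: Double)
case class DeviceState(maxEventTime: Long, windows: Map[Long, Double])
case class WindowMax(system: String, windowStart: Timestamp, maxValue: Double)

// Keep a per-device "watermark" (that device's max event time minus the allowed
// delay) in state, and close a device's 30-second windows only when that device's
// own event time has moved past them -- other devices' clocks never interfere.
def updateState(
    system: String,
    readings: Iterator[Reading],
    state: GroupState[DeviceState]): Iterator[WindowMax] = {
  val windowMs = 30000L  // 30-second tumbling windows
  val delayMs  = 30000L  // per-device allowed lateness

  var st = state.getOption.getOrElse(DeviceState(0L, Map.empty))

  readings.foreach { r =>
    val ts = r.time.getTime
    // a reading is "too late" only relative to THIS device's own max event time
    if (ts >= st.maxEventTime - delayMs) {
      val start  = ts - (ts % windowMs)
      val newMax = math.max(st.windows.getOrElse(start, Double.MinValue), r.value)
      st = st.copy(windows = st.windows + (start -> newMax))
    }
    st = st.copy(maxEventTime = math.max(st.maxEventTime, ts))
  }

  // emit and evict windows that have fully passed this device's own watermark
  val deviceWatermark = st.maxEventTime - delayMs
  val (closed, open) = st.windows.partition { case (start, _) => start + windowMs <= deviceWatermark }
  state.update(st.copy(windows = open))

  closed.toSeq.sortBy(_._1).iterator.map { case (start, v) =>
    WindowMax(system, new Timestamp(start), v)
  }
}

val perDeviceMax = df.as[Reading]
  .groupByKey(_.system)
  .flatMapGroupsWithState(
    OutputMode.Append(), GroupStateTimeout.NoTimeout())(updateState)

The obvious trade-off is that all window bookkeeping and state cleanup becomes my responsibility instead of Spark's, so I would prefer a built-in option if one exists.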