As I understand it, the watermark is the last seen event time minus the late threshold. So if the last seen event time is 12:11 and the late threshold is 10 minutes, the watermark is 12:01. Since 12:01 is later than the window start time of 12:00, its state is dropped.
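
To make the arithmetic concrete, here is a minimal sketch in plain Scala (no Spark involved). The object and function names are made up for illustration; it only reproduces the "last seen event time minus threshold" calculation from the paragraph above.

```scala
import java.time.{Duration, LocalTime}

// Hypothetical helper, not part of Spark: it just mirrors the
// "watermark = last seen event time - late threshold" rule.
object WatermarkArithmetic {
  def watermark(maxEventTime: LocalTime, lateThreshold: Duration): LocalTime =
    maxEventTime.minus(lateThreshold)

  def main(args: Array[String]): Unit = {
    // Last seen event at 12:11 with a 10 minute threshold.
    val wm = watermark(LocalTime.of(12, 11), Duration.ofMinutes(10))
    println(s"watermark = $wm")
  }
}
```

With the numbers from the example (12:11 and 10 minutes) this yields a watermark of 12:01.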

But I wrote this query:

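import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode

// `stream` is a streaming DataFrame with `created` (timestamp) and `animal` columns;
// the $"..." syntax needs spark.implicits._ in scope.
stream
  .withWatermark("created", "2 seconds")   // late threshold of 2 seconds
  .groupBy(
    window($"created", "2 seconds", "2 seconds"),   // 2-second tumbling windows
    $"animal"
  )
  .count()
  .writeStream
  .format("console")
  .outputMode(OutputMode.Update())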

And the output:

[2021-02-22 16:06:40.0,2021-02-22 16:06:42.0]:dog
[2021-02-22 16:06:40.0,2021-02-22 16:06:42.0]:owl
[2021-02-22 16:06:40.0,2021-02-22 16:06:42.0]:cat
[2021-02-22 16:06:34.0,2021-02-22 16:06:36.0]:pig

Last event time: 2021-02-22 16:06:41.696, in the 40-42 second window

Pig time: 2021-02-22 16:06:35.696

As you can see, pig falls in the 34-36 second window, but the threshold is 2 seconds.

Why can I see pig in the output?

An interesting detail: if I push pig at the same time as the other events but with the old timestamp, it is added to the result set. But if I push it 2 seconds (the threshold) later with the same old timestamp, it does not show up in the result set.

1 Answer

I pushed all the data to the stream in one batch. At that point there is no watermark yet, which is why the old event shows up in the result set. If I push some data to the stream first, set a ProcessingTime trigger (for example 100 ms), and push the old data after those 100 ms, the result is as expected.
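
The effect can be reproduced without Spark in a small simulation. This is only a simplified model of the behaviour described above, not Spark's actual implementation: it assumes that the watermark applied to a batch comes from the maximum event time of earlier batches, and that an event is late when the end of its window is not after that watermark. The object name, the `Event` class, and the choice of 16:06:41.696 for dog, owl and cat are assumptions made for illustration.

```scala
import java.time.{Duration, LocalDateTime}

// Simplified, hypothetical model of per-batch watermark handling (not Spark code).
object MicroBatchWatermarkSketch {
  final case class Event(animal: String, created: LocalDateTime)

  val threshold: Duration = Duration.ofSeconds(2)
  val windowSize: Duration = Duration.ofSeconds(2)

  // End of the 2-second tumbling window containing `t` (windows start on even seconds).
  def windowEnd(t: LocalDateTime): LocalDateTime =
    t.withNano(0).minusSeconds(t.getSecond % 2).plus(windowSize)

  // Returns the animals accepted in each batch. The watermark used for a batch
  // is derived only from events seen in earlier batches.
  def run(batches: Seq[Seq[Event]]): Seq[Seq[String]] = {
    var maxSeen: Option[LocalDateTime] = None
    batches.map { batch =>
      val watermark = maxSeen.map(_.minus(threshold))
      // No watermark yet means nothing is treated as late.
      val accepted = batch.filter { e =>
        watermark.forall(wm => windowEnd(e.created).isAfter(wm))
      }
      // Advance the max event time only after the batch has been processed.
      maxSeen = (maxSeen.toList ++ batch.map(_.created))
        .reduceOption((a, b) => if (a.isAfter(b)) a else b)
      accepted.map(_.animal)
    }
  }

  // Timestamps on 2021-02-22 at 16:06:<sec>.696
  def ts(sec: Int): LocalDateTime = LocalDateTime.of(2021, 2, 22, 16, 6, sec, 696000000)

  val fresh: Seq[Event] = Seq(Event("dog", ts(41)), Event("owl", ts(41)), Event("cat", ts(41)))
  val pig: Event = Event("pig", ts(35))

  def main(args: Array[String]): Unit = {
    println(run(Seq(fresh :+ pig)))      // everything pushed in one batch
    println(run(Seq(fresh, Seq(pig))))   // pig pushed in a later batch
  }
}
```

In this model, pushing everything in one batch keeps pig, because no watermark exists yet. Pushing pig in a second batch drops it, because the watermark is then 16:06:39.696 and pig's window ended at 16:06:36.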