I am having trouble understanding the concept of watermarks and allowed lateness.

The following is an excerpt from the [mail archive|https://www.mail-archive.com/[email protected]/msg08758.html] that discusses watermarks, but I still have a couple of questions. The quoted example is as follows:

Assume you have a BoundedOutOfOrdernessTimestampExtractor with a 2 minute bound and a 10 minute tumbling window that starts at 12:00 and ends at 12:10:

If you have the following stream sequence:

12:01, A
12:04, B
WM, 12:02 // 12:04 - 2 minutes
12:02, C
12:08, D
12:14, E
WM, 12:12
12:16, F
WM, 12:14 // 12:16 - 2 minutes
12:09, G

no allowed lateness

The window operator forwards the logical time to 12:12 when it receives <WM, 12:12> and evaluates the window which contains [A, B, C, D] at this time and finally purges its state. <12:09, G> is later ignored.

allowed lateness of 3 minutes

The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. The state is purged when <WM, 12:14> is received (window fire time 12:10 + 3mins allowed lateness). <12:09, G> is again ignored.

allowed lateness of 5 minutes

The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. When <12:09, G> is received, the window is again evaluated but this time with [A, B, C, D, G] and an update is sent out. The state is purged when a watermark of >= 12:15 is received.

As I understand:

  1. A watermark is supposed to indicate that any element arriving with an event timestamp lower than the watermark will be dropped. So a watermark of 12:02 means that Flink has seen everything it had to see up to event time 12:02. Any element with an event timestamp lower than this watermark, e.g. 12:01, would be dropped.
  2. The concept of allowed lateness applies only after the last watermark that marks the end of a window.

My questions based on the understanding:

  1. How is the message "12:02, C" being accepted, considering that Flink, with the previous watermark (WM, 12:02), has already said "I have seen everything till event time 12:02"?
  2. I have tweaked the stream sequence and inserted another record, "12:01, CCC", at the point marked with a comment in the stream sequence below.

If you have the following stream sequence:

12:01, A
12:04, B
WM, 12:02 // 12:04 - 2 minutes
12:02, C
12:01, CCC // Inserted by Sheel
12:08, D
12:14, E
WM, 12:12
12:16, F
WM, 12:14 // 12:16 - 2 minutes
12:09, G

This record is still in the 12:00-12:10 window but behind the watermark WM, 12:02. Let's say the allowed lateness is 5 minutes. Will this record be accepted "somehow" by bringing the allowed lateness into the picture, or will it be dropped because the watermark 12:02 has already passed?

1 Answer

Watermarks control the lifetime of a window, but they do not directly determine whether a record is dropped. When Flink's WindowOperator receives a new record, it calculates the set of windows the record falls into. If this set contains at least one active window, meaning that no watermark with a value higher than the window's end time + allowed lateness has been seen, the record is assigned to that window and becomes part of the window computation (even if the record's timestamp is lower than the last seen watermark). Hence, one could say that windows reduce the resolution of watermarks with respect to individual records.
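To make the rule concrete, here is a minimal sketch of the check in plain Java. It is a simplified model, not Flink's actual WindowOperator code: timestamps are minutes since midnight, windows are tumbling with no offset, and a window counts as expired once a watermark >= window end + allowed lateness has been seen (the boundary used in the quoted mail-archive example). The names `hm`, `windowStart`, `isActive` and `isAccepted` are made up for illustration.

```java
final class WindowLateness {
    // Minutes since midnight, e.g. hm(12, 4) for 12:04 (hypothetical helper).
    static long hm(int hour, int minute) {
        return hour * 60L + minute;
    }

    // Start of the tumbling window that contains ts (windows aligned to midnight, no offset).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Assumed expiry rule: the window stays active until the watermark
    // reaches window end + allowed lateness.
    static boolean isActive(long windowEnd, long allowedLateness, long watermark) {
        return watermark < windowEnd + allowedLateness;
    }

    // A record is accepted if its window is still active. Its own timestamp is
    // never compared with the watermark directly.
    static boolean isAccepted(long ts, long watermark, long size, long allowedLateness) {
        long windowEnd = windowStart(ts, size) + size;
        return isActive(windowEnd, allowedLateness, watermark);
    }

    public static void main(String[] args) {
        long size = 10; // 10 minute tumbling windows

        // 12:01 arriving after watermark 12:02, no allowed lateness
        System.out.println(isAccepted(hm(12, 1), hm(12, 2), size, 0));  // true
        // 12:09 arriving after watermark 12:14, allowed lateness 3 minutes
        System.out.println(isAccepted(hm(12, 9), hm(12, 14), size, 3)); // false
        // 12:09 arriving after watermark 12:14, allowed lateness 5 minutes
        System.out.println(isAccepted(hm(12, 9), hm(12, 14), size, 5)); // true
    }
}
```

Note that the first case is accepted even though the record's timestamp is behind the watermark: only the window's end time plus the allowed lateness is compared with the watermark.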

In your case, this means that both C and CCC will become part of the 12:00 - 12:10 window, since the system has not yet seen a watermark >= 12:10.
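As a rough illustration, the modified stream from the question can be replayed in plain Java to see which records would be dropped for each allowed lateness. This is a sketch under simplifying assumptions and does not use the Flink API: times are parsed to minutes since midnight, windows are 10 minute tumbling windows with no offset, and a record is dropped only if the latest watermark is >= its window end + allowed lateness. The class `StreamReplay` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamReplay {
    // Parses "HH:mm" into minutes since midnight.
    static long parse(String hhmm) {
        String[] p = hhmm.trim().split(":");
        return Integer.parseInt(p[0]) * 60L + Integer.parseInt(p[1]);
    }

    // Returns the labels of records whose window had already expired on arrival.
    static List<String> dropped(String[] stream, long windowSize, long allowedLateness) {
        List<String> dropped = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // no watermark seen yet
        for (String line : stream) {
            String[] parts = line.split(",");
            if (parts[0].trim().equals("WM")) {
                // Watermarks only move forward.
                watermark = Math.max(watermark, parse(parts[1]));
                continue;
            }
            long ts = parse(parts[0]);
            long windowEnd = ts - (ts % windowSize) + windowSize;
            // Assumed expiry rule: the window is gone once the watermark
            // has reached window end + allowed lateness.
            if (watermark >= windowEnd + allowedLateness) {
                dropped.add(parts[1].trim());
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        String[] stream = {
            "12:01, A", "12:04, B", "WM, 12:02", "12:02, C", "12:01, CCC",
            "12:08, D", "12:14, E", "WM, 12:12", "12:16, F", "WM, 12:14", "12:09, G"
        };
        for (long lateness : new long[] {0, 3, 5}) {
            System.out.println("allowed lateness " + lateness + " min -> dropped "
                    + dropped(stream, 10, lateness));
        }
        // allowed lateness 0 min -> dropped [G]
        // allowed lateness 3 min -> dropped [G]
        // allowed lateness 5 min -> dropped []
    }
}
```

In this model C and CCC are never dropped, whatever the allowed lateness, because the watermark is still at 12:02 when they arrive. Only G is affected by the allowed lateness setting, since it arrives after the watermark has passed the end of its window.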