
From the section "Watermarks in Parallel Streams" at the following URL, we know that "operator’s current event time is the minimum of its input streams’ event times": https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/event_time.html

Let us take the event time of the window(1) instance as an example. We know its event time is 14 (min(29, 14)). However, what happens if the watermark events arrive in the following order?

What happens if watermark 29 arrives at window(1) before watermark 14?

For example, suppose watermark 29 arrives at the window(1) instance first. Since watermark 14 hasn't arrived yet, the event time of window(1) would be set to 29. Then, when watermark 14 also arrives at window(1), is its event time set to 14? (If so, the event time of window(1) would go backwards, from 29 to 14.) Finally, suppose source(2) then generates watermark 39, which also arrives at window(1): would the event time of window(1) be set to 29 or 39?


3 Answers

2
votes

Finally I found the answer in the source code. Just as David said, "the watermark for the window will stay at Long.MIN_VALUE until a larger value has arrived from both input streams."

https://github.com/apache/flink/blob/57b950796deebed46ae95f97152e09b2e2655de8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

// Excerpt from AbstractStreamOperator. input1Watermark, input2Watermark and
// combinedWatermark are fields that start at Long.MIN_VALUE.
public void processWatermark1(Watermark mark) throws Exception {
    input1Watermark = mark.getTimestamp();
    // The operator's watermark is the minimum over both inputs...
    long newMin = Math.min(input1Watermark, input2Watermark);
    // ...and is only forwarded when it moves forward, so it never decreases.
    if (newMin > combinedWatermark) {
        combinedWatermark = newMin;
        processWatermark(new Watermark(combinedWatermark));
    }
}

public void processWatermark2(Watermark mark) throws Exception {
    input2Watermark = mark.getTimestamp();
    long newMin = Math.min(input1Watermark, input2Watermark);
    if (newMin > combinedWatermark) {
        combinedWatermark = newMin;
        processWatermark(new Watermark(combinedWatermark));
    }
}
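
To see how this plays out for the sequence in the question, here is a small standalone Java sketch (no Flink dependency) that copies only the min-combining arithmetic of the excerpt. The class name `TwoInputWatermarkCombiner` and its `emitted` list are hypothetical, and the sketch assumes that watermark 29 arrives on input 1 while watermark 14, and later 39, arrive on input 2 (the path from source(2)).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone model of processWatermark1/processWatermark2.
// Only the min-combining arithmetic mirrors the Flink excerpt.
class TwoInputWatermarkCombiner {
    // Both inputs and the combined value start at the Long.MIN_VALUE placeholder.
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    // Watermarks that would be forwarded downstream, in order.
    final List<Long> emitted = new ArrayList<>();

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        advanceIfPossible();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        advanceIfPossible();
    }

    // Forward a new watermark only if the minimum of both inputs moved forward.
    private void advanceIfPossible() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            emitted.add(combinedWatermark);
        }
    }

    long currentWatermark() {
        return combinedWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkCombiner window1 = new TwoInputWatermarkCombiner();
        window1.processWatermark1(29); // 29 arrives first, on input 1
        System.out.println(window1.currentWatermark()); // still Long.MIN_VALUE
        window1.processWatermark2(14); // 14 arrives on input 2
        System.out.println(window1.currentWatermark()); // 14
        window1.processWatermark2(39); // input 2 later advances to 39
        System.out.println(window1.currentWatermark()); // 29
        System.out.println(window1.emitted);            // [14, 29]
    }
}
```

Under those assumptions the watermark stays at Long.MIN_VALUE after 29, becomes 14 once both inputs have reported, and moves to 29 (not 39) when input 2 advances, because input 1 is still at 29. A smaller value arriving later is simply not forwarded, so the combined watermark never decreases.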
1
votes
Such as suppose the watermark event 29 arrives at the window(1) instance 
firstly, as the watermark 14 event hasn't arrived it, so the event time of 
window(1) instance was set to 29 firstly ...

This isn't correct. Each input's watermark holds the placeholder value Long.MIN_VALUE until the first proper watermark arrives on that input. Because the operator takes the minimum across its inputs, the watermark for the window will stay at Long.MIN_VALUE until a larger value has arrived from both input streams.
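
The same placeholder rule extends naturally to an operator fed by more than two upstream channels. The following is a hypothetical generalization, not Flink's own class: every channel starts at Long.MIN_VALUE and the operator's watermark is the minimum over all channels, so a single silent channel holds it back. As a design choice of this sketch, a channel's own watermark is never allowed to move backwards.

```java
import java.util.Arrays;

// Hypothetical N-channel version of the min-combining rule (not a Flink class).
class MultiChannelWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MultiChannelWatermarkTracker(int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("need at least one channel");
        }
        channelWatermarks = new long[numChannels];
        // Every channel starts at the placeholder, so nothing advances until all report.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; returns true if the operator's watermark advanced. */
    boolean onWatermark(int channel, long timestamp) {
        // Design choice of this sketch: ignore values that would move a channel backwards.
        if (timestamp <= channelWatermarks[channel]) {
            return false;
        }
        channelWatermarks[channel] = timestamp;
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return true;
        }
        return false;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```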

0
votes

The short answer is no: the window's event time will not decrease in that case (and out-of-order input may in fact throw an exception).

This is where the BoundedOutOfOrdernessTimestampExtractor comes into play. With it you configure how far out of order timestamps may be, and it smooths out those discrepancies. By default, with an AscendingTimestampExtractor, receiving timestamps out of order would in fact violate the extractor's contract and be treated as an error.
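
A minimal sketch of the ascending-timestamp idea, assuming millisecond timestamps: the watermark trails the latest timestamp by 1 ms, and any decrease in timestamps is a violation. `AscendingWatermarkSketch` is a hypothetical class; it throws on a violation to make the problem visible, whereas the real AscendingTimestampExtractor's reaction to a violation may be configurable, so check its documentation.

```java
// Hypothetical model of an ascending-timestamp watermark assigner (not Flink's class).
class AscendingWatermarkSketch {
    private long lastTimestamp = Long.MIN_VALUE;

    /** Accepts an element timestamp; throws if it is smaller than the previous one. */
    void onElement(long timestamp) {
        if (timestamp < lastTimestamp) {
            throw new IllegalStateException(
                "Out-of-order timestamp " + timestamp + " after " + lastTimestamp);
        }
        lastTimestamp = timestamp;
    }

    /** Trails the latest timestamp by 1 ms; stays at the placeholder before any element. */
    long currentWatermark() {
        return lastTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : lastTimestamp - 1;
    }
}
```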

Additionally, there is the notion of "allowed lateness", which defines what happens when you receive elements whose timestamps are lower than the current watermark.

For example, if you know that your data source might have 60 seconds of jitter (due to processing-time delays, geographic distance, etc.), you can use the bounded out-of-orderness extractor with a maximum out-of-orderness of 60 seconds (e.g. Time.seconds(60)), which effectively makes the watermark lag 60 s behind the largest timestamp seen so far. This allows elements to arrive in any order within 60 s of each other.
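
The bounded out-of-orderness idea can be sketched without Flink as follows, assuming millisecond timestamps. `BoundedOutOfOrdernessSketch` is a hypothetical stand-in, not the Flink extractor: it tracks the largest timestamp seen and reports a watermark that trails it by the configured bound, so an element only counts as late once its timestamp is at or below that watermark.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical model of a bounded out-of-orderness watermark assigner (not Flink's class).
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Out-of-order elements are accepted; they just don't raise the maximum. */
    void onElement(long timestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
    }

    long currentWatermark() {
        // Before any element (or when subtracting would underflow), stay at the placeholder.
        if (maxTimestampSeen < Long.MIN_VALUE + maxOutOfOrdernessMs) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** An element is late once its timestamp is at or below the current watermark. */
    boolean isLate(long timestampMs) {
        return timestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm =
            new BoundedOutOfOrdernessSketch(TimeUnit.SECONDS.toMillis(60));
        wm.onElement(200_000);
        wm.onElement(150_000); // 50 s behind the max: still accepted
        System.out.println(wm.currentWatermark()); // 140000
        System.out.println(wm.isLate(130_000));    // true
    }
}
```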

However, if you expect elements to arrive either exactly in order or with very small jitter, but you still want to process late elements, you can use the allowed-lateness setting to define how your job behaves when those elements arrive. By default, Flink simply drops them, but you can configure a time period during which Flink will re-fire your window for each late element that comes in.
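
A rough model of allowed lateness, assuming tumbling event-time windows of a fixed size and millisecond timestamps. `AllowedLatenessSketch` and `LateElementOutcome` are hypothetical names; in Flink itself the setting is applied on the windowed stream (for example via `allowedLateness(...)`). The model keeps a window until the watermark passes the window's last timestamp plus the allowed lateness: a late element arriving before then re-fires the window, and one arriving after that is dropped.

```java
// Hypothetical outcomes for an incoming element in this simplified model.
enum LateElementOutcome { BUFFERED, REFIRE, DROPPED }

// Hypothetical model of allowed lateness for tumbling event-time windows (not Flink's code).
class AllowedLatenessSketch {
    private final long windowSizeMs;
    private final long allowedLatenessMs;

    AllowedLatenessSketch(long windowSizeMs, long allowedLatenessMs) {
        if (windowSizeMs <= 0 || allowedLatenessMs < 0) {
            throw new IllegalArgumentException("invalid window size or lateness");
        }
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    LateElementOutcome classify(long timestampMs, long currentWatermark) {
        // Tumbling window [start, start + size) that contains the element.
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        long windowMaxTimestamp = windowStart + windowSizeMs - 1;
        if (windowMaxTimestamp + allowedLatenessMs <= currentWatermark) {
            return LateElementOutcome.DROPPED; // beyond the lateness horizon
        }
        if (windowMaxTimestamp <= currentWatermark) {
            return LateElementOutcome.REFIRE;  // window already fired; fire again with this element
        }
        return LateElementOutcome.BUFFERED;    // window not complete yet
    }
}
```

With an allowed lateness of 0 the REFIRE case disappears, which matches the default behaviour described above of simply dropping late elements.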

Fundamentally, all of this depends on your particular situation: how jittery and how late you expect your data to be, and how you want to handle late elements. Flink pretty much allows any combination of these settings.