In Flink's event-time mode, I use periodic watermarks to advance event time. Every slot extracts the event time from each incoming message and, to emit a watermark, subtracts a network delay from it, say 3000 ms.
    public Watermark getCurrentWatermark() {
        // Highest event timestamp seen by this slot so far, minus the allowed delay
        return new Watermark(MAX_TIMESTAMP - DELEY);
    }
I have 4 active slots. The problem is that only two of them receive incoming data, yet all of them call getCurrentWatermark(). So consider the case where threads 1 and 2 receive data and threads 3 and 4 do not:
Thread-1-watermark ---> 1541217659806
Thread-2-watermark ---> 1541217659810
Thread-3-watermark ---> (0 - 3000 = -3000)
Thread-4-watermark ---> (0 - 3000 = -3000)
Because Flink takes the lowest of these watermarks as the overall watermark, event time never advances! If I change the getCurrentWatermark() method to:
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - DELEY);
    }
it solves the problem, but I don't want to use the machine's clock! How can I fix the problem?
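To make the stall concrete, here is a minimal standalone sketch in plain Java with no Flink dependency. It assumes the overall watermark is simply the minimum over the four per-thread watermarks, as described above. The class `WatermarkStall` and its helpers `subtaskWatermark` and `combinedWatermark` are hypothetical names used only for illustration; they are not part of the Flink API.

```java
import java.util.Arrays;

public class WatermarkStall {
    // Delay subtracted from the highest timestamp seen (3000 ms, as in the question).
    static final long DELAY = 3000L;

    // Hypothetical helper: the watermark a single thread would report.
    static long subtaskWatermark(long maxTimestampSeen) {
        return maxTimestampSeen - DELAY;
    }

    // Assumption: the overall watermark is the minimum of all reported watermarks.
    static long combinedWatermark(long... subtaskWatermarks) {
        return Arrays.stream(subtaskWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long w1 = 1541217659806L;        // thread 1, receives data
        long w2 = 1541217659810L;        // thread 2, receives data
        long w3 = subtaskWatermark(0L);  // thread 3, idle: 0 - 3000
        long w4 = subtaskWatermark(0L);  // thread 4, idle: 0 - 3000

        // The two idle threads pin the overall watermark.
        System.out.println(combinedWatermark(w1, w2, w3, w4)); // prints -3000

        // Without the idle threads, the minimum follows the active ones.
        System.out.println(combinedWatermark(w1, w2)); // prints 1541217659806
    }
}
```

As long as threads 3 and 4 keep reporting -3000, the minimum cannot move, no matter how far threads 1 and 2 advance.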