
In Flink's event-time mode, I use periodic watermarks to advance event time. Each slot extracts the event time from incoming messages and, to emit a watermark, subtracts a network delay (say, 3000 ms) from the highest timestamp seen so far:

public Watermark getCurrentWatermark() {
    // Watermark = highest event time seen so far minus the allowed network delay
    return new Watermark(MAX_TIMESTAMP - DELAY);
}
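To make the per-slot behaviour easy to reproduce without a Flink setup, here is a minimal, dependency-free sketch of that logic. It is an illustration under assumptions, not Flink's API: the class name `PeriodicWatermarkSketch` is hypothetical, watermarks are plain `long` values instead of Flink's `Watermark`, and the tracked maximum starts at 0 only to mirror the `-3000` values shown in the question.

```java
// Hypothetical, Flink-free sketch of one parallel instance's periodic watermark logic.
// In Flink this would live inside an AssignerWithPeriodicWatermarks implementation.
public class PeriodicWatermarkSketch {
    // Allowed network delay in milliseconds (3000 ms, as in the question).
    static final long DELAY = 3000L;

    // Highest event time seen so far; starts at 0 to mirror the question's output.
    private long maxTimestamp = 0L;

    // Called for every incoming record: remember the largest event time.
    public long extractTimestamp(long eventTime) {
        maxTimestamp = Math.max(maxTimestamp, eventTime);
        return eventTime;
    }

    // Called periodically: the watermark trails the maximum by DELAY.
    public long getCurrentWatermark() {
        return maxTimestamp - DELAY;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch active = new PeriodicWatermarkSketch();
        active.extractTimestamp(1541217662806L);
        PeriodicWatermarkSketch idle = new PeriodicWatermarkSketch(); // never sees data

        System.out.println(active.getCurrentWatermark()); // 1541217659806
        System.out.println(idle.getCurrentWatermark());   // -3000
    }
}
```

An instance that never receives a record keeps reporting `0 - DELAY`, no matter how often it is polled.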

I have 4 active slots. The problem is that only two of them receive incoming data, but all four call getCurrentWatermark(). Consider the case where threads 1 and 2 receive data and threads 3 and 4 do not:

Thread-1-watermark ---> 1541217659806
Thread-2-watermark ---> 1541217659810
Thread-3-watermark ---> (0 - 3000 = -3000)
Thread-4-watermark ---> (0 - 3000 = -3000) 
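A downstream operator's event-time clock is the minimum of the latest watermarks from all of its input channels. The sketch below applies that rule to the four values above; it is a simplified, Flink-free illustration, and `MinWatermarkSketch` and `combinedWatermark` are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical illustration: the combined watermark is the minimum across all channels.
public class MinWatermarkSketch {
    static long combinedWatermark(long[] channelWatermarks) {
        // With no channels at all there is nothing to advance to.
        return Arrays.stream(channelWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long[] watermarks = {1541217659806L, 1541217659810L, -3000L, -3000L};
        System.out.println(combinedWatermark(watermarks)); // -3000
    }
}
```

As long as threads 3 and 4 stay at `-3000`, the combined value never moves, so no event-time window can fire.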

Because Flink uses the lowest of these watermarks as the overall watermark, event time never advances! If I change getCurrentWatermark() to:

public Watermark getCurrentWatermark() {
    // Uses the machine's wall-clock (processing) time instead of event time
    return new Watermark(System.currentTimeMillis() - DELAY);
}

it solves the problem, but I don't want to rely on the machine's timestamp. How can I fix this?


1 Answer


There was an idea of sending out idle watermarks to indicate that a particular partition isn't receiving any data, so windows shouldn't wait for it. I personally have never tried it (and even some built-in components, such as the Kafka connector, haven't been updated to use it yet), but at least here's the ticket where it was implemented:

Idle Watermarks
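The effect of marking a partition as idle can be sketched as follows: channels flagged idle are simply left out when the minimum is taken, so the active channels alone drive event time. This is a simplified, Flink-free illustration of the idea, not Flink's implementation; `IdleAwareWatermarkSketch` and the `idle` flags are hypothetical, and a real operator would also keep its combined watermark monotonic.

```java
import java.util.OptionalLong;
import java.util.stream.IntStream;

// Hypothetical illustration: ignore idle channels when combining watermarks.
public class IdleAwareWatermarkSketch {
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        return IntStream.range(0, watermarks.length)
                .filter(i -> !idle[i])          // skip channels marked idle
                .mapToLong(i -> watermarks[i])
                .min();                         // empty if every channel is idle
    }

    public static void main(String[] args) {
        long[] watermarks = {1541217659806L, 1541217659810L, -3000L, -3000L};
        boolean[] idle = {false, false, true, true};
        System.out.println(combinedWatermark(watermarks, idle).getAsLong()); // 1541217659806
    }
}
```

If every channel is idle, the result is empty and the operator has no new watermark to emit, which matches the intent that nobody should wait on partitions that carry no data.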