
I see that there is a lot of discussion going on about adding support for per-key watermarks. But does Flink support per-partition watermarks?

Currently, the minimum of all the watermarks (across non-idle partitions) is taken into account. Because of this, the last records hanging in a window get stuck as well (when the watermark is advanced using periodic emit).

Any info on this is really appreciated!


1 Answer


Some of the sources, such as the FlinkKafkaConsumer, support per-partition watermarking. You get this by calling assignTimestampsAndWatermarks on the source, rather than on the stream produced by the source.

With this in place, each consumer instance tracks the maximum timestamp within each partition, and takes as its watermark the minimum of these maximums, less the configured bounded out-of-orderness. Idle partitions are ignored if you configure the strategy to do so.
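To make the arithmetic concrete, here is a minimal plain-Java sketch of that rule. It is not Flink code: the class `PartitionWatermarks` and its methods are hypothetical names invented for illustration, and it glosses over details of the real implementation (for example, I believe Flink's built-in bounded-out-of-orderness generator also subtracts one extra millisecond).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model (not a Flink class) of how one consumer instance could
// combine per-partition timestamps into a single watermark.
class PartitionWatermarks {
    private final long maxOutOfOrdernessMs;
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();
    private final Set<Integer> idlePartitions = new HashSet<>();

    PartitionWatermarks(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Record an event: keep the largest timestamp seen so far in its partition.
    // A partition that produces an event is considered active again.
    void onEvent(int partition, long timestampMs) {
        maxTimestampPerPartition.merge(partition, timestampMs, Math::max);
        idlePartitions.remove(partition);
    }

    // Mark a partition as idle so that it no longer holds the watermark back.
    void markIdle(int partition) {
        idlePartitions.add(partition);
    }

    // Watermark = min over active partitions of (max timestamp), minus the bound.
    // Returns Long.MIN_VALUE when no active partition has produced an event yet.
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : maxTimestampPerPartition.entrySet()) {
            if (idlePartitions.contains(e.getKey())) {
                continue;
            }
            anyActive = true;
            min = Math.min(min, e.getValue());
        }
        return anyActive ? min - maxOutOfOrdernessMs : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        PartitionWatermarks w = new PartitionWatermarks(5);
        w.onEvent(0, 100);
        w.onEvent(1, 40);
        System.out.println(w.currentWatermark()); // partition 1 holds the watermark back
        w.markIdle(1);
        System.out.println(w.currentWatermark()); // only partition 0 counts now
    }
}
```

The slow partition determines the watermark until it is marked idle, which is why idleness handling matters when some partitions stop receiving data.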

Not only does this yield more accurate watermarking, but if your events are in-order within each partition, this also makes it possible to take advantage of the WatermarkStrategy.forMonotonousTimestamps() strategy.

See Watermark Strategies and the Kafka Connector for more details.

As for why the last window isn't being triggered, this is related to watermarking, but not to per-partition watermarking. The problem is simply that windows are triggered by watermarks, and the watermarks are trailing behind the timestamps in the events. So the watermarks can never catch up to the final events, and can never trigger the last window.

This isn't a problem for unbounded streaming jobs, since they never stop and never have a last window. And it isn't a problem for batch jobs, since they are aware of all of the data. But for bounded streaming jobs, you need to do something to work around this issue. Broadly speaking, what you must do is to inform Flink that the input stream has ended -- whenever the Flink sources detect that they have reached the end of an event-time-based input stream, they emit one last watermark whose value is MAX_WATERMARK, and this will trigger any open windows.
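The effect can be illustrated with a small plain-Java simulation of tumbling event-time windows. This is only a sketch under simplifying assumptions, not Flink code: `TumblingWindowSim` and its methods are hypothetical, and the only thing borrowed from Flink is the idea that a window fires once the watermark reaches its last timestamp, with the end-of-input watermark modeled as `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical simulation (not a Flink class) of watermark-triggered tumbling windows.
class TumblingWindowSim {
    // Stand-in for the final watermark a source emits at end of input.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final long sizeMs;
    // Window start -> number of buffered events, ordered by window start.
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();

    TumblingWindowSim(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Assign the event to the window [start, start + size) that contains it.
    void onEvent(long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        openWindows.merge(start, 1, Integer::sum);
    }

    // Fire every open window whose last timestamp (end - 1) is <= the watermark.
    // Returns the start times of the windows that fired, in order.
    List<Long> onWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!openWindows.isEmpty()) {
            long start = openWindows.firstKey();
            long lastTimestamp = start + sizeMs - 1;
            if (lastTimestamp > watermark) {
                break;
            }
            fired.add(start);
            openWindows.remove(start);
        }
        return fired;
    }

    public static void main(String[] args) {
        TumblingWindowSim sim = new TumblingWindowSim(10);
        for (long ts : new long[] {1, 5, 12, 18}) {
            sim.onEvent(ts);
        }
        // The watermark trails the newest event (18) by an out-of-orderness of 3.
        System.out.println(sim.onWatermark(18 - 3));       // only the first window fires
        // End of input: the final watermark releases the window that was stuck.
        System.out.println(sim.onWatermark(MAX_WATERMARK));
    }
}
```

In this model the window starting at 10 can never fire from ordinary watermarks once the events stop, because the watermark stays at 15; only the final watermark releases it.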

One way to do this is to use a KafkaDeserializationSchema with an implementation of isEndOfStream that returns true when the job reaches its end.