TL;DR: What is the current best solution for guaranteeing the event-time order of events in Flink?
I use Flink 1.8.0 with Kafka 2.2.1. I need to guarantee the correct order of events by event timestamp. I generate periodic watermarks every 1 s. I use FlinkKafkaConsumer with an AscendingTimestampExtractor:
val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
    override def extractAscendingTimestamp(element: T): Long =
      timestampExtractor(element)
  })

val myStream = env
  .addSource(rawConsumer)(deserializationSchema.getProducedType)
  .uid(sourceId).name(sourceId)
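For completeness, this is the environment configuration the snippet assumes (a sketch; `env` and the imports come from my job setup, with the 1 s watermark interval mentioned above):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(1000) // emit watermarks every 1 s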
and then processing:
myStream
  .keyBy(ev => (ev.name, ev.group))
  .mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)
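In the Scala API, mapWithState takes a function of type (T, Option[S]) => (R, Option[S]). Just to make the shape explicit, here is a sketch of what calculateResultEventState looks like; the helpers updatedWith and from are hypothetical, the real logic lives in DefaultCalculator:

def calculateResultEventState(
    ev: MyEvent,
    state: Option[ResultEvent]): (ResultEvent, Option[ResultEvent]) = {
  val result = state match {
    case Some(prev) => prev.updatedWith(ev) // hypothetical: fold ev into the previous state
    case None       => ResultEvent.from(ev) // hypothetical: build the initial state
  }
  (result, Some(result))
}

Because each result is derived from the previous state, the state transitions depend on the order in which events arrive, which is exactly why I need the event-time order guaranteed.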
I realized that for out-of-order events that arrive within the same millisecond, or a few milliseconds apart, Flink does not correct the order. What I found in the docs:
the watermark triggers computation of all windows where the maximum timestamp (which is end-timestamp - 1) is smaller than the new watermark
So I added an extra processing step to guarantee the event-time order: since a window only fires once the watermark has passed its end timestamp, I collect events into small 100 ms windows and sort each window's contents by timestamp before re-emitting them:
myStream
  .timeWindowAll(Time.milliseconds(100))
  .apply((window, input, out: Collector[MyEvent]) => input
    .toList.sortBy(_.getTimestamp)
    .foreach(out.collect) // this windowing guarantees the correct order by event time
  )(TypeInformation.of(classOf[MyEvent]))
  .keyBy(ev => (ev.name, ev.group))
  .mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)
However, I find this solution ugly and it looks like a workaround; being a non-keyed window, timeWindowAll also runs with parallelism 1. I am also concerned about the per-partition watermarks of the Kafka source.
Ideally, I would like to push the ordering guarantee into the Kafka source and maintain it for each Kafka partition, like the per-partition watermarks. Is that possible? What is the current best solution for guaranteeing the event-time order of events in Flink?
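For reference, the closest keyed alternative I can see is to buffer events in keyed state and release them in timestamp order once the watermark has passed, driven by event-time timers. The sketch below only illustrates what I mean, it is not production code: SortFunction is my name, I assume name and group are Strings, and events behind the watermark are simply dropped.

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

class SortFunction extends KeyedProcessFunction[(String, String), MyEvent, MyEvent] {

  // Events buffered until the watermark passes their timestamp; several
  // events may share a timestamp, hence the List value.
  private var buffer: MapState[java.lang.Long, List[MyEvent]] = _

  override def open(parameters: Configuration): Unit = {
    buffer = getRuntimeContext.getMapState(
      new MapStateDescriptor[java.lang.Long, List[MyEvent]](
        "sort-buffer", classOf[java.lang.Long], classOf[List[MyEvent]]))
  }

  override def processElement(
      event: MyEvent,
      ctx: KeyedProcessFunction[(String, String), MyEvent, MyEvent]#Context,
      out: Collector[MyEvent]): Unit = {
    val ts = event.getTimestamp
    if (ts <= ctx.timerService().currentWatermark()) {
      // Late event: the watermark has already passed, so ordering can no
      // longer be guaranteed; drop it (or route it to a side output).
    } else {
      buffer.put(ts, event :: Option(buffer.get(ts)).getOrElse(Nil))
      // Fire once the watermark reaches this timestamp.
      ctx.timerService().registerEventTimeTimer(ts)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[(String, String), MyEvent, MyEvent]#OnTimerContext,
      out: Collector[MyEvent]): Unit = {
    // Everything at this timestamp is now safe to emit, in arrival order.
    Option(buffer.get(timestamp)).getOrElse(Nil).reverse.foreach(out.collect)
    buffer.remove(timestamp)
  }
}

Used in place of the timeWindowAll step, it keeps the sort keyed and parallel:

myStream
  .keyBy(ev => (ev.name, ev.group))
  .process(new SortFunction)(TypeInformation.of(classOf[MyEvent]))
  .keyBy(ev => (ev.name, ev.group))
  .mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)

But this still sorts per key rather than per Kafka partition, which is why I am asking whether the source itself can provide the guarantee.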