1
votes

TL;DR: What is the current best solution for guarantee the event-time order of events in Flink?

I use Flink 1.8.0 with Kafka 2.2.1. I need to guarantee of correct order of events by event timestamp. I generate periodic watermarks every 1s. I use FlinkKafkaConsumer with AscendingTimestampExtractor:

val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
      override def extractAscendingTimestamp(element: T): Long =
        timestampExtractor(element)
      })
 .addSource(consumer)(deserializationSchema.getProducedType).uid(sourceId).name(sourceId)

and then processing:

myStream
   .keyBy(ev => (ev.name, ev.group))
   .mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)

I realized, that for unordered events, that came in the same ms or a few ms later, the order is not corrected by Flink. What I found in the docs:

the watermark triggers computation of all windows where the maximum timestamp (which is end-timestamp - 1) is smaller than the new watermark

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows

So that I prepared additional step of processing to guarantee the event-time order:

myStream
      .timeWindowAll(Time.milliseconds(100))
      .apply((window, input, out: Collector[MyEvent]) => input
        .toList.sortBy(_.getTimestamp)
        .foreach(out.collect) // this windowing guarantee correct order by event time
      )(TypeInformation.of(classOf[MyEvent]))
      .keyBy(ev => (ev.name, ev.group))
      .mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)

However, I find this solution ugly and it looks like a workaround. I am also concerned about per-partition watermarks of KafkaSource

Ideally I would like to put the guarantee of order in the KafkaSource and keep it for each kafka partition, like per-partition watermarks. Is it possible to do so? What is the current best solution for guarantee the event-time order of events in Flink?

2

2 Answers

1
votes

This is a great point. Having a guarantee of order in the KafkaSource actually includes two parts.

  1. Guarantee of order among partitions in the same subtask.
  2. Guarantee of order among subtasks.

The first part has already been in progress in https://issues.apache.org/jira/browse/FLINK-12675. And the second part needs the support of sharing state between subtasks, which may need more discussion and detailed plan in the community.

Back to your question, I think keeping the order of events by setting a window to buffer data is the best solution at present.

3
votes

Flink does not guarantee to process records in event-time order. Records within a partition will be processed in their original order but when two or more partitions are merged into a new partition (due to a repartitioning or union of streams), Flink randomly merged the records of those partitions into the new partition. Everything else would be inefficient and result in higher latencies.

For example if your job has a source task that reads from two Kafka partitions, the records of both partitions are merged in a somewhat random zigzag pattern.

However, Flink guarantees that all events are correctly processed with respect to the generated watermarks. This means, that a watermark never overtakes a record. For example if your Kafka source generates per-partition watermarks, the watermarks are still valid even after the records of multiple partitions were merged. A watermark is used to collect and process all records that have a timestamp of less than the watermark. Hence, it ensures completeness of input data.

This is a prerequisite to order the records by their timestamp. You can do that with a tumbling window all. However, you should be aware that

  1. a window all will be executed in a single task (i.e., it is not parallelized). If order per key is sufficient, you should use a regular tumbling window or even better implement a KeyedProcessFunction, which is more efficient.
  2. the order will be destroyed when the stream is reorganized due to repartitioning or changing the parallelism.