
I am reading messages from a Kafka topic with 2 partitions and using event time. This is my code:

stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
    @Override
    public long extractAscendingTimestamp(Request req) {
        return req.ts;
    }
})
// ...
.apply((TimeWindow window, Iterable<Request> iterable, Collector<String> collector) -> {
    collector.collect("Window: " + window.toString());
    for (Request req : iterable) {
        // ...
    }
});

I get output only when the Kafka source parallelism is set to 1. According to this thread, I guess that is because messages from multiple partitions arrive at the timestamp extractor out of order.

So I replaced the AscendingTimestampExtractor with a BoundedOutOfOrdernessGenerator as in this documentation example (with a higher maxOutOfOrderness delay), in order to handle out-of-order events, but I still can't get any output. Why is that?

How much data do you ingest (i.e., over what event-time period)? Keep in mind that the first output is produced only after your 1-day time window closes, which happens when the first watermark past the end of that window arrives. - Matthias J. Sax
Hi @MatthiasJ.Sax, sorry for the late response. The data I'm ingesting spans a one-year period; I'm actually aggregating historic data in a streaming fashion. It turns out the problem is that one of the topic partitions is empty, so the watermark does not advance for it. I could fix it by setting topic.metadata.refresh.interval.ms in the producer config. Here is the related mailing list thread. - Yassine Marzougui
I am facing the same problem in my Flink application. I have created 4 partitions with parallelism 4 and I am getting out-of-order data. I have used BoundedOutOfOrdernessGenerator with a delay of 3 seconds. Is there any way to achieve ordering in a stream based on the event timestamp? - Amol Suryawanshi
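
The empty-partition effect described in the comments above can be illustrated without Flink. An operator with several inputs advances its event-time clock to the minimum of the watermarks received on those inputs, and an input that never receives data never reports a watermark. The sketch below is plain Java; the class and method names are hypothetical and only model that minimum rule, they are not Flink API.

```java
import java.util.Arrays;

public final class WatermarkMinSketch {

    private WatermarkMinSketch() {}

    // Hypothetical helper: models how a downstream operator combines the
    // watermarks of its inputs by taking the minimum across them.
    public static long combinedWatermark(long[] perPartitionWatermarks) {
        return Arrays.stream(perPartitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Both partitions have produced data: the combined watermark advances.
        long[] bothActive = {1563743505673L, 1563743400000L};
        System.out.println(combinedWatermark(bothActive));

        // One partition is empty and has never reported a watermark,
        // modeled here as Long.MIN_VALUE: the combined watermark is stuck,
        // so no event-time window can ever close.
        long[] oneEmpty = {1563743505673L, Long.MIN_VALUE};
        System.out.println(combinedWatermark(oneEmpty) == Long.MIN_VALUE);
    }
}
```

With one input stuck at its initial value, no amount of allowed out-of-orderness helps, which matches the observation that switching extractors did not produce output.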

1 Answer


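Check that the event timestamp is in the right unit. Flink expects event timestamps in milliseconds since the Java epoch (1970-01-01T00:00:00Z), so a present-day timestamp must have 13 digits.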

Correct: 1563743505673 (milliseconds)

Incorrect: 1563743505 (seconds)

Ref.: https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
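
If the records may carry timestamps in seconds, one option is to normalize them before returning the value from the timestamp extractor. The following is a minimal sketch in plain Java; the class name, the method name, and the threshold heuristic are assumptions for illustration and are not part of Flink.

```java
public final class EpochMillis {

    // Assumed heuristic: non-negative values below 10^11 are taken to be seconds.
    // Read as milliseconds, 10^11 is a date in 1973; read as seconds, it lies
    // thousands of years in the future, so present-day data falls clearly on one side.
    private static final long SECONDS_THRESHOLD = 100_000_000_000L;

    private EpochMillis() {}

    // Returns the timestamp in milliseconds since the epoch.
    public static long toEpochMillis(long ts) {
        return ts < SECONDS_THRESHOLD ? ts * 1000L : ts;
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis(1563743505L));    // prints 1563743505000
        System.out.println(toEpochMillis(1563743505673L)); // prints 1563743505673
    }
}
```

In the question's extractor this would amount to returning EpochMillis.toEpochMillis(req.ts) instead of req.ts, assuming req.ts is a long.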