
I am reading messages from a Kafka topic with 2 partitions and using event time. This is my code:

stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
    @Override
    public long extractAscendingTimestamp(Request req) {
        return req.ts; // Flink expects epoch milliseconds here
    }
})
// one global 1-day tumbling window over the whole (non-keyed) stream
.windowAll(TumblingEventTimeWindows.of(Time.days(1)))
.apply((TimeWindow window, Iterable<Request> iterable, Collector<String> collector) -> {
    collector.collect("Window: " + window.toString());
    for (Request req : iterable) {
        collector.collect(req.toString());
    }
})
.print();

I only get output when I set the Kafka source parallelism to 1. According to this thread, I guess that is because messages from multiple partitions arrive out of order at the timestamp extractor.

So I replaced the AscendingTimestampExtractor with a BoundedOutOfOrdernessGenerator as in this documentation example (with a larger maxOutOfOrderness delay) to handle out-of-order events, but I still don't get any output. Why is that?
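The watermark logic of a bounded-out-of-orderness assigner can be modelled in plain Java to see what it does and does not cover. This is a sketch without any Flink dependency, assuming epoch-millisecond timestamps; the class and method names are hypothetical and only mirror the shape of the documentation example. The watermark simply trails the highest timestamp seen by this one assigner instance, so it tolerates disorder within its own input but cannot help an input that receives no records at all.

```java
// Hypothetical plain-Java model of a periodic bounded-out-of-orderness
// watermark generator (not the Flink API itself).
public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Chosen so the initial watermark is exactly Long.MIN_VALUE (no underflow).
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Called for every element: remember the highest event timestamp seen so far.
    public long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // Called periodically: the watermark lags the max timestamp by the bound.
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(3_000);
        for (long ts : new long[] {10_000, 8_000, 12_000, 11_000}) {
            gen.extractTimestamp(ts);
        }
        System.out.println(gen.currentWatermark()); // 9000
    }
}
```

An instance that never sees an element keeps reporting `Long.MIN_VALUE`, no matter how large the bound is.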

How much data do you ingest (i.e., over what event-time period)? Keep in mind that the first output is produced only after your 1-day time window closes, and that happens only when the first watermark past the end of that day arrives. – Matthias J. Sax
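The point about window closing can be made concrete with a small plain-Java model of how a 1-day tumbling event-time window is assigned and when it fires. This sketch assumes UTC-aligned windows with zero offset and epoch-millisecond timestamps, and models the usual event-time rule that a window `[start, start + size)` fires once the watermark reaches its last millisecond; the class and method names are hypothetical.

```java
// Hypothetical model of 1-day tumbling event-time windows (no Flink dependency).
public class DailyWindowSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Start of the 1-day window containing the timestamp (offset 0, UTC midnight).
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, DAY_MS);
    }

    // The window [start, start + DAY_MS) fires once the watermark reaches
    // its last millisecond, start + DAY_MS - 1.
    static boolean fires(long windowStartMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + DAY_MS - 1;
    }

    public static void main(String[] args) {
        long event = 1563743505673L;          // 2019-07-21T21:11:45.673Z
        long start = windowStart(event);      // 2019-07-21T00:00:00Z
        System.out.println(fires(start, event));              // false
        System.out.println(fires(start, start + DAY_MS - 1)); // true
    }
}
```

Even an event late in the day leaves its window open; nothing is emitted until the watermark crosses the next UTC midnight.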
Hi @MatthiasJ.Sax, sorry for the late response. The data I'm ingesting spans a 1-year period; I'm actually aggregating historical data in a streaming fashion. It turns out the problem is that one of the topic partitions is empty, so the watermark never advances for it. I could fix it by setting topic.metadata.refresh.interval.ms in the producer config. Here is the related mailing list thread. – Yassine Marzougui
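Why one empty partition blocks everything can be sketched by modelling how a downstream operator combines watermarks: it takes the minimum over all of its inputs. This plain-Java sketch assumes one input per partition (for example, one partition per source subtask) and that an input which has produced nothing still sits at `Long.MIN_VALUE`; the class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical model of watermark combination across inputs (no Flink dependency).
public class CombinedWatermarkSketch {
    // An input that has produced no records yet reports this value.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // A downstream operator's watermark is the minimum over its inputs.
    static long combined(long... perInputWatermarks) {
        return Arrays.stream(perInputWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        long busy = 1563753600000L; // input 0 has reached 2019-07-22T00:00Z
        long empty = NO_WATERMARK;  // input 1 never received a record
        System.out.println(combined(busy, empty) == NO_WATERMARK); // true
        System.out.println(combined(busy, busy - 5_000));          // 1563753595000
    }
}
```

As long as the empty input stays at `Long.MIN_VALUE`, the combined watermark never moves, so no 1-day window can fire. In newer Flink versions (1.11 and later), `WatermarkStrategy.withIdleness(...)` can mark such an input as idle so it is ignored when combining watermarks.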
I am facing the same problem in my Flink application. I have created 4 partitions with parallelism 4 and I am getting out-of-order data. I have used a BoundedOutOfOrdernessGenerator with a delay of 3 seconds. Is there any way to get the stream ordered by event timestamp? – Amol Suryawanshi
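One common way to restore event-time order is to buffer records and release them sorted once the watermark guarantees that no earlier record can still arrive. The sketch below shows only that buffering logic in plain Java, using bare timestamps; the class name is hypothetical. In a real Flink job this logic would typically live in a process function with managed state and event-time timers, and ordering only holds within the out-of-orderness bound: records behind the watermark are late and are not handled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical watermark-driven sort buffer (no Flink dependency).
public class WatermarkSortBuffer {
    // Min-heap: the smallest (oldest) timestamp is always at the head.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    public void add(long eventTimestamp) {
        buffer.add(eventTimestamp);
    }

    // Release every buffered timestamp <= watermark, oldest first.
    public List<Long> onWatermark(long watermark) {
        List<Long> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        WatermarkSortBuffer sorter = new WatermarkSortBuffer();
        for (long ts : new long[] {5_000, 2_000, 4_000, 9_000}) {
            sorter.add(ts);
        }
        System.out.println(sorter.onWatermark(4_500)); // [2000, 4000]
        System.out.println(sorter.onWatermark(9_000)); // [5000, 9000]
    }
}
```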

1 Answer


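Check that the event timestamp is in the right unit. Flink expects event timestamps in milliseconds since the Unix epoch, which for present-day dates is a 13-digit number; a 10-digit value is in seconds and will be interpreted as a date in early 1970.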

Correct: 1563743505673 (milliseconds)

Incorrect: 1563743505 (seconds)

Ref.: https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
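The effect of the wrong unit, and the conversion, can be checked with the standard library alone. This sketch assumes the source field holds epoch seconds; the class and helper names are hypothetical. Inside the extractor you would then return `TimeUnit.SECONDS.toMillis(req.ts)` instead of `req.ts`.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: convert epoch seconds to the epoch milliseconds Flink expects.
public class EpochUnitsSketch {
    static long secondsToMillis(long epochSeconds) {
        return TimeUnit.SECONDS.toMillis(epochSeconds);
    }

    public static void main(String[] args) {
        long seconds = 1563743505L;
        // Misread as milliseconds, the value lands in January 1970.
        System.out.println(Instant.ofEpochMilli(seconds));                  // 1970-01-19T02:22:23.505Z
        System.out.println(Instant.ofEpochMilli(secondsToMillis(seconds))); // 2019-07-21T21:11:45Z
    }
}
```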