I am reading messages from a Kafka topic with 2 partitions and using event time. This is my code:
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Request>() {
            // expects timestamps to be non-decreasing for each extractor instance
            @Override
            public long extractAscendingTimestamp(Request req) {
                return req.ts;
            }
        })
        // one window per day, emitted once the event-time watermark passes its end
        .windowAll(TumblingEventTimeWindows.of(Time.days(1)))
        .apply((TimeWindow window, Iterable<Request> iterable, Collector<String> collector) -> {
            collector.collect("Window: " + window.toString());
            for (Request req : iterable) {
                collector.collect(req.toString());
            }
        })
        .print();
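For reference, a day-long event-time window like the one above only emits its contents once the watermark reaches the last millisecond of that day. The plain-Java sketch below models that rule; it does not use Flink classes, and the class and method names are hypothetical. It assumes a zero window offset and non-negative epoch-millisecond timestamps, so windows are aligned to UTC day boundaries.

```java
// Plain-Java model (NOT Flink code) of a one-day tumbling event-time window.
// Assumptions: zero window offset, non-negative epoch-millisecond timestamps.
class TumblingWindowSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Start of the window that contains ts: round down to a multiple of size.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // A window covers [start, start + size), so its last timestamp is start + size - 1.
    static long windowMaxTimestamp(long ts, long size) {
        return windowStart(ts, size) + size - 1;
    }

    // In this model a window is emitted only once the watermark reaches its last timestamp.
    static boolean fires(long ts, long size, long watermark) {
        return watermark >= windowMaxTimestamp(ts, size);
    }

    public static void main(String[] args) {
        long ts = 1_500_000_000_000L; // an example event timestamp
        System.out.println("window start: " + windowStart(ts, DAY_MS));
        System.out.println("needs watermark >= " + windowMaxTimestamp(ts, DAY_MS));
        System.out.println("fires when watermark == ts? " + fires(ts, DAY_MS, ts));
    }
}
```

In other words, a watermark that merely keeps up with the latest element is not enough; it has to cross the end of the day before anything is printed.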
I only get output when I set the Kafka source parallelism to 1. Based on this thread, my guess is that this happens because messages from multiple partitions reach the timestamp extractor out of order.
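With more than one partition, it may also matter how watermarks from several parallel instances are combined: an operator with multiple upstream inputs, such as the one behind `windowAll`, advances its event-time clock only to the smallest watermark among those inputs. The sketch below models that rule in plain Java (hypothetical names, no Flink dependency). In this model, an input that has not emitted any watermark keeps the combined value at `Long.MIN_VALUE`, so no window could fire.

```java
import java.util.Arrays;

// Plain-Java model (NOT Flink code): an operator fed by several parallel inputs
// tracks the latest watermark of each input and follows the minimum of them.
class MinWatermarkSketch {
    private final long[] perInput;

    MinWatermarkSketch(int numInputs) {
        perInput = new long[numInputs];
        // An input that has not emitted a watermark yet counts as Long.MIN_VALUE.
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    // Record a watermark from one input; a smaller (older) value is ignored.
    void onWatermark(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark);
    }

    // The operator's effective watermark is that of its slowest input.
    long combined() {
        long min = Long.MAX_VALUE;
        for (long wm : perInput) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```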
So I replaced the AscendingTimestampExtractor with a BoundedOutOfOrdernessGenerator, as in this documentation example (with a higher maxOutOfOrderness delay), to handle out-of-order events, but I still don't get any output. Why is that?
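The bounded-out-of-orderness pattern from the documentation keeps the highest timestamp seen so far and lets the watermark trail it by a fixed bound. Below is a plain-Java model of that logic with a hypothetical class name; Flink's own `AssignerWithPeriodicWatermarks`-based version returns a `Watermark` object and is polled periodically instead of returning a `long`. Under this model, with a one-day window and a bound `B`, a day's window can fire only after the generator has seen an element timestamped at least (last millisecond of that day) + `B`.

```java
// Plain-Java model (NOT the Flink class) of a bounded-out-of-orderness watermark generator.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness; // fixed lag, in milliseconds
    private long currentMaxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called per element: remember the highest timestamp seen and return the element's own timestamp.
    long extractTimestamp(long ts) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
        return ts;
    }

    // The watermark trails the highest timestamp by the bound.
    // It stays at Long.MIN_VALUE until the first element (avoids underflow).
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // Smallest element timestamp that lets a window whose last timestamp is
    // windowMaxTs fire: we need (maxTs - bound) >= windowMaxTs.
    static long minTimestampToFire(long windowMaxTs, long maxOutOfOrderness) {
        return windowMaxTs + maxOutOfOrderness;
    }
}
```

A larger bound tolerates more disorder, but it also pushes the moment a window can fire further past the end of its day.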