I think my understanding of Flink windows may be wrong, because they are not evaluated the way I would expect from the documentation or the Flink book. The goal is to join a Kafka topic that holds rather static data with a Kafka topic that constantly receives new data.
env.addSource(createKafkaConsumer())
    .join(env.addSource(createKafkaConsumer()))
    .where(keySelector())
    .equalTo(keySelector())
    .window(TumblingProcessingTimeWindows.of(Time.hours(2)))
    .apply(new RichJoinFunction<A, B>() { ... });
createKafkaConsumer() returns a FlinkKafkaConsumer, and keySelector() is a placeholder for my key selector.
Kafka topic A has 1 record, Kafka topic B has 5. My understanding is that the JoinFunction is triggered 5 times (the join condition holds each time), resulting in 5 records in the sink. If a new record for topic A arrives within the 2 hours, another 5 records would be created (2x5 records in total). However, what actually arrives in the sink is rather unpredictable, and I could not see a pattern. Sometimes there is nothing, sometimes the initial records, but if I send additional messages, they are not joined with the earlier records.
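To make that expectation concrete, here is a minimal plain-Java sketch of the per-key pairing I expect inside a single window. It does not use Flink at all: the class name WindowJoinExpectation, the method joinWithinWindow, and the {key, payload} record layout are made up for illustration, and it only models what I expect to happen, not what Flink actually does.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; no Flink classes are used.
// All names here are hypothetical and exist only for this sketch.
class WindowJoinExpectation {

    // Pairs every element of `left` with every element of `right` that has the same key.
    // Each record is modeled as a two-element array: {key, payload}.
    static List<String> joinWithinWindow(List<String[]> left, List<String[]> right) {
        List<String> joined = new ArrayList<>();
        for (String[] a : left) {
            for (String[] b : right) {
                if (a[0].equals(b[0])) {
                    joined.add(a[1] + "|" + b[1]);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> topicA = new ArrayList<>();
        List<String[]> topicB = new ArrayList<>();

        // 1 record in topic A, 5 records in topic B, all sharing the same key.
        topicA.add(new String[] {"k", "a1"});
        for (int i = 1; i <= 5; i++) {
            topicB.add(new String[] {"k", "b" + i});
        }
        System.out.println(joinWithinWindow(topicA, topicB).size()); // prints 5

        // A second record for topic A arrives within the same window.
        topicA.add(new String[] {"k", "a2"});
        System.out.println(joinWithinWindow(topicA, topicB).size()); // prints 10
    }
}
```

The sketch computes the total number of pairs in the window (5, then 10), which is the count I would expect to see in the sink by the time the window is complete.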
My key question:
What actually happens here? Are the records only emitted once the window has finished? I expected real-time output to the sink, but emission at the end of the window would explain a lot.
Related to that: could I handle this problem with an onElement trigger, or would that make my time window obsolete? Do those two concepts exist in parallel, i.e. the join window spans 2 hours, but the join function and its output are triggered per element? What about duplicates in that case?
Subsequently, does processing time mean the point in time at which the record is consumed from the topic? So if I use, e.g., setStartFromEarliest() on startup, would all messages consumed within the following two hours end up in the same window?
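For context on the last question: as far as I understand, tumbling windows are aligned to the epoch by default rather than to the moment the job starts, so "the next two hours" and "the same window" need not coincide. The plain-Java sketch below only mirrors that arithmetic under the assumption of no window offset and non-negative timestamps; TumblingWindowAssignment and windowStart are hypothetical names, not Flink classes.

```java
// Plain-Java illustration only; no Flink classes are used.
// Assumes windows are aligned to the epoch with no offset (my assumption about the default).
class TumblingWindowAssignment {

    // Start (in epoch milliseconds) of the tumbling window that contains `timestampMs`.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        long twoHours = 2 * hour;

        // A record consumed at 13:37 UTC on 1970-01-01 (processing time).
        long consumedAt = 13 * hour + 37 * 60 * 1000L;

        long start = windowStart(consumedAt, twoHours);
        System.out.println(start / hour);              // prints 12
        System.out.println((start + twoHours) / hour); // prints 14
    }
}
```

Under that assumption, a job started at 13:37 would close its first 2-hour window at 14:00, so records consumed at 13:50 and 14:10 would land in different windows even though both arrive within two hours of startup.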
Additional info:
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); is set, and I also tried switching to EventTime in between.