0 votes

I think my perception of Flink windows may be wrong, since they are not evaluated as I would expect from the documentation or the Flink book. The goal is to join one Kafka topic containing rather static data with another Kafka topic that constantly receives new data.

env.addSource(createKafkaConsumer())
        .join(env.addSource(createKafkaConsumer()))
        .where(keySelector())
        .equalTo(keySelector())
        .window(TumblingProcessingTimeWindows.of(Time.hours(2)))
        .apply(new RichJoinFunction<A, B, Out>() { ... });

createKafkaConsumer() returns a FlinkKafkaConsumer

keySelector() is a placeholder for my key selector.

KafkaTopic A has 1 record, KafkaTopic B has 5. My understanding is that the JoinFunction is triggered 5 times (the join condition holds each time), resulting in 5 records in the sink. If a new record for topic A arrives within the 2 hours, another 5 records should be created (2x5 records). However, what actually reaches the sink is rather unpredictable; I could not see a pattern. Sometimes nothing comes out, sometimes the initial records do, but additional messages are not joined with the prior records.
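To make the expectation concrete: a windowed join amounts to a per-key cross product of everything that fell into the same window. A plain-Java sketch (hypothetical types, not Flink code) of what I expect the window to compute:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of a windowed join: for every (a, b) pair with equal
// keys inside one window, the join function is called once.
// With 1 record in A and 5 matching records in B -> 5 output records.
public class JoinExpectation {
    static <K> List<String> windowJoin(Map<K, List<String>> a, Map<K, List<String>> b) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<K, List<String>> e : a.entrySet()) {
            for (String left : e.getValue()) {
                for (String right : b.getOrDefault(e.getKey(), List.of())) {
                    out.add(left + "+" + right);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> a = Map.of("k", List.of("a1"));
        Map<String, List<String>> b = Map.of("k", List.of("b1", "b2", "b3", "b4", "b5"));
        System.out.println(windowJoin(a, b).size()); // prints 5
    }
}
```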

My key question:

What is actually happening here? Are the records only emitted after the window has finished processing? I would expect real-time output to the sink, but that would explain a lot.

Related to that: Could I handle this problem with an onElement trigger, or would that make my TimeWindow obsolete? Do the two concepts exist in parallel, i.e. the join window is 2 hours, but the join function + output is triggered per element? What about duplicates in that case?

Subsequently, does processing time mean the point in time when the record is consumed from the topic? So if I e.g. setStartFromEarliest() on start, would all messages consumed within the next two hours fall into that window?

Additional info: env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); is set and I also switched to EventTime in between.

I really don't get it. I started a new job, set the window to 2 minutes, set both consumers to read from the beginning (each topic had 10 records), and everything came out of the join as it should, before the job had even been running for 2 minutes. With 2 hours, on the contrary, it did not, although the window showed 20 records received. I cannot make sense of that; the logs do not show any errors and the jobs do not fail. – kopaka

1 Answer

1 vote

The semantics of a tumbling processing-time window are that it processes all events which fall into its timespan, in your case 2 hours. By default, the window only outputs results once the 2 hours are over, because only then does it know that no further events will arrive for this window.

If you want to output early results (e.g. for every incoming record), then you could specify a custom Trigger which fires on every element. See the Trigger API docs for more information about this.
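As a sketch of what that could look like (assuming your pipeline from the question; `CountTrigger.of(1)` is one ready-made trigger that fires after every element, though any custom `Trigger` returning `FIRE` in `onElement` works too):

```java
// Hypothetical fragment: fire the join on every incoming element instead
// of waiting for the 2-hour window to close. Because the trigger FIREs
// without PURGEing, each firing re-joins the full window contents, so
// downstream will see duplicates of earlier results.
env.addSource(createKafkaConsumer())
        .join(env.addSource(createKafkaConsumer()))
        .where(keySelector())
        .equalTo(keySelector())
        .window(TumblingProcessingTimeWindows.of(Time.hours(2)))
        .trigger(CountTrigger.of(1))
        .apply(new RichJoinFunction<A, B, Out>() { ... });
```

Note the duplicate caveat: with early firing you either deduplicate downstream or use a trigger/eviction strategy that only emits new combinations.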

Update

The window does not start with the first element; instead, windows are aligned to multiples of the window length. For example, if your window size is 2 hours, you can only get windows [0, 2), [2, 4), ..., but never [1, 3) or [3, 5).
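The alignment can be computed in plain Java (this mirrors what Flink's `TimeWindow.getWindowStartWithOffset` does for a zero offset; timestamps in milliseconds since the epoch):

```java
// Tumbling windows are aligned to multiples of the window size,
// not to the arrival time of the first element.
public class WindowAlignment {
    static long windowStart(long timestamp, long windowSize) {
        // e.g. an element at 03:15 with a 2h window lands in [02:00, 04:00)
        return timestamp - (timestamp % windowSize);
    }

    public static void main(String[] args) {
        long twoHours = 2 * 60 * 60 * 1000L;
        long t = 3 * 60 * 60 * 1000L + 15 * 60 * 1000L; // 03:15 since epoch
        System.out.println(windowStart(t, twoHours) / (60 * 60 * 1000L)); // prints 2
    }
}
```

This also explains why a 2-hour window may close much sooner than 2 hours after the job starts: the job almost never starts exactly on a window boundary.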