I need help understanding Kafka Streams behavior when max.task.idle.ms is used in Kafka 2.2.
I have a KStream-KTable join where the KStream has been re-keyed:
KStream stream1 = builder.stream("topic1", Consumed.with(myTimeExtractor));
KStream stream2 = builder.stream("topic2", Consumed.with(myTimeExtractor));

KTable table = stream1
    .groupByKey()
    .aggregate(myInitializer, myAggregator, Materialized.as("myStore"));

stream2.selectKey((k, v) -> v)
    .through("rekeyedTopic")
    .join(table, myValueJoiner)
    .to("enrichedTopic");
All topics have 10 partitions, and for testing I've set max.task.idle.ms to 2 minutes. myTimeExtractor updates the event time of messages only if they are labelled "snapshot": each snapshot message in stream1 gets its event time set to some constant T, while messages in stream2 get their event time set to T+1.
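To make the timestamp rule above concrete, here is a minimal sketch of the logic (the value of T, the label field, and the topic-based dispatch are my assumptions; the real myTimeExtractor would wrap this logic in Kafka's TimestampExtractor#extract):

```java
// Sketch of the timestamp rule only, not the actual myTimeExtractor.
public class SnapshotTimestamps {
    static final long T = 1_000_000L; // the constant snapshot event time (assumed value)

    // topic1 snapshot messages get event time T, topic2 snapshot messages T + 1;
    // anything not labelled "snapshot" keeps its original record timestamp.
    static long eventTime(String topic, String label, long recordTimestamp) {
        if (!"snapshot".equals(label)) {
            return recordTimestamp;
        }
        return "topic1".equals(topic) ? T : T + 1;
    }

    public static void main(String[] args) {
        System.out.println(eventTime("topic1", "snapshot", 42L)); // T
        System.out.println(eventTime("topic2", "snapshot", 42L)); // T + 1
    }
}
```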
There are 200 messages present in each of topic1 and topic2 when I call KafkaStreams#start, all labelled "snapshot", and no message is added thereafter. I can see that within a second or so both myStore and rekeyedTopic get filled up. Since the event time of the messages in the table is lower than the event time of the messages in the stream, my understanding (from reading https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization) is that I should see the result of the join (in enrichedTopic) shortly after myStore and rekeyedTopic are filled up. In fact, I should be able to fill up rekeyedTopic first, and as long as myStore gets filled up less than 2 minutes after that, the join should still produce the expected result.
This is not what happens. What happens is that myStore and rekeyedTopic get filled up within the first second or so, then nothing happens for 2 minutes and only then enrichedTopic gets filled with the expected messages.
I don't understand why there is a pause of 2 minutes before enrichedTopic gets filled, since everything is "ready" long before. What am I missing?
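For reference, the 2-minute idle time described above would be configured roughly like this (a sketch; the application id and bootstrap server are placeholders, not from the question):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ConfigSketch {
    public static Properties props() {
        Properties p = new Properties();
        p.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-test");         // placeholder
        p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // KIP-353: wait up to 2 minutes for data on all of a task's input
        // partitions before enforcing processing on a partial set of inputs.
        p.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 2 * 60 * 1000L);
        return p;
    }
}
```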
Comments:

(1) Because of through(), your program is split into two sub-topologies: one for

stream2.selectKey((k,v)->v).through("rekeyedTopic")

and a second one for the rest (you can verify via Topology#describe()). (2) Timestamp synchronization happens on the sub-topology/task level, not globally. Hence, your first sub-topology, with only the one input "topic2", should not block at all, but process the data and write into "rekeyedTopic". The second sub-topology should actually block initially, because "rekeyedTopic" is empty on startup. – Matthias J. Sax

through() should forward the timestamp unmodified (cf. issues.apache.org/jira/browse/KAFKA-4785). Not sure if you might hit issues.apache.org/jira/browse/KAFKA-7458 though? You can observe whether processing was enforced via metrics, as described in the KIP. Maybe processing is enforced on the second sub-topology first, loading data into the table, but blocks later when data from "rekeyedTopic" is consumed? – Matthias J. Sax

...the max.task.idle.ms parameter... – Matthias J. Sax