I need help understanding Kafka Streams behavior when max.task.idle.ms is used in Kafka 2.2.

I have a KStream-KTable join where the KStream has been re-keyed:

KStream stream1 = builder.stream("topic1", Consumed.with(myTimeExtractor));
KStream stream2 = builder.stream("topic2", Consumed.with(myTimeExtractor));

KTable table = stream1
       .groupByKey()
       .aggregate(myInitializer, myAggregator, Materialized.as("myStore"));

stream2.selectKey((k,v)->v)
       .through("rekeyedTopic")
       .join(table, myValueJoiner)
       .to("enrichedTopic");

All topics have 10 partitions and, for testing, I've set max.task.idle.ms to 2 minutes. myTimeExtractor updates the event time of messages only if they are labelled "snapshot": each snapshot message in stream1 gets its event time set to some constant T, and messages in stream2 get their event time set to T+1.
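
To make the setup concrete, here is a minimal sketch of the timestamp rule and the idle setting, written as plain Java without the Kafka dependency so it runs standalone. The class name `SnapshotTimestamps`, the method names, and the value picked for T are hypothetical; in the real application this logic would sit inside the `TimestampExtractor` passed to `Consumed.with(...)`, and the property would be part of the Streams configuration.

```java
import java.util.Properties;

public class SnapshotTimestamps {
    // Hypothetical stand-in for the constant T; the real value is not given above.
    static final long T = 1_000_000L;

    // Returns the event time to assign to a record.
    // Snapshot records from topic1 get T, snapshot records from topic2 get T + 1,
    // and every other record keeps the timestamp it already carries.
    static long eventTime(String topic, boolean isSnapshot, long recordTimestamp) {
        if (!isSnapshot) {
            return recordTimestamp;
        }
        if ("topic1".equals(topic)) {
            return T;
        }
        if ("topic2".equals(topic)) {
            return T + 1;
        }
        return recordTimestamp;
    }

    // Builds the relevant part of the configuration: a max idle time of 2 minutes.
    static Properties idleConfig() {
        Properties props = new Properties();
        props.put("max.task.idle.ms", String.valueOf(2 * 60 * 1000L));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(eventTime("topic1", true, 42L));
        System.out.println(eventTime("topic2", true, 42L));
        System.out.println(idleConfig().getProperty("max.task.idle.ms"));
    }
}
```

With this rule every table-side record carries a strictly lower event time than every stream-side record, which is why I expect the table to be populated before the join runs.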

There are 200 messages in each of topic1 and topic2 when I call KafkaStreams#start, all labelled "snapshot", and no messages are added thereafter. I can see that within a second or so both myStore and rekeyedTopic get filled up. Since the event time of the messages in the table is lower than the event time of the messages in the stream, my understanding (from reading https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization) is that I should see the result of the join (in enrichedTopic) shortly after myStore and rekeyedTopic are filled up. In fact, I should be able to fill up rekeyedTopic first and, as long as myStore gets filled up less than 2 minutes after that, the join should still produce the expected result.

This is not what happens. What happens is that myStore and rekeyedTopic get filled up within the first second or so, then nothing happens for 2 minutes and only then enrichedTopic gets filled with the expected messages.

I don't understand why there is a pause of 2 minutes before enrichedTopic gets filled, since everything is "ready" long before. What am I missing?

Not 100% sure atm what's happening. (1) Your program should be split into two sub-topologies, one for stream2.selectKey((k,v)->v).through("rekeyedTopic") and a second one for the rest (you can verify this via Topology#describe()). (2) Timestamp synchronization happens at the sub-topology/task level, not globally. Hence, your first sub-topology, with only one input "topic2", should not block at all, but process the data and write into "rekeyedTopic". The second sub-topology should actually block initially, because "rekeyedTopic" is empty on startup. - Matthias J. Sax
However, it should unblock once data is written into "rekeyedTopic". Also, through() should forward the timestamp unmodified (cf. issues.apache.org/jira/browse/KAFKA-4785). Not sure if you might be hitting issues.apache.org/jira/browse/KAFKA-7458, though? You can observe whether processing was enforced via metrics, as described in the KIP. Maybe processing is enforced on the second sub-topology first, loading data into the table, but it blocks later when data from "rekeyedTopic" is consumed? - Matthias J. Sax
1. You should have 2 sub-topologies: correct.
2. Your first sub-topology should not block at all: correct.
3. through() should forward the timestamp unmodified: correct.
4. The second sub-topology should block initially: correct.
5. ... and unblocks when data is written into rekeyedTopic: this is the problem. The second sub-topology only unblocks once max.task.idle.ms runs out.
- David Breton
Thanks for confirming. It might be a bug. Maybe it's best to file a Jira ticket for tracking. We are already aware of a couple of issues with the max.task.idle.ms parameter... - Matthias J. Sax

1 Answer

Based on the documentation, which states:

max.task.idle.ms - Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams.

I would say it is possibly because some of the partition buffers do NOT contain records, so the task waits, up to the time configured for this property, to avoid out-of-order processing.
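
The rule in that description can be illustrated with a toy model. This is only a sketch of the documented behaviour, not Kafka's actual implementation; the class `IdleTaskModel` and its method `canProcess` are hypothetical names, and each partition buffer is reduced to a queue of timestamps.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class IdleTaskModel {
    private final List<Deque<Long>> buffers; // one queue of record timestamps per input partition
    private final long maxTaskIdleMs;
    private long idleStartMs = -1L;          // -1 means the task is not currently idling

    public IdleTaskModel(List<Deque<Long>> buffers, long maxTaskIdleMs) {
        this.buffers = buffers;
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // Decides whether the task may process a record at wall-clock time nowMs.
    public boolean canProcess(long nowMs) {
        boolean anyData = false;
        boolean allData = true;
        for (Deque<Long> buffer : buffers) {
            if (buffer.isEmpty()) {
                allData = false;
            } else {
                anyData = true;
            }
        }
        if (!anyData) {
            return false;            // nothing buffered, nothing to process
        }
        if (allData) {
            idleStartMs = -1L;       // every buffer has records: process and reset the idle timer
            return true;
        }
        if (idleStartMs < 0) {
            idleStartMs = nowMs;     // first time a buffer is seen empty: start idling
        }
        // Some buffer is empty: wait until the idle time is used up, then process anyway.
        return nowMs - idleStartMs >= maxTaskIdleMs;
    }

    public static void main(String[] args) {
        Deque<Long> tableSide = new ArrayDeque<>();
        Deque<Long> streamSide = new ArrayDeque<>();
        IdleTaskModel task = new IdleTaskModel(Arrays.asList(tableSide, streamSide), 120_000L);
        tableSide.add(1_000_000L);                       // only one input has data
        System.out.println(task.canProcess(0L));         // idling starts here
        System.out.println(task.canProcess(120_000L));   // idle time used up
    }
}
```

In this model a task unblocks as soon as its last empty buffer receives a record, so a wait of the full configured time would suggest that at least one buffer stayed empty from the task's point of view for the whole period.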