
I am trying to use a KStream-KTable leftJoin to enrich items from topic A with records from topic B. Topic A is my KStream, and topic B is my KTable, which has around 23M records. The keys of the two topics don't match, so I have to re-key the KStream of topic B and turn it into a KTable using a reducer.

Here is my code:

KTable<String, String> ktable = streamsBuilder
     .stream("TopicB", Consumed.withTimestampExtractor(new customTimestampsExtractor()))
     .filter((key, value) -> {...})
     .transform(new KeyTransformer()) // generate new key
     .groupByKey()
     .reduce((aggValue, newValue) -> {...});

streamsBuilder
     .stream("TopicA")
     .filter((key, value) -> {...})
     .transform(...)
     .leftJoin(ktable, (valueA, valueB) -> {...}) // enrich the TopicA record with the matching ktable value
     .transform(...)
     .to("result");

1) The KTable initialization is slow (around 2000 msg/s). Is this normal? My topic only has 1 partition. Is there any way to improve the performance? I tried setting the following to reduce write throughput, but it doesn't seem to improve much:

CACHE_MAX_BYTES_BUFFERING_CONFIG = 10 * 1024 * 1024
COMMIT_INTERVAL_MS_CONFIG = 15 * 1000
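
For reference, this is roughly how I apply those settings; it is only a sketch, and the application id and bootstrap servers are placeholders:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "enrich-app");        // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// larger cache = fewer (deduplicated) updates written downstream per key
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// commit/flush less often so the cache can actually absorb updates
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15 * 1000L);

KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
streams.start();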

2) The join occurs before the KTable has finished loading from Topic B. Here are the offsets at the time the join occurred (CURRENT-OFFSET/LOG-END-OFFSET):

   Topic A: 32725/32726 (Lag 1)
   Topic B: 1818686/23190390 (Lag 21371704)

I checked the timestamp of the Topic A record that failed: it is from 4 days ago, while the last processed record of Topic B is from 6 days ago. As I understand it, Kafka Streams processes records based on timestamp, so I don't understand why, in my case, the KStream (Topic A) didn't wait until the KTable (Topic B) was loaded up to the point 4 days ago before triggering the join.

I also tried making the timestamp extractor return 0, but that doesn't work either.

Updated: When setting the timestamp to 0, I get the following error:

Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerID are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. 

I also tried setting max.task.idle.ms to > 0 (3 seconds and 30 minutes), but I still get the same error.
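
For completeness, this is roughly how that config is set, added to the same Properties object as in the sketch above (the value shown is one of the two I tried):

// max.task.idle.ms makes a task wait for data on all of its input partitions
// before picking the next record, which helps timestamp synchronization
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 3 * 1000L); // also tried 30 * 60 * 1000L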

Updated: I fixed the 'UnknownProducerIdException' error by making customTimestampsExtractor return a timestamp from 6 days ago, which is still earlier than the records from Topic A. I think (not sure) that setting it to 0 triggers retention on the changelog, which caused this error. However, the join still doesn't work: it still happens before the KTable finishes loading. Why is that?
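
This is roughly what that extractor variant looks like (a sketch; the real class computes the cutoff differently, and the 6-day offset is hard-coded here only for illustration):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class customTimestampsExtractor implements TimestampExtractor {
    // pin all TopicB records to ~6 days ago: older than every TopicA record,
    // but recent enough that changelog retention does not kick in (unlike 0)
    private static final long SIX_DAYS_MS = 6L * 24 * 60 * 60 * 1000;

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        return System.currentTimeMillis() - SIX_DAYS_MS;
    }
}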

I am using Kafka Streams 2.3.0.

Am I doing anything wrong here? Many thanks.

The issue with old timestamps is known: issues.apache.org/jira/browse/KAFKA-6817 – Matthias J. Sax
Hi Matthias, thanks for the Jira link. Regarding my issue 2), I read your answer in stackoverflow.com/questions/57498201/…, where you said that as of 2.1.0, timestamps are synchronized strictly. However, in my testing it doesn't behave like that: the join starts without waiting for the KTable to finish loading, even though all records in the KTable topic are earlier than all records in the KStream. Is this a known issue? – user1086102
I am not aware of any issue -- as you use max.task.idle.ms already, I am not sure what the problem could be. Note, though, that your table is not loaded directly from topic B: because you change the key, the data is repartitioned before the reduce (similarly for the stream input, i.e., there is repartitioning happening before the join()), and this may cause issues with timestamp synchronization, too. The repartitioning will result in interleaved writes and thus out-of-order data. – Matthias J. Sax
Did you ever figure this out? – Avi Farada
I changed the logic to use GlobalKTable instead. – user1086102
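
Roughly, that GlobalKTable approach looks like the sketch below; the topic names come from the question, but deriveLookupKey is a hypothetical stand-in for the real key mapping, and a GlobalKTable has to be read directly from its topic (no re-keying in the topology):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// the full TopicB is replicated to every instance and bootstrapped before
// processing starts, so the "join before the table is loaded" problem goes away
GlobalKTable<String, String> globalTable = builder.globalTable("TopicB");

KStream<String, String> enriched = builder.<String, String>stream("TopicA")
    .leftJoin(globalTable,
        (streamKey, streamValue) -> deriveLookupKey(streamValue), // hypothetical key mapper
        (streamValue, tableValue) -> streamValue + "|" + tableValue);

enriched.to("result");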

1 Answer


1. The KTable initialization is slow (around 2000 msg/s), is this normal?

This depends on your network, and I think the limiting factor is the consuming rate of TopicB. The two configs you use, CACHE_MAX_BYTES_BUFFERING_CONFIG and COMMIT_INTERVAL_MS_CONFIG, choose the trade-off between how much output the KTable produces (since the KTable changelog is a stream of revisions) and how much latency you accept between updating the KTable's underlying topic and the downstream processors. Take a detailed look at the Kafka Streams caching config for state stores and the "Tables, Not Triggers" part of that blog post.

I think a good way to increase the consuming rate of TopicB is to add more partitions.

  2. KStream.leftJoin(KTable, ...) is always a table lookup: it always joins the current stream record against the latest state of the KTable, and it does not take stream time into account when deciding whether to join or not. If you want stream time to be considered when joining, take a look at the KStream-KStream join (see the sketch at the end of this answer).

In your case this lag is the lag on TopicB; it does not mean the KTable is not fully loaded. The KTable is only "not fully loaded" while it is in the state restore process, i.e. while it reads the KTable's underlying changelog topic to rebuild its current state before your stream app actually runs. In that case you would not be able to do the join at all, because the stream app does not run until the state is fully restored.
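
For illustration, a windowed KStream-KStream left join that does take record timestamps into account could look roughly like the sketch below; the 7-day window and the joiner are placeholders, and default serdes are assumed:

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streamA = builder.stream("TopicA");
KStream<String, String> streamB = builder.stream("TopicB");

// records only join if their timestamps fall within the window of each other,
// so record time (not just the latest table state) decides the join result
KStream<String, String> joined = streamA.leftJoin(
    streamB,
    (aValue, bValue) -> aValue + "|" + (bValue == null ? "no-match" : bValue),
    JoinWindows.of(Duration.ofDays(7)));

joined.to("result");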