I am trying to use a KStream-KTable leftJoin to enrich items from topic A with data from topic B. Topic A is my KStream, and topic B is my KTable, which has around 23M records. The keys of the two topics do not match, so I have to read topic B as a KStream, generate a new key, and convert it to a KTable using a reducer.
Here is my code:
KTable<String, String> ktable = streamsBuilder
.stream("TopicB", Consumed.withTimestampExtractor(new customTimestampsExtractor()))
.filter((key, value) -> {...})
.transform(new KeyTransformer()) // generate new key
.groupByKey()
.reduce((aggValue, newValue) -> {...});
streamsBuilder
.stream("TopicA")
.filter((key, value) -> {...})
.transform(...)
.leftJoin(ktable, new ValueJoiner({...}))
.transform(...)
.to("result");
1) The KTable initialization is slow (around 2000 msg/s). Is this normal? My topic has only 1 partition. Is there any way to improve the performance? I tried the following settings to reduce the write load, but they do not seem to help much:
CACHE_MAX_BYTES_BUFFERING_CONFIG = 10 * 1024 * 1024
COMMIT_INTERVAL_MS_CONFIG = 15 * 1000
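For reference, the two settings above map to the string keys below. This is only a minimal sketch, assuming the values are passed to Kafka Streams through a plain Properties object; the class name StreamsTuningSketch is made up for illustration, and required settings such as application.id and bootstrap.servers are left out.

```java
import java.util.Properties;

public class StreamsTuningSketch {
    // Builds the two settings quoted above using their string config names,
    // so the sketch compiles without a Kafka dependency.
    public static Properties tuningProps() {
        Properties props = new Properties();
        // Same key as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG (10 MB record cache)
        props.put("cache.max.bytes.buffering", 10 * 1024 * 1024L);
        // Same key as StreamsConfig.COMMIT_INTERVAL_MS_CONFIG (commit every 15 s)
        props.put("commit.interval.ms", 15 * 1000L);
        return props;
    }

    public static void main(String[] args) {
        Properties props = tuningProps();
        System.out.println("cache.max.bytes.buffering = " + props.get("cache.max.bytes.buffering"));
        System.out.println("commit.interval.ms = " + props.get("commit.interval.ms"));
    }
}
```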
2) The join occurs before the KTable has finished loading from Topic B. Here are the offsets at the time the join occurred (CURRENT-OFFSET/LOG-END-OFFSET):
Topic A: 32725/32726 (Lag 1)
Topic B: 1818686/23190390 (Lag 21371704)
I checked the timestamp of the Topic A record that failed: it is from 4 days ago, while the last processed record of Topic B is from 6 days ago. As I understand it, Kafka Streams processes records based on their timestamps, so I don't understand why, in my case, the KStream (Topic A) did not wait until the KTable (Topic B) was loaded up to the point 4 days ago before triggering the join.
I also tried making the timestamp extractor return 0, but that did not work either.
Update: When setting the timestamp to 0, I get the following error:
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerID are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
I also tried setting max.task.idle.ms to > 0 (3 seconds and 30 minutes), but I still get the same error.
Update: I fixed the 'UnknownProducerIdException' error by making customTimestampsExtractor return a timestamp of 6 days ago, which is still earlier than the records from Topic A. I think (not sure) that setting it to 0 triggered retention on the changelog, which caused this error. However, the join still does not work: it still happens before the KTable has finished loading. Why is that?
I am using Kafka Streams 2.3.0.
Am I doing anything wrong here? Many thanks.
Since you set max.task.idle.ms already, I am not sure what the problem could be. Note though, that your table is not loaded directly from topic B: as you change the key, the data is repartitioned before the reduce() (similar for the stream input, i.e., there is repartitioning happening before the join()) and this may cause issues with timestamp synchronization, too. The repartitioning will result in interleaved writes and thus out-of-order data. – Matthias J. Sax
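To make the synchronization point concrete, here is a toy model in plain Java. It is not Kafka Streams code and does not reproduce its internals. It assumes a task that always takes the record with the smallest head timestamp among its non-empty input buffers, and that an empty buffer cannot hold the other side back once any idle wait has expired. The class name, method name, and day-number timestamps are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class TimestampSyncSketch {
    // Drains both buffers, always taking the smaller head timestamp.
    // If one buffer is empty, the other side is processed without waiting.
    public static List<String> drain(Deque<Long> tableBuf, Deque<Long> streamBuf) {
        List<String> order = new ArrayList<>();
        while (!tableBuf.isEmpty() || !streamBuf.isEmpty()) {
            boolean takeTable;
            if (tableBuf.isEmpty()) {
                takeTable = false;            // nothing buffered for the table side
            } else if (streamBuf.isEmpty()) {
                takeTable = true;             // nothing buffered for the stream side
            } else {
                // Tie goes to the table side; an arbitrary choice for this sketch.
                takeTable = tableBuf.peekFirst() <= streamBuf.peekFirst();
            }
            if (takeTable) {
                order.add("table@" + tableBuf.pollFirst());
            } else {
                order.add("stream@" + streamBuf.pollFirst());
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // Case 1: table records (days 1 and 2) are already buffered,
        // so they are applied before the stream record from day 3.
        System.out.println(drain(new ArrayDeque<>(Arrays.asList(1L, 2L)),
                                 new ArrayDeque<>(Arrays.asList(3L))));
        // Case 2: the table-side buffer is still empty (for example, the
        // repartitioned data has not arrived yet), so the stream record is
        // joined against a table that has not been populated.
        System.out.println(drain(new ArrayDeque<>(),
                                 new ArrayDeque<>(Arrays.asList(3L))));
    }
}
```

In the second case the stream record is processed first even though older table data exists upstream, which would match the behavior described in the question if the repartition topic lags behind.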