I'm working on a Kafka Streams application written in Kotlin, and I'm seeing some bizarre behavior with a join. At a high level, I'm streaming two topics with different keys. However, I can rekey the messages from one topic so that the keys line up. After I do this, though, the subsequent join does not fire. Below is the simplified code (with irrelevant portions elided and replaced with comments):

    val builder = KStreamBuilder()
    val joinWindow = JoinWindows.of(/* 30 days */).until(/* 365 days */)

    val topicOneStream = builder.stream<String, String>(topicOne)
    val reKeyedTopicOneStream = topicOneStream.filter({ key, value ->
        // ... some logic to filter messages here
    }).selectKey({ _, value ->
        // Rekey messages here based on the signature (which will match the key from corresponding messages on topicTwo)
        JSON.parseMessage(value)?.data?.authorization?.data?.signature
    }).peek({ key, value -> logger.info("post-topicOne-filter $key, $value") })

    val topicTwoStream = builder.stream<String, String>(topicTwo)
    val filteredTopicTwoStream = topicTwoStream.filter({ key, value ->
        // ... other filtering logic goes here
        // these keys match those of the re-keyed messages from topic one
    }).peek({ key, value -> logger.info("post-hello-sign event filter: $key, $value") })

    val joinedStream = reKeyedTopicOneStream.join(filteredTopicTwoStream, { _, _ ->
        // Doesn't fire... sometimes
    }, joinWindow)

When we run this, we see console output from the two peeks indicating that the messages have been filtered and rekeyed as expected.
We also tried sending the output of the two streams to two similarly partitioned topics right before the join so that we could cat the topic and see what partitions the messages are being written to and with what keys. The results looked fine and are as follows:
Topic One:
Message with key 7e7f4e74-fc5e-4676-893a-353e4fb217f6 at partition 1 at offset 0
Topic Two:
Message with key 7e7f4e74-fc5e-4676-893a-353e4fb217f6 at partition 1 at offset 0
Is there something I'm missing here? My understanding is that messages with the same key should result in the join firing but that's not what I'm seeing. Thank you for your help!
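
For context, matching keys alone are not enough for a windowed stream-stream join to emit. As a rough model (plain Kotlin, not Kafka code): the two records also need to sit on the same partition number of the topics the join actually reads, so that one task sees both, and their timestamps need to fall within the join window. `Rec` and `wouldJoin` below are hypothetical names used only for illustration, and the model ignores retention and processing order.

```kotlin
import kotlin.math.abs

// Hypothetical stand-in for a record as a join task sees it; not a Kafka class.
data class Rec(val key: String, val partition: Int, val timestampMs: Long)

// Simplified model of when a windowed KStream-KStream inner join can emit:
// same partition number (so one task sees both records), equal keys, and
// timestamps no further apart than the window size.
fun wouldJoin(left: Rec, right: Rec, windowMs: Long): Boolean =
    left.partition == right.partition &&
        left.key == right.key &&
        abs(left.timestampMs - right.timestampMs) <= windowMs

fun main() {
    val thirtyDaysMs = 30L * 24 * 60 * 60 * 1000
    val key = "7e7f4e74-fc5e-4676-893a-353e4fb217f6"
    val left = Rec(key, partition = 1, timestampMs = 0L)

    // Same key, same partition, timestamps inside the window.
    println(wouldJoin(left, Rec(key, 1, 1_000L), thirtyDaysMs))            // true
    // Same key, but the other side landed on a different partition.
    println(wouldJoin(left, Rec(key, 0, 1_000L), thirtyDaysMs))            // false
    // Same key and partition, but timestamps too far apart.
    println(wouldJoin(left, Rec(key, 1, thirtyDaysMs + 1), thirtyDaysMs))  // false
}
```

This is why checking both the partition assignment and the record timestamps is a reasonable first debugging step when keys appear to match.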
[...] process() at the end of each stream, like your peeks, but you'll be able to get the ProcessorContext and from that the timestamps. – Dmitry Minkovsky

[...] selectKey to ensure that the partitions were aligned. Specifically, from my example, after topicTwoStream.filter we added .selectKey({ key, _ -> key }) – davidicus

selectKey() is a great simple workaround, given that Kafka Streams will automatically repartition if necessary (except before transform() and process()), using the same hashing algorithm it uses internally. Thanks for the update! – Dmitry Minkovsky

[...] .to(...) or .through(...). – Matthias J. Sax
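
The selectKey workaround described in the comments might be wired up roughly as follows. This is only a sketch, assuming the same pre-2.0 Kafka Streams API as the question (KStreamBuilder, JoinWindows.of(...).until(...)) and default String serdes in the application config. `buildJoin`, `extractSignature`, and the joined value format are hypothetical, and the question's filters are left out.

```kotlin
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.kstream.ValueJoiner
import java.util.concurrent.TimeUnit

// Hypothetical helper. extractSignature stands in for the question's JSON
// parsing and is assumed to always return a non-null key.
fun buildJoin(
    builder: KStreamBuilder,
    topicOne: String,
    topicTwo: String,
    extractSignature: (String) -> String
): KStream<String, String> {
    val joinWindow = JoinWindows.of(TimeUnit.DAYS.toMillis(30))
        .until(TimeUnit.DAYS.toMillis(365))

    // Topic one: rekey by signature, as in the question.
    val reKeyedOne: KStream<String, String> = builder.stream<String, String>(topicOne)
        .selectKey<String> { _, value -> extractSignature(value) }

    // Topic two: identity selectKey. The key is unchanged, but per the comments
    // above this lets Kafka Streams repartition this side before the join too.
    val reKeyedTwo: KStream<String, String> = builder.stream<String, String>(topicTwo)
        .selectKey<String> { key, _ -> key }

    return reKeyedOne.join(
        reKeyedTwo,
        ValueJoiner<String, String, String> { one, two -> "$one|$two" },
        joinWindow
    )
}
```

With both sides passing through selectKey, neither input to the join depends on how the original producers partitioned the source topics.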