
I'm working on a Kafka Streams application written in Kotlin, and I'm seeing some bizarre behavior with a join. At a high level, I'm streaming two topics with different keys. However, I can rekey the messages from one of the topics so that the keys line up. After I do this, though, the subsequent join does not fire. Below is the simplified code (irrelevant portions are elided and replaced with comments):

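import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.kstream.KStreamBuilder

val builder = KStreamBuilder()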
val joinWindow = JoinWindows.of(/* 30 days */).until(/* 365 days */)

val topicOneStream = builder.stream<String, String>(topicOne)
val reKeyedTopicOneStream = topicOneStream.filter({ key, value ->
        // ... some logic to filter messages here
}).selectKey({ _, value ->
        // Rekey messages here based on the signature (which will match the key from corresponding messages on topicTwo)
        JSON.parseMessage(value)?.data?.authorization?.data?.signature
}).peek({ key, value -> logger.info("post-topicOne-filter $key, $value")})

val topicTwoStream = builder.stream<String, String>(topicTwo)
val filteredTopicTwoStream = topicTwoStream.filter({ key, value ->
        // ... other filtering logic goes here
        // these keys match those of the re-keyed messages from topic one
}).peek({ key, value -> logger.info("post-hello-sign event filter: $key, $value")})

val joinedStream = reKeyedTopicOneStream.join(filteredTopicTwoStream, { _, _  ->
    // Doesn't fire... sometimes
}, joinWindow)

When we run this, the console output from the two peeks indicates that the messages have been filtered and rekeyed as expected.

We also tried sending the output of the two streams to two similarly partitioned topics right before the join, so that we could cat the topics and see which partitions the messages are written to and with what keys. The results looked fine:

Topic One:

Message with key 7e7f4e74-fc5e-4676-893a-353e4fb217f6 at partition 1 at offset 0

Topic Two:

Message with key 7e7f4e74-fc5e-4676-893a-353e4fb217f6 at partition 1 at offset 0

Is there something I'm missing here? My understanding is that messages with the same key should cause the join to fire, but that's not what I'm seeing. Thank you for your help!

Have you checked that your record timestamps actually fall within the join window? You can do this with a process() at the end of each stream, like your peeks, but there you'll be able to get the ProcessorContext and, from that, the timestamps. – Dmitry Minkovsky
Thanks for the reply. I just checked the timestamps and they are within milliseconds of each other. – davidicus
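The timestamp check suggested above boils down to one comparison. JoinWindows.of(...) defines a symmetric window (the same bound before and after), so two records can pair up only if their timestamps differ by at most that bound. The helper below is a hypothetical illustration, not a Kafka Streams API, and it assumes the bound is inclusive. It also computes the 30-day bound mentioned in the question's comment with TimeUnit, because that many milliseconds does not fit in an Int.

```kotlin
import java.util.concurrent.TimeUnit
import kotlin.math.abs

// Hypothetical helper, not part of Kafka Streams: the pairing condition for a
// symmetric join window, i.e. the timestamps may differ by at most windowMs
// in either direction (boundary treated as inclusive).
fun withinJoinWindow(leftTimestampMs: Long, rightTimestampMs: Long, windowMs: Long): Boolean =
    abs(leftTimestampMs - rightTimestampMs) <= windowMs

// 30 days in milliseconds. TimeUnit returns a Long, whereas
// 30 * 24 * 60 * 60 * 1000 evaluated in Int arithmetic would wrap around.
val thirtyDaysMs: Long = TimeUnit.DAYS.toMillis(30)
```

With timestamps within milliseconds of each other, as reported in the reply above, withinJoinWindow returns true for a 30-day window, which points away from timestamps as the cause.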
It appears the join was not firing because the messages that Kafka Streams writes to the internal join topics were on different partitions. We think the hashing function used by the Kafka producer that produces the messages differs from the one Kafka Streams uses. We're still looking into it, but for now we forced the other stream through a selectKey to ensure that the partitions are aligned. Specifically, in my example, after topicTwoStream.filter we added .selectKey({ key, _ -> key }). – davidicus
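To make the hashing hypothesis concrete, here is a dependency-free sketch of the key-to-partition mapping used by the Java client's default partitioner: murmur2 over the serialized key bytes, sign bit masked off, modulo the partition count. Kafka Streams relies on that producer default when it writes to its internal repartition topics without a custom partitioner. The port is believed to match org.apache.kafka.common.utils.Utils.murmur2 but should be checked against the client version in use. For contrast, crc32Partition is a hypothetical stand-in for a producer that hashes keys differently. librdkafka's default "consistent_random" partitioner, for example, is CRC32-based, but this sketch does not claim to reproduce any specific client exactly.

```kotlin
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32

// Dependency-free port of the murmur2 variant used by the Java client's default
// partitioner (believed to match org.apache.kafka.common.utils.Utils.murmur2).
fun murmur2(data: ByteArray): Int {
    val length = data.size
    val seed = 0x9747b28c.toInt()
    val m = 0x5bd1e995
    val r = 24

    var h = seed xor length
    // Process the input four bytes at a time, little-endian.
    for (i in 0 until length / 4) {
        val i4 = i * 4
        var k = (data[i4].toInt() and 0xff) +
            ((data[i4 + 1].toInt() and 0xff) shl 8) +
            ((data[i4 + 2].toInt() and 0xff) shl 16) +
            ((data[i4 + 3].toInt() and 0xff) shl 24)
        k *= m
        k = k xor (k ushr r)
        k *= m
        h *= m
        h = h xor k
    }

    // Mix in the trailing 1-3 bytes (a switch with fall-through in the Java original).
    val tail = length and 3.inv()
    val remaining = length % 4
    if (remaining >= 3) h = h xor ((data[tail + 2].toInt() and 0xff) shl 16)
    if (remaining >= 2) h = h xor ((data[tail + 1].toInt() and 0xff) shl 8)
    if (remaining >= 1) {
        h = h xor (data[tail].toInt() and 0xff)
        h *= m
    }

    // Final avalanche.
    h = h xor (h ushr 13)
    h *= m
    h = h xor (h ushr 15)
    return h
}

// Partition a String key the way the Java producer's default partitioner does:
// UTF-8 bytes (as StringSerializer produces), murmur2, sign bit masked off,
// then modulo the partition count.
fun javaClientPartition(key: String, numPartitions: Int): Int =
    (murmur2(key.toByteArray(StandardCharsets.UTF_8)) and 0x7fffffff) % numPartitions

// Hypothetical alternative producer that hashes keys with CRC32 instead.
fun crc32Partition(key: String, numPartitions: Int): Int {
    val crc = CRC32()
    crc.update(key.toByteArray(StandardCharsets.UTF_8))
    return (crc.value % numPartitions).toInt()
}
```

Comparing javaClientPartition(key, n) with crc32Partition(key, n) for a key such as 7e7f4e74-fc5e-4676-893a-353e4fb217f6 (n being the real partition count, which the question doesn't state) shows whether the two schemes agree. Where they disagree, the record read straight from topicTwo and the repartitioned record from topicOne end up in different stream tasks, so the join never sees them together. Passing topicTwoStream through selectKey forces it through the same murmur2-based repartitioning as well.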
I didn't want to reply earlier, figuring you were still investigating, but yes, "We think that the hashing functions between the kafka producer that is producing the message and kafka-streams differ." would totally cause this issue. And yes, selectKey() is a great simple workaround, given that Kafka Streams will automatically repartition if necessary (except before transform() and process()), using the same hashing algorithm it uses internally. Thanks for the update! – Dmitry Minkovsky
You could also provide a custom partitioner, if you want, via .to(...) or .through(...). – Matthias J. Sax