1
votes

My understanding is that Kafka Streams supports partitioning. How does that work when joining data from two different topics? I assume that to join data from two topics, the client app must somehow guarantee that the messages it gets from both topics share the same key. How does Kafka Streams do this?

1 Answer

4
votes

There are a couple of prerequisites for stream-stream, KTable-KTable, or stream-KTable joins:

  • Topics need to be co-partitioned, meaning they must have the same number of partitions. This is a hard requirement: the Streams API won't allow the join if the topics are not co-partitioned and will throw a TopologyBuilderException (replaced by TopologyException in newer releases) at runtime, when partitions are about to be assigned.
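To see why the partition count matters, here is a minimal sketch in plain Java with no Kafka dependency. The class `CoPartitionDemo` and the method `partitionFor` are made up for illustration, and the hash is a stand-in: Kafka's default partitioner uses murmur2, not `Arrays.hashCode`. The point is only that "hash modulo partition count" gives different partition numbers for the same key when the counts differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class CoPartitionDemo {

    // Hypothetical stand-in for a producer's key-based partitioning.
    // Kafka's default partitioner hashes the key bytes with murmur2; a
    // simpler hash is used here to keep the sketch self-contained.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // Mask the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"alice", "bob", "carol", "dave"};
        for (String key : keys) {
            // Same key, same hash, but a different partition count
            // can yield a different partition number.
            System.out.printf("%s -> topic with 4 partitions: %d, topic with 6 partitions: %d%n",
                    key, partitionFor(key, 4), partitionFor(key, 6));
        }
    }
}
```

Any key whose two numbers differ would have its records read by different tasks, so the two sides of the join would never meet.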

Beyond this requirement, any join will run, but for it to work correctly a number of additional requirements must be met:

  • Both topics should use the same key schema. For example, if one topic uses userName as its key and the other uses userSurname, the join will run but most probably won't produce any meaningful output.
  • Producer applications that write to the joined topics should use the same partitioning strategy. That way, the same key ends up in the same partition number in both topics, and those partitions are the ones that are joined.
  • Both topics should use the same message timestamp type (LogAppendTime or CreateTime). This is not a requirement per se, but it should be considered for windowed joins: message timestamps determine which messages are joined together, so topics with different message.timestamp.type settings can lead to hard-to-find bugs.

GlobalKTable joins don't have any of these requirements and will work with any topic regardless of partition count, partitioning strategy, etc., because all data for the GlobalKTable is available to every single Streams instance.
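As a rough mental model (not how Kafka Streams is implemented internally), a GlobalKTable join behaves like a lookup against a complete local copy of the table. The sketch below is plain Java; `GlobalTableSketch`, `enrich`, and the sample data are invented for illustration. Because every instance holds the whole table, it does not matter which partition the stream record arrived on.

```java
import java.util.HashMap;
import java.util.Map;

public class GlobalTableSketch {

    // Models an inner join of one stream record against a fully replicated table.
    // fullTableCopy stands for the complete table that every instance holds.
    static String enrich(Map<String, String> fullTableCopy, String lookupKey, String streamValue) {
        String tableValue = fullTableCopy.get(lookupKey);
        // Inner-join semantics: no matching table entry means no output.
        return tableValue == null ? null : streamValue + "@" + tableValue;
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("alice", "Berlin");
        users.put("bob", "Oslo");

        // The lookup succeeds on any instance, whatever partition the record came from.
        System.out.println(enrich(users, "alice", "login"));
        System.out.println(enrich(users, "zoe", "login"));
    }
}
```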

When messages are produced, they are sent to partitions based on their key and the partitioning strategy. The Streams API assigns the same partition number from each topic to the same processor, so that all relevant messages from both topics that have the same key are processed in the same processor. For windowed joins, message timestamps are used to find the messages that fall within a given window, and the result is emitted once the join is done.
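The windowed matching described above can be sketched in plain Java. This is a simplified model of what one processor sees for a single pair of co-partitioned partitions, not the Kafka Streams implementation; the names `WindowedJoinSketch`, `Rec`, and `join` are hypothetical, and the window is assumed to be symmetric around each record's timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowedJoinSketch {

    // Hypothetical record type: key, value, and message timestamp in milliseconds.
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Pairs records that share a key and whose timestamps are at most windowMs apart.
    static List<String> join(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                if (l.key.equals(r.key) && Math.abs(l.ts - r.ts) <= windowMs) {
                    out.add(l.key + ":" + l.value + "+" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> clicks = Arrays.asList(
                new Rec("alice", "click", 1_000L),
                new Rec("bob", "click", 2_000L));
        List<Rec> views = Arrays.asList(
                new Rec("alice", "view", 1_400L),
                new Rec("bob", "view", 9_000L));

        // alice's records are 400 ms apart and join; bob's are 7000 ms apart and do not.
        System.out.println(join(clicks, views, 500L)); // prints [alice:click+view]
    }
}
```

The same sketch also shows why mixing timestamp types is risky: if one topic's timestamps come from the broker and the other's from the producer, the difference between them can fall outside the window even for records that logically belong together.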