5
votes

I have a Kafka Streams application that is receiving data from topic-1 as KStream and topic-2 as KTable. Both topics have 4 partitions each. Let's say that I have 4 instances of the application running, then each instance will receive data from a single partition for topic-1. How about topic-2 which is received as KTable? Are all instances going to receive data from all 4 partitions in that case? If both the topics are keyed the same, then I guess Kafka Streams will ensure that the same partitions are allocated for an application. If topic-2 doesn't have any keys, but rather the application is going to infer that from the value itself, then that means that all the instances need to get all partitions from topic-2. How does Kafka Streams handle this situation?

Thank you!

1 Answer

4
votes

KTables are sharded according to the input partitions. Thus, similar to a KStream, each instance will get one topic-partition assigned and will materialize this topic-partition as a shard of the KTable. Kafka Streams makes sure that partitions of different topics are co-located, i.e., one instance will get assigned topic-1 partition-0 and topic-2 partition-0 (and so forth).
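To make the co-location concrete, here is a minimal plain-Java model of it (a sketch, not Kafka Streams code). The class name `CoPartitioningSketch` and both helper methods are made up for illustration, and `String.hashCode()` is only a stand-in for the real partitioner hash. The point is that one task reads the same partition number from both topics, so a key written to both topics with the same partitioner ends up in the same task.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of co-partitioned task assignment; not Kafka Streams code.
final class CoPartitioningSketch {

    // Hypothetical stand-in for a partitioner. Kafka's default producer
    // partitioner hashes the serialized key differently; only the
    // "hash modulo partition count" idea is illustrated here.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // One task per partition number: the task reads that partition
    // from every co-partitioned input topic.
    static List<String> taskInputs(int partition, List<String> topics) {
        List<String> inputs = new ArrayList<>();
        for (String topic : topics) {
            inputs.add(topic + "-" + partition);
        }
        return inputs;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        List<String> topics = List.of("topic-1", "topic-2");
        for (int p = 0; p < numPartitions; p++) {
            System.out.println("task " + p + " reads " + taskInputs(p, topics));
        }
        // The same key maps to the same partition number in both topics,
        // so the stream record and the table row meet in one task.
        String key = "user-42";
        System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
    }
}
```

This only holds if both topics have the same number of partitions and were written with the same partitioning strategy, which is the case the question describes.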

If topic-2 has no key set, data will be randomly distributed across the topic's partitions. For this case, you can use a GlobalKTable instead. A GlobalKTable is a full replication of all partitions per instance. If you do a KStream-GlobalKTable join, you can specify a "mapper" that extracts the join attribute from the stream record (i.e., from its key or its value) and uses it as the lookup key into the table.
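The lookup can be modeled in plain Java as follows (a sketch, not Kafka Streams code). In the DSL, the mapper passed to `KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)` receives the stream record's key and value and returns the key to look up in the table. Here a `Map` stands in for the fully replicated table, and the class name `GlobalTableJoinSketch`, the `orderId:customerId` value format, and the sample data are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a stream-to-global-table lookup; not Kafka Streams code.
final class GlobalTableJoinSketch {

    // keySelector plays the role of the join's mapper: it derives the
    // table lookup key from the stream record's value.
    static String join(String streamValue,
                       Function<String, String> keySelector,
                       Map<String, String> globalTable) {
        String tableKey = keySelector.apply(streamValue);
        String tableValue = globalTable.get(tableKey);
        // Inner-join behavior: no matching table row means no output.
        return tableValue == null ? null : streamValue + " -> " + tableValue;
    }

    public static void main(String[] args) {
        // Every instance holds the complete table, so any stream
        // partition can be joined locally.
        Map<String, String> globalTable = new HashMap<>();
        globalTable.put("c7", "Alice");

        // Hypothetical value format "orderId:customerId"; the join
        // attribute is the part after the colon.
        Function<String, String> keySelector = v -> v.substring(v.indexOf(':') + 1);

        System.out.println(join("order-1:c7", keySelector, globalTable));
        System.out.println(join("order-2:c9", keySelector, globalTable));
    }
}
```

Because each instance has the whole table, the stream does not need to be repartitioned by the join attribute before the lookup.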

Note: a KStream-GlobalKTable join has different semantics than a KStream-KTable join. It is not time-synchronized, in contrast to the latter, and thus the join is non-deterministic by design with regard to GlobalKTable updates; i.e., there is no guarantee which KStream record will be the first to "see" a GlobalKTable update and thus join with the updated GlobalKTable record.