
I’m a noob to KStreams and have a question about repartitioning in a KStreams application.

I’m having a hard time finding a definitive answer in online documentation and was wondering if any of you could shed some light.

The goal is to repartition a topic on a new key in an efficient way. In Kafka Streams we can call map() to change the key and then write the result to an output topic.

How would this work for Kafka Streams app instances running in different JVMs, each running its own consumers in the same consumer group and consuming from the original topic?

For example, let’s say I have an input topic with 3 partitions that supplies employee data, including sick leave details for a given month and year.

Let’s say the key of this input topic is null.

We want to flag any employee who took more than 10 days of sick leave in a month. So we map the stream so that the key is (EmployeeId, Month, Year) and the value is the sick leave amount. Then we aggregate this KStream into a KTable (persisted in an in-memory store) and convert that KTable back into a stream, so that the changing running totals come out as a stream.

Finally, we filter the running totals to flag any that exceed 10.
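To make the question concrete, here is a minimal sketch of the topology described above in the Kafka Streams DSL. It assumes hypothetical topic names (employee-sick-leave, sick-leave-alerts), a hypothetical value format of a comma-separated string "employeeId,month,year,days", and a composite key encoded as the string "employeeId|month|year"; a real application would more likely use a proper record type and serde.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

public class SickLeaveTopology {

    // Hypothetical topic names; the question does not name them.
    static final String INPUT_TOPIC = "employee-sick-leave";
    static final String OUTPUT_TOPIC = "sick-leave-alerts";

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Input key is null; value is ASSUMED to be "employeeId,month,year,days".
        KStream<String, String> input =
            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

        input
            // Re-key to "employeeId|month|year" and keep only the day count as the value.
            .map((nullKey, csv) -> {
                String[] f = csv.split(",");
                String newKey = f[0].trim() + "|" + f[1].trim() + "|" + f[2].trim();
                return KeyValue.pair(newKey, Integer.parseInt(f[3].trim()));
            })
            // Grouping after a key-changing map() makes Kafka Streams insert a repartition topic.
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            // Running total per key, kept in an in-memory state store.
            .reduce(Integer::sum,
                Materialized.<String, Integer>as(Stores.inMemoryKeyValueStore("sick-leave-totals"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Integer()))
            // Emit updates of the running total as a stream (record caching may coalesce some).
            .toStream()
            // Flag totals above 10 days.
            .filter((key, total) -> total > 10)
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));

        return builder.build();
    }
}
```

Printing `SickLeaveTopology.build().describe()` should show an internal repartition topic between the map() and the aggregation, which is the mechanism the answer relies on.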

Now, the original input topic could contain sick leave details for the same employee on different partitions.

Does that mean the aggregation for a single (EmployeeId, Month, Year) could get split across the different application instances and thus fail to trigger the 10-day filter?


Answer


Kafka Streams does not create output topics for you; you should create the output topic before you start your application. Cf. https://docs.confluent.io/platform/current/streams/developer-guide/manage-topics.html#user-topics

If you don't create the output topic, Kafka Streams will eventually fail (as it cannot write to a non-existent topic), or, if you have broker-side auto topic creation enabled, the broker will create the topic for you (however, using auto topic creation is not recommended).

Thus, you can create your output topic with whatever number of partitions suits your application.
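One way to create the output topic ahead of time is with the Java Admin client from kafka-clients. This is a sketch: the topic name, the 6 partitions, and the replication factor of 3 are hypothetical values (replication factor 3 needs at least 3 brokers), and the localhost:9092 default bootstrap address is an assumption.

```java
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateOutputTopic {

    // Hypothetical name and sizing; pick values that suit your throughput and cluster.
    static NewTopic outputTopic() {
        return new NewTopic("sick-leave-alerts", 6, (short) 3);
    }

    // Run this before KafkaStreams#start() so the sink topic already exists.
    static void create(String bootstrapServers) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            // Block until the broker confirms creation (or reports an error).
            admin.createTopics(List.of(outputTopic())).all().get();
        }
    }

    public static void main(String[] args) throws Exception {
        create(args.length > 0 ? args[0] : "localhost:9092");
    }
}
```

If the topic already exists, `get()` throws an `ExecutionException` wrapping a `TopicExistsException`, so a startup script would typically treat that case as success. The `kafka-topics.sh` tool shipped with Kafka can do the same job from the command line.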

If you run multiple instances of the same Kafka Streams application, all of them write to the partitions of the output topic; i.e., each instance can write to any partition, and every partition might receive data from any instance. By default, the target partition is determined by the record key, not by which instance writes the record.
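To see why the writing instance does not matter, the sketch below reproduces the default partition choice for a non-null key: the murmur2 hash of the serialized key bytes, made positive, modulo the partition count. It uses the `murmur2` and `toPositive` helpers from kafka-clients' `org.apache.kafka.common.utils.Utils`, and assumes a String key serialized as UTF-8 (as `StringSerializer` does) and no custom partitioner. The class name and sample key are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class KeyToPartition {

    // Default partitioning for a record with a non-null key (no custom partitioner assumed).
    static int partitionFor(String key, int numPartitions) {
        // Same bytes StringSerializer produces with its default UTF-8 encoding.
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // Every instance computes the same partition for the same key and partition count.
        System.out.println(partitionFor("E42|3|2024", 3));
    }
}
```

Because the result depends only on the key bytes and the partition count, two instances writing records with the same key send them to the same partition.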

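If you call repartition(), Kafka Streams will create a repartition topic on your behalf. In general, you don't need to call this method: if you change the key (for example with map() or selectKey()) and then do a downstream aggregation or join, repartitioning happens automatically anyway. The repartition topic is partitioned by the new key, so all records for the same (EmployeeId, Month, Year) land in the same partition and are aggregated by a single task on a single instance; the aggregation is not split across instances. The purpose of repartition() is for cases in which you need to repartition but Kafka Streams does not do it automatically, for example before a transform(). Another use case is changing the number of partitions of the repartition topic: by default, it's created with the same number of partitions as the input topic.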
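A minimal sketch of calling repartition() explicitly to control the partition count of the internal topic. The topic names, the name "by-employee", and the choice of 6 partitions are hypothetical; `Repartitioned` also sets the serdes used for the internal topic.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;

public class ExplicitRepartition {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical input: null key, value "employeeId,month,year,days".
        KStream<String, String> input =
            builder.stream("employee-sick-leave", Consumed.with(Serdes.String(), Serdes.String()));

        input
            // Re-key by employee id.
            .selectKey((nullKey, csv) -> csv.split(",")[0].trim())
            // Force a repartition through an internal topic with 6 partitions.
            .repartition(Repartitioned.<String, String>as("by-employee")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String())
                .withNumberOfPartitions(6))
            .to("by-employee-output", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

At runtime Kafka Streams prefixes internal topic names with the `application.id`, so this topic would be named `<application.id>-by-employee-repartition`.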

If an internal repartition topic is created, your topology will look like this:

inputTopic -> map() -> repartitionTopic -> agg()/join() -> outputTopic
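You can check whether Kafka Streams inserted a repartition topic by printing the topology description. In this sketch the topic names match the diagram and the map() logic is a hypothetical placeholder; the description should list two sub-topologies connected by an internal topic whose name ends in "-repartition".

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class DescribeRepartition {

    static String describe() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("inputTopic", Consumed.with(Serdes.String(), Serdes.String()))
            // map() may change the key, so the downstream count() requires a repartition.
            .map((k, v) -> KeyValue.pair(v, v))
            .groupByKey()
            .count()
            .toStream()
            .to("outputTopic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        // Sub-topology 0 writes to the repartition topic; sub-topology 1 reads it and aggregates.
        System.out.println(describe());
    }
}
```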