0
votes

I am a little bit confused about the wording on this subject in the Kafka documentation, so I want to ask here whether I am interpreting things correctly.

So if I understand this correctly, the only way to scale a Kafka Streams application is to start a new instance of the application (or increase the number of stream threads in an instance). This ensures there will be more consumers under the consumer group ('application.id'), so I can scale my streams application up to the number of partitions of the topic. (Actually, what happens if my stream topology is connected to several topics? Let's say TopicA has 5 partitions and TopicB has 3, and I join the streams for TopicA and TopicB; I am guessing I can scale up to 3 instances/threads in this case.)

Now let's say I have TopicA with 5 partitions and I started 3 instances of my application. If I configure a KTable in my topology, every KTable will contain information from certain partitions, and I have to find out via metadata on which instance (partition) my key is. So what happens when I start a 4th instance? The key/value that was in the KTable on instance 3 can now move to the KTable on instance 4, can't it? One side question: how long can such a rebalance take? (I think it depends on the topic size; let's say it takes 1 minute. Will my application, which is querying the KTable, be unresponsive during this operation?)

A side question: will this mechanism work exactly the same for 'streamsBuilder.table(..)' and 'stream.groupByKey(..).reduce(..)'?

And one final question: again a topic with 5 partitions, but instead of starting 3 instances of the application, I start one instance with 3 stream threads (num.stream.threads = 3). Will I again have 3 KTables representing 5 partitions? Will it behave exactly the same as increasing the instance count if I change the thread count from 3 to 4?

Thx for answers..


1 Answer

4
votes

Let's say TopicA has 5 partitions and TopicB has 3, and I join the streams for TopicA and TopicB; I am guessing I can scale up to 3 instances/threads in this case.

First of all, in order to join two topics they must have the same number of partitions. That is the co-partitioning requirement for joins: if you have TopicA with 5 partitions and TopicB with 3, the join cannot be performed (https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#join-co-partitioning-requirements). You would first have to repartition one of the streams so the partition counts match.
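To see why equal partition counts matter, consider how a key is mapped to a partition. The sketch below uses a plain modulo partitioner for illustration only (Kafka's real default partitioner hashes the serialized key with murmur2, but the argument is the same): with equal partition counts the same key lands on the same partition number on both sides, so one task sees both join inputs; with 5 vs. 3 partitions it can land on different partition numbers.

```java
// Simplified illustration of the co-partitioning requirement.
// The modulo "partitioner" here is a stand-in for Kafka's real
// murmur2-based default partitioner; only the partition counts matter.
public class CoPartitioningSketch {
    static int partitionFor(int keyHash, int numPartitions) {
        return Math.abs(keyHash) % numPartitions;
    }

    public static void main(String[] args) {
        int keyHash = 7; // stand-in for a hashed record key

        // Same partition count on both sides: the key maps to the same
        // partition number, so the join inputs are co-partitioned.
        System.out.println(partitionFor(keyHash, 5) == partitionFor(keyHash, 5)); // true

        // 5 vs 3 partitions: the same key maps to partition 2 on one
        // side and partition 1 on the other, so no task sees both sides.
        System.out.println(partitionFor(keyHash, 5)); // 2
        System.out.println(partitionFor(keyHash, 3)); // 1
    }
}
```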

What happens when I start a 4th instance?

Yes, Kafka Streams will rebalance the workload across the instances based on the number of partitions. It reassigns the partitions and their tasks, including the local state stores. How long this takes depends mostly on how much state has to be restored from the changelog topics on the new instance (standby replicas can shorten this). While a store is being migrated, queries against it will fail, so if an end user is querying the KTable you should collect the metadata from all instances and route each lookup to the instance that currently hosts the key.
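The important point is what a rebalance changes and what it does not: the key-to-partition mapping is fixed by the topic, but the partition-to-instance mapping is recomputed when instances join or leave. The sketch below uses a simple round-robin assignment purely for illustration (Kafka's actual assignor is stickier and state-aware), showing how the key's partition stays put while its hosting instance moves when a 4th instance joins:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: round-robin partition assignment over instances.
// Kafka Streams' real assignor tries to keep tasks sticky to their
// previous owners, but the principle is the same: adding an instance
// can move a partition (and its state-store shard) to a new owner.
public class RebalanceSketch {
    // Assign partitions 0..numPartitions-1 round-robin over instances.
    static List<List<Integer>> assign(int numPartitions, int numInstances) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < numInstances; i++) assignment.add(new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) assignment.get(p % numInstances).add(p);
        return assignment;
    }

    static int instanceFor(int partition, int numInstances) {
        return partition % numInstances;
    }

    public static void main(String[] args) {
        int keyPartition = 3; // the partition some key hashes to; this never changes

        System.out.println(assign(5, 3)); // [[0, 3], [1, 4], [2]]
        System.out.println(assign(5, 4)); // [[0, 4], [1], [2], [3]]

        // With 3 instances the key's partition lives on instance 0;
        // after the 4th instance joins, it is hosted on instance 3.
        System.out.println(instanceFor(keyPartition, 3)); // 0
        System.out.println(instanceFor(keyPartition, 4)); // 3
    }
}
```

This is why queries have to go through the metadata: the correct instance for a given key can change after every rebalance.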

Will this mechanism work exactly the same for 'streamsBuilder.table(..)' and 'stream.groupByKey(..).reduce(..)'?

Yes, the same mechanism applies whenever a KTable is created, regardless of which transformation produces it.

Again a topic with 5 partitions, but instead of starting 3 instances of the application, I start one instance with 3 stream threads (num.stream.threads = 3). Will I again have 3 KTables representing 5 partitions? Will it behave exactly the same as increasing the instance count if I change the thread count from 3 to 4?

By default, Kafka Streams breaks the topology into 5 tasks (= the number of partitions). If num.stream.threads is set to 3, those tasks are distributed across the threads: thread 1 may run 2 tasks, thread 2 the next 2 tasks, and thread 3 the remaining 1 task.
The KTable state is partitioned into 5 shards (= the number of partitions), and each shard maps to one task. Each task creates its own local state store containing the data of its shard. So regardless of the number of threads, you get one local store per partition, i.e. 5 in this example, and raising the thread count from 3 to 4 redistributes tasks just like adding a 4th instance would.

Example: [diagram: the 5 tasks and their state-store shards distributed over the threads]
After adding one more instance: [diagram: the rebalanced task assignment]
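The thread case can be sketched the same way as the instance case: 5 tasks are spread over however many threads the instance runs, and the number of local stores stays equal to the number of partitions. The round-robin distribution below is an illustration, not the actual assignor; the task ids like 0_0 follow Kafka Streams' subtopology_partition naming convention.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: 5 tasks (one per input partition) distributed over
// num.stream.threads threads within a single instance. Round-robin
// is used for illustration; it is not the real assignor logic.
public class TaskDistributionSketch {
    static List<List<String>> distribute(int numTasks, int numThreads) {
        List<List<String>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) threads.add(new ArrayList<>());
        for (int task = 0; task < numTasks; task++) {
            threads.get(task % numThreads).add("0_" + task); // task ids 0_0 .. 0_4
        }
        return threads;
    }

    public static void main(String[] args) {
        // 5 partitions -> 5 tasks -> 5 state-store shards,
        // no matter how many threads run them.
        System.out.println(distribute(5, 3)); // [[0_0, 0_3], [0_1, 0_4], [0_2]]
        System.out.println(distribute(5, 4)); // [[0_0, 0_4], [0_1], [0_2], [0_3]]
    }
}
```

Going from 3 to 4 threads changes only which thread runs which tasks, exactly as adding a 4th instance changes which instance hosts them.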