First of all, sorry if my terminology is not precise; I am very new to Kafka and have read as much as I could. We have a service that uses Kafka Streams (Kafka version 2.3.1). Its topology reads from "topicA", performs a conversion and publishes the result to another topic, "topicB". A second stream in the same topology consumes "topicB" and aggregates it into a KTable backed by a local state store. A listener publishes the KTable changes to yet another topic.

The topics have 24 partitions. We run 2 instances of this service on different machines, with 4 stream threads each. The problem is that the partitions that use the local store are all assigned to the same instance. As a result, disk usage, rebalancing and overall performance are awful.
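To make "evenly" concrete: in Kafka Streams a sub-topology gets one task per input partition, and tasks (not raw partitions) are what the assignor hands out. With 24 partitions, 2 instances and 4 threads each, the ideal share per sub-topology is 3 tasks per thread and 12 per instance. A minimal sketch of that arithmetic; the class and method names are hypothetical, and only `application.id` and `num.stream.threads` are real Streams config keys (other settings such as `bootstrap.servers` are omitted):

```java
import java.util.Properties;

public class EvenShare {
    /** Ideal number of tasks per stream thread for one sub-topology (hypothetical helper). */
    static int perThread(int partitions, int instances, int threadsPerInstance) {
        // One task per input partition; tasks are spread over every thread of every instance.
        return partitions / (instances * threadsPerInstance);
    }

    /** Ideal number of tasks per instance for one sub-topology (hypothetical helper). */
    static int perInstance(int partitions, int instances) {
        return partitions / instances;
    }

    public static void main(String[] args) {
        // Kafka Streams uses application.id as the consumer group ID, so this is the
        // group that shows up in the broker's consumer-group output.
        Properties props = new Properties();
        props.put("application.id", "fj.TheAggregation.TST.V1.PERF");
        props.put("num.stream.threads", "4");

        int threads = Integer.parseInt(props.getProperty("num.stream.threads"));
        System.out.println("per thread: " + perThread(24, 2, threads)
                + ", per instance: " + perInstance(24, 2));
    }
}
```

Running it prints `per thread: 3, per instance: 12`.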

Something else also surprised me: if I check the group assignments on the Kafka broker, I see the following (other partitions removed for readability):

GROUP                             CONSUMER-ID                                                                                                                         HOST                        CLIENT-ID                                                                                      #PARTITIONS     ASSIGNMENT
fj.TheAggregation.TST.V1.PERF fj.TheAggregation.TST.V1.PERF-6898e899-7722-421a-8841-f8e45b074981-StreamThread-3-consumer-c089baaa-343b-484f-add6-aca12572e2a5 10.11.200.115/10.11.200.115 fj.TheAggregation.TST.V1.PERF-6898e899-7722-421a-8841-f8e45b074981-StreamThread-3-consumer 54              fj.TheAggregationDocument.TST.V1.PERF(4,8,12,16,20)
fj.TheAggregation.TST.V1.PERF fj.TheAggregation.TST.V1.PERF-6898e899-7722-421a-8841-f8e45b074981-StreamThread-2-consumer-f5e2d4e3-feee-4778-8ab8-ec4dd770541a 10.11.200.115/10.11.200.115 fj.TheAggregation.TST.V1.PERF-6898e899-7722-421a-8841-f8e45b074981-StreamThread-2-consumer 54              fj.TheAggregationDocument.TST.V1.PERF(5,9,13,17,21)
fj.TheAggregation.TST.V1.PERF fj.TheAggregation.TST.V1.PERF-0733344b-bd8d-40d6-ad07-4fc93de76cf2-StreamThread-4-consumer-63371f35-118a-44e0-bc9b-d403fb59384d 10.11.200.114/10.11.200.114 fj.TheAggregation.TST.V1.PERF-0733344b-bd8d-40d6-ad07-4fc93de76cf2-StreamThread-4-consumer 54              fj.TheAggregationDocument.TST.V1.PERF(2)
fj.TheAggregation.TST.V1.PERF fj.TheAggregation.TST.V1.PERF-0733344b-bd8d-40d6-ad07-4fc93de76cf2-StreamThread-1-consumer-714f0fee-b001-4b16-8b5b-6ab8935becfd 10.11.200.114/10.11.200.114 fj.TheAggregation.TST.V1.PERF-0733344b-bd8d-40d6-ad07-4fc93de76cf2-StreamThread-1-consumer 54              fj.TheAggregationDocument.TST.V1.PERF(0)
fj.TheAggregation.TST.V1.PERF fj.TheAggregation.TST.V1.PERF-0733344b-bd8d-40d6-ad07-4fc93de76cf2-StreamThread-2-consumer-d14e2e20-9aad-4a20-a295-83621a76b099 10.11.200.114/10.11.200.114 fj.TheAggregation.TST.V1.PERF-0733344b-bd8d-40d6-ad07-4fc93de76cf2-StreamThread-2-consumer 54              fj.TheAggregationDocument.TST.V1.PERF(1)
fj.TheAggregation.TST.V1.PERF fj.TheAggregation.TST.V1.PERF-6898e899-7722-421a-8841-f8e45b074981-StreamThread-4-consumer-14f390d9-f4f4-4e70-8e8d-62a79427c4e6 10.11.200.115/10.11.200.115 fj.TheAggregation.TST.V1.PERF-6898e899-7722-421a-8841-f8e45b074981-StreamThread-4-consumer 54              fj.TheAggregationDocument.TST.V1.PERF(7,11,15,19,23)
fj.TheAggregation.TST.V1.PERF fj.TheAggregation.TST.V1.PERF-6898e899-7722-421a-8841-f8e45b074981-StreamThread-1-consumer-57d2f85b-50f8-4649-8080-bbaaa6ea500f 10.11.200.115/10.11.200.115 fj.TheAggregation.TST.V1.PERF-6898e899-7722-421a-8841-f8e45b074981-StreamThread-1-consumer 54              fj.TheAggregationDocument.TST.V1.PERF(6,10,14,18,22)
fj.TheAggregation.TST.V1.PERF fj.TheAggregation.TST.V1.PERF-0733344b-bd8d-40d6-ad07-4fc93de76cf2-StreamThread-3-consumer-184f3a99-1159-44d7-84c6-e7aa70c484c0 10.11.200.114/10.11.200.114 fj.TheAggregation.TST.V1.PERF-0733344b-bd8d-40d6-ad07-4fc93de76cf2-StreamThread-3-consumer 54              fj.TheAggregationDocument.TST.V1.PERF(3)

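So each consumer (one per stream thread) has 54 partitions assigned in total, but the partitions of fj.TheAggregationDocument.TST.V1.PERF are not evenly spread: the four threads on 10.11.200.115 hold 20 of the 24 partitions, while the four threads on 10.11.200.114 hold only 4. Also, if I check the local store on each instance, I see that the KTable stores are all on the same node, even though the broker states that some of the partitions are assigned to the other instance. So the data provided by the broker does not seem to match the state of the streams app.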
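The skew can be counted straight from the ASSIGNMENT column. A small sketch (class and method names are hypothetical) that sums the listed partitions per host, using the rows from the output above with the consumer and client IDs shortened:

```java
import java.util.Map;
import java.util.TreeMap;

public class AssignmentSkew {
    // Rows from the consumer-group output above, with consumer and client IDs shortened.
    static final String[] ROWS = {
        "StreamThread-3 10.11.200.115/10.11.200.115 54 fj.TheAggregationDocument.TST.V1.PERF(4,8,12,16,20)",
        "StreamThread-2 10.11.200.115/10.11.200.115 54 fj.TheAggregationDocument.TST.V1.PERF(5,9,13,17,21)",
        "StreamThread-4 10.11.200.114/10.11.200.114 54 fj.TheAggregationDocument.TST.V1.PERF(2)",
        "StreamThread-1 10.11.200.114/10.11.200.114 54 fj.TheAggregationDocument.TST.V1.PERF(0)",
        "StreamThread-2 10.11.200.114/10.11.200.114 54 fj.TheAggregationDocument.TST.V1.PERF(1)",
        "StreamThread-4 10.11.200.115/10.11.200.115 54 fj.TheAggregationDocument.TST.V1.PERF(7,11,15,19,23)",
        "StreamThread-1 10.11.200.115/10.11.200.115 54 fj.TheAggregationDocument.TST.V1.PERF(6,10,14,18,22)",
        "StreamThread-3 10.11.200.114/10.11.200.114 54 fj.TheAggregationDocument.TST.V1.PERF(3)",
    };

    /** Sums the partitions listed in the ASSIGNMENT column per host. */
    static Map<String, Integer> partitionsPerHost(String[] rows) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String row : rows) {
            String[] tokens = row.trim().split("\\s+");
            String host = null;
            for (String t : tokens) {
                if (t.contains("/")) { // HOST column looks like "ip/ip"
                    host = t.substring(0, t.indexOf('/'));
                    break;
                }
            }
            String assignment = tokens[tokens.length - 1]; // last column: topic(p1,p2,...)
            int open = assignment.indexOf('(');
            int close = assignment.lastIndexOf(')');
            if (host == null || open < 0 || close <= open) {
                continue; // not an assignment row
            }
            String inner = assignment.substring(open + 1, close);
            int n = inner.isEmpty() ? 0 : inner.split(",").length;
            counts.merge(host, n, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(partitionsPerHost(ROWS));
    }
}
```

It prints `{10.11.200.114=4, 10.11.200.115=20}`.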

Is there a way to ensure that the group leader assigns partitions evenly? I would expect some way to specify that, or to assign some kind of "weight" to each stream, so that the group leader can distribute resource-intensive streams evenly among the service instances, or at least less unevenly. By the way, is there a recommended Kafka user group for asking this kind of question? Thanks

1 Answer

There were a lot of improvements to the Streams assignor in 2.6; you can read about them in KIP-441: Smooth Scaling Out for Kafka Streams (https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams).

I don't know if they will fix your problem, but they should help. The new assignor treats stateful tasks (such as the ones backing KTables) differently from stateless ones and should balance them better.
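A toy model of why that distinction matters. This is not Kafka's actual assignor; both strategies and all names are hypothetical. It assumes the two sub-topologies from the question (a stateless conversion reading topicA and a stateful aggregation reading topicB, 24 tasks each) and two instances. Both strategies give each instance 24 tasks, but only the one that balances stateful tasks first spreads the state stores evenly:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StatefulAwareToy {
    // Index into each client's load array.
    static final int STATEFUL = 0, STATELESS = 1;

    /** Count-only toy: tasks (stateful listed first) are cut into equal contiguous chunks. */
    static Map<String, int[]> countOnly(List<String> clients, int stateful, int stateless) {
        Map<String, int[]> load = newLoad(clients);
        int total = stateful + stateless;
        int chunk = (total + clients.size() - 1) / clients.size(); // ceil(total / clients)
        for (int task = 0; task < total; task++) {
            load.get(clients.get(task / chunk))[task < stateful ? STATEFUL : STATELESS]++;
        }
        return load;
    }

    /** Stateful-first toy: balance stateful tasks on their own, then fill with stateless ones. */
    static Map<String, int[]> statefulFirst(List<String> clients, int stateful, int stateless) {
        Map<String, int[]> load = newLoad(clients);
        for (int i = 0; i < stateful; i++) pick(load, true)[STATEFUL]++;
        for (int i = 0; i < stateless; i++) pick(load, false)[STATELESS]++;
        return load;
    }

    // Score used to pick the next client: stateful count only, or total task count.
    private static int score(int[] l, boolean statefulOnly) {
        return statefulOnly ? l[STATEFUL] : l[STATEFUL] + l[STATELESS];
    }

    // Returns the load of the client with the lowest score (first client wins ties).
    private static int[] pick(Map<String, int[]> load, boolean statefulOnly) {
        int[] best = null;
        for (int[] l : load.values()) {
            if (best == null || score(l, statefulOnly) < score(best, statefulOnly)) {
                best = l;
            }
        }
        return best;
    }

    private static Map<String, int[]> newLoad(List<String> clients) {
        Map<String, int[]> load = new TreeMap<>();
        for (String c : clients) load.put(c, new int[2]);
        return load;
    }

    static String describe(Map<String, int[]> load) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, int[]> e : load.entrySet()) {
            sb.append(e.getKey()).append(": stateful=").append(e.getValue()[STATEFUL])
              .append(" stateless=").append(e.getValue()[STATELESS]).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> clients = Arrays.asList("instance-A", "instance-B");
        System.out.print(describe(countOnly(clients, 24, 24)));
        System.out.print(describe(statefulFirst(clients, 24, 24)));
    }
}
```

The count-only split puts all 24 stateful tasks on instance-A even though both instances hold 24 tasks; the stateful-first split gives each instance 12 stateful and 12 stateless tasks.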

If you cannot upgrade from 2.3.1, you might try using different names; you may simply be getting unlucky hashes.