5
votes

I want to create a concurrent @KafkaListener which can handle multiple topics each with different number of partitions.

I have noticed that Spring-Kafka only initializes one consumer per partition for the topic with most partitions.

Example: I have set concurrency to 8. I got a @KafkaListener listening to the following topics. Topic A has the most partitions - 5, so Spring-Kafka initializes 5 consumers. I expected Spring-Kafka to initialize 8 consumers, which is the maximum allowed according to my concurrency property.

  • Topic A has 5 partitions
  • Topic B has 3 partitions
  • Topic C has 1

What is the technical reason for not initializing more consumers?

How do I bypass this, such that I can initialize more consumers using the @KafkaListener annotation? (if possible at all)

1

1 Answers

5
votes

When a listener is configured to listen to multiple topics, each consumer instance listens on all topics; Spring indeed starts 8 consumers (in this case), but the way those partitions are actually distributed across the consumers is controlled by Kafka's group management:

enter image description here

enter image description here

So you end up with 3 idle consumers in this case.

It might be possible to provide a custom partition.assignment.strategy to do the distribution the way you want, but I've never looked into that.

EDIT

I just tested with the RoundRobinAssignor...

spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

and...

enter image description here