I'm trying to consume multiple topics from Kafka using the same @KafkaListener implementation, but I wanted to have one consumer for each topic (each one has just one partition). In order to achieve that I created a @KafkaListener
with the topicPattern="topic1|topic2|topic3"
and a ConcurrentKafkaListenerContainerFactory with concurrency 3 and a groupId consumer_group
. The problem is that all topics are assigned to the same consumer and the other 2 consumers become idle as the following log shows:
21:49:13.140 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [] for group consumer_group
21:49:13.140 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [] for group consumer_group
21:49:13.141 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [topic1, topic2, topic3] for group consumer_group
How can I hint Spring Kafka to spread each topic to a different consumer?
Cheers