1
votes

I'm trying to consume multiple topics from Kafka using the same @KafkaListener implementation, but I want one consumer per topic (each topic has just one partition). To achieve that, I created a @KafkaListener with topicPattern="topic1|topic2|topic3" and a ConcurrentKafkaListenerContainerFactory with concurrency 3 and the groupId consumer_group. The problem is that all topics are assigned to the same consumer and the other two consumers sit idle, as the following log shows:

21:49:13.140 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [] for group consumer_group
21:49:13.140 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [] for group consumer_group
21:49:13.141 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [topic1, topic2, topic3] for group consumer_group

How can I hint Spring Kafka to spread each topic to a different consumer?

Cheers

3 Answers

1
votes

Use a different group ID per consumer. Since all 3 of your consumers are in the same group, they divide each topic's partitions among themselves. Since each topic has only 1 partition, only one consumer can be assigned it.

Also, do not include every topic in the subscribe call, just the one you want in each consumer.

Read more on Kafka's website. They cover this pretty clearly.

Alternative: keep the same group ID but make each consumer subscribe only to the topic it wants.

0
votes

The concurrent message listener container will create 3 consumers with the same group id; that is by design. To do what you want, you need three @KafkaListener methods, one for each topic; they can, of course, delegate to the same method.

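// One listener per topic; note the attribute is "topics", not "topic".
@KafkaListener(topics = ...)
public void l1(...) {
    doListen(...);
}

@KafkaListener(topics = ...)
public void l2(...) {
    doListen(...);
}

@KafkaListener(topics = ...)
public void l3(...) {
    doListen(...);
}

// Shared handler that all three listeners delegate to.
private void doListen(...) {
    ...
}

0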
votes

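Try changing the partition assignment strategy to round robin (or sticky). The default range assignor divides partitions topic by topic, so with one partition per topic the first consumer ends up with partition 0 of every topic. Since you are using Spring, set it in the consumer properties: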

props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
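
To illustrate why the strategy matters for the setup in the question, here is a small plain-Java simulation of the two strategies. It is only a sketch of the general idea, not Kafka's actual assignor code; the class name AssignmentSketch, the method names, and the consumer names are made up for this example, and it assumes every consumer subscribes to every topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; this does not call any Kafka API.
class AssignmentSketch {

    // Range-style: each topic is split on its own, always starting at the first consumer.
    static Map<String, List<String>> range(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int total = topic.getValue();
            int perConsumer = total / consumers.size();
            int extra = total % consumers.size(); // the first "extra" consumers get one more
            int next = 0;
            for (int i = 0; i < consumers.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int p = 0; p < count; p++) {
                    result.get(consumers.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    // Round-robin-style: all partitions of all topics are laid out in one sorted
    // sequence and dealt out to the consumers in turn.
    static Map<String, List<String>> roundRobin(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> result = emptyAssignment(consumers);
        int cursor = 0;
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                String consumer = consumers.get(cursor++ % consumers.size());
                result.get(consumer).add(topic.getKey() + "-" + p);
            }
        }
        return result;
    }

    private static Map<String, List<String>> emptyAssignment(List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("consumer-0", "consumer-1", "consumer-2");
        Map<String, Integer> topics = Map.of("topic1", 1, "topic2", 1, "topic3", 1);
        System.out.println("range:       " + range(consumers, topics));
        System.out.println("round robin: " + roundRobin(consumers, topics));
    }
}
```

With three single-partition topics and three consumers, the range-style method gives all three partitions to consumer-0 and leaves the other two empty, which matches the log in the question. The round-robin-style method gives each consumer exactly one topic.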