2
votes

I am working with Spring Boot and Spring's @KafkaListener. The behavior I expect is that my Kafka listener reads messages in 10 threads, so that if one thread hangs, the other threads continue reading and handling messages.

I defined the following bean:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory)
{

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setMissingTopicsFatal(false);
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    return factory;
}

And spring boot config:

spring.kafka.listener.concurrency=10

The configuration is applied: I can see my 10 listener threads in JMX.

But then I run this test:

@KafkaListener(
        topics = {"${topic.name}"},
        clientIdPrefix = "${kafka.client.id.prefix}",
        idIsGroup = false,
        id = "${kafka.listener.name}",
        containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record)
{
    // getVersion() stands in for however the version is read from the message;
    // ConsumerRecord itself has no such method.
    if (record.getVersion() < 3) {
        try {
            // Simulate a handler that hangs for 20 seconds.
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } else {
        System.out.println("It works!");
    }
}

If the version is < 3, the listener hangs; otherwise it works. I send 3 messages with versions 1, 2 and 3. I expect the messages with versions 1 and 2 to hang, but the message with version 3 to be processed as soon as it reaches the listener. Unfortunately, the message with version 3 waits for messages 1 and 2 before its processing starts.

Maybe my expectations are wrong and this is the correct behavior of the Kafka listener. Please help me understand Kafka concurrency: why does it behave like this?

Are you getting the record versions in the order you expected, or is it different? - Anish B.
Yes, I am sending them one by one: 1, 2, 3 - Sviatlana
Each @KafkaListener will have its own consumer. Having more consumers with the same group id than partitions for your topic gains you nothing: if you have 10 @KafkaListener consumers sharing the same group id and consuming from a topic with 1 partition, then 9 of them will be idle, doing nothing. - kkflf
How many partitions does your topic have and which order do you produce the messages? - kkflf

2 Answers

11
votes

Kafka doesn't work that way; you need at least as many partitions as consumers (controlled by concurrency in the spring container).

Also, only one consumer (in a group) can consume from a partition at a time so, even if you increase the partitions, records in the same partition behind the "stuck" consumer will not be received by other consumers.
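
To make the two points above concrete, here is a toy model in plain Java. It is only a sketch: the class and method names are made up for illustration, and the simple round-robin spread is an assumption that stands in for Kafka's real assignors, which are more involved. What it does preserve is the rule that matters here: within a group, each partition has exactly one owner, so consumers beyond the partition count receive nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Kafka's actual assignment code.
public class PartitionAssignmentSketch {

    // Spread partition indices 0..partitions-1 over the consumers round-robin.
    // Every partition ends up with exactly one owning consumer.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    // Count the consumers that were given no partition at all.
    static long idleConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream()
                .filter(List::isEmpty)
                .count();
    }

    public static void main(String[] args) {
        // The situation in the question: concurrency=10, presumably 1 partition.
        System.out.println("1 partition, 10 consumers, idle: " + idleConsumers(1, 10));
        // With 10 partitions every consumer thread has something to read.
        System.out.println("10 partitions, 10 consumers, idle: " + idleConsumers(10, 10));
    }
}
```

In this model, a record that sits in partition 0 behind a slow record can only ever be handled by the owner of partition 0, no matter how many other consumers are idle.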

3
votes

If you want failover with Kafka, you must spin up more instances of your application.

Example: you have a topic named test with 1 partition, and you create 2 instances of your app with the same Kafka group. One instance will process your data; the other will wait and start processing messages if the first instance crashes. The same applies if you have N partitions and N + 1, N + 2 or N + 3 instances of your application. In that setup each instance only needs one consumer thread, since any extra thread would have no partition to read from.
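
The failover scenario described above can be sketched with a similar toy model. Again this is an illustration under stated assumptions, not Kafka's rebalance protocol: the class `FailoverSketch`, the member names `app-1` and `app-2`, and the round-robin ownership rule are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a consumer group; not Kafka's real rebalance logic.
public class FailoverSketch {

    private final List<String> members = new ArrayList<>(); // kept in join order
    private final int partitions;

    FailoverSketch(int partitions) {
        this.partitions = partitions;
    }

    void join(String member) {
        members.add(member);
    }

    void leave(String member) {
        members.remove(member);
    }

    // Recompute partition -> owner; each partition has exactly one owner.
    Map<Integer, String> assignment() {
        Map<Integer, String> owners = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return owners;
        }
        for (int p = 0; p < partitions; p++) {
            owners.put(p, members.get(p % members.size()));
        }
        return owners;
    }

    public static void main(String[] args) {
        FailoverSketch group = new FailoverSketch(1); // topic "test" with 1 partition
        group.join("app-1");
        group.join("app-2");
        System.out.println("before crash: " + group.assignment());

        group.leave("app-1"); // simulate the first instance crashing
        System.out.println("after crash:  " + group.assignment());
    }
}
```

While both instances are up, the second one owns nothing and simply waits; once the first one leaves the group, the single partition moves to the standby.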

For more information, search for "Kafka Consumer Groups".