I'm using @KafkaListener and ConcurrentKafkaListenerContainerFactory to listen to 3 Kafka topics, and each topic has 10 partitions. I have a few questions about how this works.

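    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;

    @Configuration
    public class KafkaConfig {

        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            factory.setConcurrency(30); // number of consumer threads per listener container
            factory.getContainerProperties().setSyncCommits(true);
            return factory;
        }

        @KafkaListener(topics = "topic1", containerFactory = "kafkaListenerContainerFactory")
        public void handleTopic1(final ConsumerRecord<String, String> record) throws Exception {
        }

        @KafkaListener(topics = "topic2", containerFactory = "kafkaListenerContainerFactory")
        public void handleTopic2(final ConsumerRecord<String, String> record) throws Exception {
        }

        @KafkaListener(topics = "topic3", containerFactory = "kafkaListenerContainerFactory")
        public void handleTopic3(final ConsumerRecord<String, String> record) throws Exception {
        }
    }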

My listener.ackmode is return, enable.auto.commit is set to false, and partition.assignment.strategy is org.apache.kafka.clients.consumer.RoundRobinAssignor.

1) My understanding of concurrency is that, since I set the concurrency to 30 (at the factory level) and I have a total of 30 partitions (across all three topics) to read from, each thread will be assigned one partition. Is my understanding correct? How does it change things if I override the concurrency again in the @KafkaListener annotation?

2) When Spring calls the poll() method, does it poll from all three topics?

3) Since listener.ackmode is set to return, will Spring wait until all of the records returned by a single poll() are processed before issuing the next poll()? Also, what happens if my records take longer than max.poll.interval.ms to process? Let's say offsets 1-100 are returned by a single poll() call and my code is only able to process 50 of them before max.poll.interval.ms is reached. Will Spring issue another poll at that point because max.poll.interval.ms has been reached? If so, will the next poll() return records starting from offset 51?

I really appreciate your time and help.

1 Answer

my listener.ackmode is return

There is no such ack mode. Since you don't set it on the factory, your actual ack mode is BATCH (the default). To use ack mode RECORD (if that's what you mean), you must set it in the factory's container properties.
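A minimal sketch of setting the ack mode on the factory itself, assuming spring-kafka 2.3 or later (where AckMode is nested in ContainerProperties); the class name KafkaConsumerConfig is hypothetical. If listener.ackmode in the question refers to Spring Boot's spring.kafka.listener.ack-mode property, note that Boot applies that property only to the factory it auto-configures, not to one you declare yourself.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // RECORD: commit each record's offset after the listener returns (the default is BATCH)
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
}
```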

my understanding about concurrency is ...

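Your understanding is incorrect. Kafka assigns each partition to at most one consumer in a group, so concurrency greater than the number of partitions a listener can be assigned only adds idle consumers. If a listener listens to multiple topics, the limit depends on the assignor; with the range assignor it is the number of partitions in the topic with the most partitions. Since each of your listeners listens to one topic with 10 partitions, your effective concurrency is 10.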

Overriding the concurrency on a listener simply overrides the factory setting for that listener's container; you always need at least as many partitions as concurrency, or the extra consumers sit idle.
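For example, assuming spring-kafka 2.2 or later (where @KafkaListener has a String concurrency attribute), a per-listener override could look like this; TopicListeners and handleTopic1 are hypothetical names. With 10 partitions in topic1, 10 is the most consumers that can do any work.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TopicListeners {

    // concurrency = "10" replaces the factory's setConcurrency() value for this listener's container only
    @KafkaListener(topics = "topic1", containerFactory = "kafkaListenerContainerFactory", concurrency = "10")
    public void handleTopic1(ConsumerRecord<String, String> record) {
        // process the record here
    }
}
```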

When Spring calls the poll() method, does it poll from all three topics?

Not with that configuration. You have 3 concurrent containers, each with 30 consumers listening to one topic, for a total of 90 consumers (of which only 30 can be assigned a partition).

If you have a single listener for all 3 topics, each consumer's poll can return records from all 3 topics (from whichever partitions it is assigned), but you may still have 20 idle consumers, depending on how the partition assignor allocates the partitions. Check the "partitions assigned" log messages to see exactly how they are allocated. The round robin assignor should spread them across all 30 consumers.
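The effect of the assignor can be seen with a small stdlib-only model. This is a simplified re-implementation of range and round-robin assignment for consumers with identical subscriptions, not Kafka's RangeAssignor or RoundRobinAssignor classes; the class and method names are hypothetical. It counts how many of 30 consumers receive at least one partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignorSimulation {

    // Range-style: for each topic separately, split its partitions into contiguous ranges over the sorted consumers
    static Map<Integer, List<String>> range(List<String> topics, int partitionsPerTopic, int consumers) {
        Map<Integer, List<String>> assignment = emptyAssignment(consumers);
        for (String topic : topics) {
            int base = partitionsPerTopic / consumers;
            int extra = partitionsPerTopic % consumers;
            int p = 0;
            for (int c = 0; c < consumers; c++) {
                int count = base + (c < extra ? 1 : 0); // the first `extra` consumers get one more
                for (int i = 0; i < count; i++) {
                    assignment.get(c).add(topic + "-" + p++);
                }
            }
        }
        return assignment;
    }

    // Round-robin-style: lay out all topic-partitions in order and deal them out one at a time
    static Map<Integer, List<String>> roundRobin(List<String> topics, int partitionsPerTopic, int consumers) {
        Map<Integer, List<String>> assignment = emptyAssignment(consumers);
        int next = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                assignment.get(next).add(topic + "-" + p);
                next = (next + 1) % consumers;
            }
        }
        return assignment;
    }

    static Map<Integer, List<String>> emptyAssignment(int consumers) {
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int c = 0; c < consumers; c++) {
            assignment.put(c, new ArrayList<>());
        }
        return assignment;
    }

    // Number of consumers that received at least one partition
    static long busy(Map<Integer, List<String>> assignment) {
        return assignment.values().stream().filter(parts -> !parts.isEmpty()).count();
    }

    public static void main(String[] args) {
        List<String> allTopics = List.of("topic1", "topic2", "topic3");
        // One listener per topic with concurrency 30
        System.out.println("one topic, range:          " + busy(range(List.of("topic1"), 10, 30)) + " busy of 30");
        // One listener for all three topics
        System.out.println("three topics, range:       " + busy(range(allTopics, 10, 30)) + " busy of 30");
        System.out.println("three topics, round robin: " + busy(roundRobin(allTopics, 10, 30)) + " busy of 30");
    }
}
```

Running main reports 10, 10 and 30 busy consumers: with one 10-partition topic, or with per-topic range assignment over three topics, 20 consumers get nothing, while dealing all 30 partitions round-robin gives every consumer one.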

will spring issue another poll at this time

Spring has no control over that. If you are taking too long, the consumer thread is still inside your listener, and since the Consumer is not thread-safe, we can't issue an asynchronous poll from another thread.
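To make the timing concrete, here is a hedged model of that single-threaded loop using the batch from the question (offsets 1-100), plus a hypothetical 6 seconds per record and Kafka's default max.poll.interval.ms of 300,000 ms. It is not Spring's container code; it only illustrates that nothing can call poll() while the same thread is still working through the batch.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class PollLoopSimulation {

    // Returns the offset still being processed when maxPollIntervalMs elapses, or -1 if the batch
    // finishes in time. Records are processed one after another on the thread that would call
    // poll(), so the next poll() can only happen after this loop ends.
    static long offsetInFlightAtDeadline(List<Long> offsets, long msPerRecord, long maxPollIntervalMs) {
        long elapsedMs = 0;
        for (long offset : offsets) {
            elapsedMs += msPerRecord;
            if (elapsedMs > maxPollIntervalMs) {
                return offset;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Hypothetical batch: offsets 1..100, 6 s each, against a 300 s interval
        List<Long> offsets = LongStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        long inFlight = offsetInFlightAtDeadline(offsets, 6_000, 300_000);
        System.out.println("interval exceeded while processing offset " + inFlight);
    }
}
```

With these numbers the interval runs out while offset 51 is being processed, and no poll happens until the listener has worked through the rest of the batch.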

You must be able to process max.poll.records records within max.poll.interval.ms to prevent Kafka from rebalancing the partitions.
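A rough way to size max.poll.records from a measured worst-case processing time, as a sketch; the helper name, the 6-second figure and the 50% safety margin are assumptions. Plain string keys are used so the example compiles without kafka-clients, but they are the real consumer property names.

```java
import java.util.HashMap;
import java.util.Map;

public class PollSizing {

    // Largest max.poll.records whose worst-case processing time fits in the interval,
    // using only safetyFactor of it (e.g. 0.5 = half). Never returns less than 1.
    static int maxSafePollRecords(long maxPollIntervalMs, long worstCaseMsPerRecord, double safetyFactor) {
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        return (int) Math.max(1, budgetMs / worstCaseMsPerRecord);
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000;  // Kafka's default max.poll.interval.ms
        long worstCaseMsPerRecord = 6_000; // hypothetical: slowest record you have measured
        int records = maxSafePollRecords(maxPollIntervalMs, worstCaseMsPerRecord, 0.5);

        // Consumer properties you would pass to your ConsumerFactory
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("max.poll.records", records);
        consumerProps.put("max.poll.interval.ms", (int) maxPollIntervalMs);
        System.out.println(consumerProps);
    }
}
```

With these inputs the budget is 150,000 ms and max.poll.records comes out as 25. If even a single record can exceed the budget, lowering max.poll.records cannot help; max.poll.interval.ms has to be raised or the processing made faster.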

The ack mode makes no difference; it's all about processing the results of the poll in a timely fashion.