I'm using @KafkaListener and ConcurrentKafkaListenerContainerFactory to listen to 3 kafka topics and each topic has 10 partitions. I have few questions on how this works.
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(30);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
@KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
}
my listener.ackmode is return and enable.auto.commit is set to false and partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
1) my understanding about concurrency is, since i set my concurrency (at factory level) to 30, and i have a total of 30 partitions (for all three topic together) to read from, each thread will be assigned one partition. Is my understanding correct? how does it impact if i override concurrency again inside @KafkaListener annotation?
2) When spring call the poll() method, does it poll from all three topics?
3) since i set listener.ackmode is set to return, will it wait until all of the records that were returned in a single poll() to completed before issuing a next poll()? Also what happens if my records are taking longer than max.poll.interval.ms to process? Lets say 1-100 offsets are returned in a single poll() call and my code is only able to process 50 before max.poll.interval.ms is hit, will spring issue another poll at this time because it already hit max.poll.interval.ms? if so will the next poll() return records from offset 51?
really appreciate for your time and help