I am using @KafkaListener with the following properties:
max.poll.records is set to 50 (each record takes 40-60 seconds to process)
enable-auto-commit=false
ack-mode is set to manual immediate
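For clarity, the consumer settings above can be written out with the plain Kafka property names. This is only a sketch of the configuration as described; the class and method names are hypothetical, and the real values live in myCustomContainerFactory, which is not shown here. The ack mode is not a consumer property: in Spring Kafka it is set on the container properties (ContainerProperties.AckMode.MANUAL_IMMEDIATE).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that only mirrors the settings listed above.
class ConsumerSettingsSketch {

    // Consumer properties as described in the question, using Kafka's own keys.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("max.poll.records", 50);      // upper bound on records returned by one poll()
        props.put("enable.auto.commit", false); // offsets are committed through the Acknowledgment
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```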
Below is the listener logic:
@KafkaListener(groupId = "ABC", topics = "Data1", containerFactory = "myCustomContainerFactory")
public void listen(ConsumerRecord<String, Object> record, Acknowledgment ack) {
    try {
        process(record);
        ack.acknowledge(); // commit the offset only after successful processing
    } catch (Exception e) {
        reprocess(); // pause container and seek
    }
}
Other properties such as max.poll.interval.ms, session.timeout.ms, and the heartbeat interval are left at their default values.
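One thing that may be worth checking is how these numbers relate to each other. The rough calculation below assumes the Kafka consumer default of 300000 ms (5 minutes) for max.poll.interval.ms and assumes the records of one polled batch are processed one after another before the next poll. The class and method names are hypothetical and only illustrate the arithmetic.

```java
import java.time.Duration;

// Hypothetical helper: compares batch processing time with the poll interval.
class PollTimingSketch {

    // Assumed default value of max.poll.interval.ms (5 minutes).
    static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofMillis(300_000);

    // Time needed to process one polled batch sequentially.
    static Duration batchTime(int maxPollRecords, Duration perRecord) {
        return perRecord.multipliedBy(maxPollRecords);
    }

    // Largest number of records that fits inside the interval at the given per-record time.
    static long maxRecordsWithin(Duration interval, Duration perRecord) {
        return interval.toMillis() / perRecord.toMillis();
    }

    public static void main(String[] args) {
        Duration fastest = batchTime(50, Duration.ofSeconds(40));
        Duration slowest = batchTime(50, Duration.ofSeconds(60));
        System.out.println("Batch of 50 takes " + fastest.getSeconds()
                + " to " + slowest.getSeconds() + " seconds");
        System.out.println("Poll interval allows " + DEFAULT_MAX_POLL_INTERVAL.getSeconds() + " seconds");
        System.out.println("Records that fit in the interval: "
                + maxRecordsWithin(DEFAULT_MAX_POLL_INTERVAL, Duration.ofSeconds(60))
                + " to "
                + maxRecordsWithin(DEFAULT_MAX_POLL_INTERVAL, Duration.ofSeconds(40)));
    }
}
```

Under these assumptions a batch of 50 records needs 2000 to 3000 seconds, while the interval allows only 300 seconds, so only about 5 to 7 records would complete before the interval expires. If that is what happens, the consumer could be removed from the group and its partitions reassigned, which might explain records being delivered again after roughly 5 minutes.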
I am not able to understand what is going wrong here.
Suppose 500 messages are published to 2 partitions.
I am not sure why the consumer does not poll records according to max.poll.records. It appears to poll all 500 messages as soon as the application starts or as soon as the producer publishes them.
I have also observed that after processing records for approximately 5-7 minutes, the consumer re-reads an offset that had already been read, processed, and acknowledged.
After an hour, the log file shows that the same messages have been read multiple times.
Any help is appreciated. Thanks.