I have a consumer that consumes a message, does some heavy work, and then acks.
```java
@KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory = "ContainerFactory")
public void consumeMessage(@Payload Payload message, @Headers MessageHeaders headers, Acknowledgment ack) {
    try {
        // Heavy job
        ack.acknowledge();
    } catch (Exception e) {
        log("Error in Kafka Consumer");
    }
}
```
Now, if there is an exception, control goes to the catch block and the acknowledgement does not happen. Since the message was never acked, I expect it to be redelivered and processed again. But that is not what happens: the offset advances and the next message is picked up. I understand the consumer has a max poll size that lets it fetch more than one message at a time, but even so, if a message is not acked it should be reprocessed instead of being skipped while the offset moves on.
Here is the Kafka consumer config:
```java
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
```
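The "ContainerFactory" bean named in the listener is not shown above; for context, it is presumably something along these lines, with manual ack mode enabled so the `Acknowledgment` parameter is injected (a sketch only — the bean name, key/value types, and structure are assumptions, not the actual code):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

public class KafkaConsumerConfig {

    // Sketch of the factory the @KafkaListener refers to (assumed, not shown in the question).
    // AckMode.MANUAL is what makes the Acknowledgment argument available to the listener method.
    @Bean("ContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Payload> containerFactory(
            ConsumerFactory<String, Payload> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Payload> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
```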