I have a Java Kafka consumer that fetches ConsumerRecords in batches and processes them. The sample code is as follows:
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            DoSomeProcessing(record.value());
        }
        consumer.commitAsync();
    }

    private void DoSomeProcessing(String record) {
        // Make an external call to a system that can take a random amount of time
        // for different requests, or time out after 5 seconds.
    }
My problem is deciding which offset to commit when a later record has finished processing but an earlier record is still pending, that is, it has neither completed nor timed out.
Let's suppose I get 2 records in a batch: the external call for the 1st message is still pending, and the call for the 2nd has completed. If I wait up to 5 seconds for every external response, consumption from Kafka can become very slow in some cases. If I do not wait for the 1st request to complete before doing another poll, which offset do I commit to Kafka? If I commit 2 and the consumer then crashes, the 1st message will be lost, because on restart the latest committed offset would be 2.
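To make the dilemma concrete, here is a minimal sketch of the kind of bookkeeping this seems to require, assuming the external calls run asynchronously and that offsets are tracked per partition. `OffsetTracker` and its methods are hypothetical names, not part of the Kafka client. The idea is to never commit past the lowest offset that is still in flight, so a crash replays the pending record instead of losing it.

```java
import java.util.OptionalLong;
import java.util.TreeSet;

// Hypothetical helper, not a Kafka API: tracks record offsets for ONE partition.
final class OffsetTracker {
    // Offsets whose external call has started but not yet completed or timed out.
    private final TreeSet<Long> inFlight = new TreeSet<>();
    // Highest offset handed to processing so far; -1 means nothing seen yet.
    private long highestSeen = -1L;

    // Call when a record is handed off for processing.
    synchronized void start(long offset) {
        inFlight.add(offset);
        highestSeen = Math.max(highestSeen, offset);
    }

    // Call when the external call for that record completes or times out.
    synchronized void complete(long offset) {
        inFlight.remove(offset);
    }

    // The offset that is safe to commit. A committed offset in Kafka means
    // "the next offset to read", so this is the lowest pending offset, or
    // highestSeen + 1 when nothing is pending.
    synchronized OptionalLong committable() {
        if (highestSeen < 0) {
            return OptionalLong.empty();
        }
        if (inFlight.isEmpty()) {
            return OptionalLong.of(highestSeen + 1);
        }
        return OptionalLong.of(inFlight.first());
    }
}
```

With records at offsets 1 and 2, where 2 has completed and 1 is still pending, `committable()` stays at 1, so a restart would re-read both records. Once 1 completes it moves to 3. The value could then be passed to the `commitAsync` overload that takes a map of `TopicPartition` to `OffsetAndMetadata`, rather than the no-argument `commitAsync()`, which commits the offsets returned by the last poll. The trade-off is that record 2 may be processed twice after a crash, so the processing would need to tolerate duplicates.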