2
votes

I have a Java Kafka consumer that fetches ConsumerRecords in batches for processing. The sample code is as follows -

while (true) {
    // poll(long) is deprecated in newer clients; poll(Duration) works on 2.0+
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        doSomeProcessing(record.value());
    }
    consumer.commitAsync();
}

private void doSomeProcessing(String record) {
    // make an external call to a system which can take a random amount of
    // time per request, or time out after 5 seconds
}

The problem I have is which offset to commit when a later record has finished processing but an earlier record has not yet completed or timed out.

Let's suppose I get 2 records in a batch: the external call for the 1st message is still awaited, while the call for the 2nd has completed. If I wait up to 5 seconds for each response, consumption from Kafka can become very slow. But if I do not wait for the 1st request to complete before polling again, what offset do I commit to Kafka? If I commit offset 2 and the consumer crashes, the 1st message is lost, because on restart consumption resumes from the last committed offset.


2 Answers

2
votes

I think you analyzed the problem correctly, and the answer is probably what you suspect: you can't commit an offset until every offset less than or equal to it has been processed. That's just how Kafka works: it's very much oriented around strong ordering.
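That "commit only a fully processed prefix" rule can be sketched with a small bookkeeping helper. This is plain Java with no Kafka dependency; the class and method names are illustrative, not a Kafka API. It tracks which offsets are in flight and reports the highest offset that is safe to commit.

```java
import java.util.TreeSet;

// Illustrative helper (not a Kafka API): tracks offsets that are in flight
// and reports the highest offset that is safe to commit. In Kafka, the
// committed offset is the *next* offset to read, so after a crash only
// unfinished records are replayed.
public class OffsetTracker {
    private final TreeSet<Long> inFlight = new TreeSet<>();
    private long highWater; // one past the highest offset handed out

    public OffsetTracker(long startOffset) { this.highWater = startOffset; }

    public void started(long offset) {
        inFlight.add(offset);
        if (offset >= highWater) highWater = offset + 1;
    }

    public void completed(long offset) { inFlight.remove(offset); }

    // Safe commit position: one past the last contiguously completed record,
    // i.e. the lowest offset still in flight (or the high-water mark if none).
    public long safeCommitOffset() {
        return inFlight.isEmpty() ? highWater : inFlight.first();
    }

    public static void main(String[] args) {
        OffsetTracker t = new OffsetTracker(0);
        t.started(0); t.started(1);
        t.completed(1); // record 1 finished first, record 0 still pending
        System.out.println(t.safeCommitOffset()); // prints 0: record 0 pending
        t.completed(0);
        System.out.println(t.safeCommitOffset()); // prints 2: prefix complete
    }
}
```

This matches your example: with record 2 done but record 1 pending, the safe commit position is still record 1's offset, so a crash replays record 1 instead of losing it.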

The solution is to increase the number of partitions and consumers so you get the parallelism you desire. This is not great from some angles (you need more threads and resources), but at least you get to write synchronous code.
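A minimal sketch of where that parallelism comes from, in plain Java with no Kafka client (the class is illustrative): each partition gets its own single-threaded executor, so records within a partition stay strictly ordered while different partitions proceed in parallel, which is the same effect as running one synchronous consumer per partition.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: one single-threaded worker per partition preserves
// per-partition ordering while partitions are processed concurrently.
public class PerPartitionWorkers {
    private final ExecutorService[] workers;

    public PerPartitionWorkers(int partitions) {
        workers = new ExecutorService[partitions];
        for (int p = 0; p < partitions; p++) {
            workers[p] = Executors.newSingleThreadExecutor();
        }
    }

    public void submit(int partition, Runnable task) {
        workers[partition].submit(task);
    }

    public void shutdownAndWait() throws InterruptedException {
        for (ExecutorService w : workers) w.shutdown();
        for (ExecutorService w : workers) w.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With one slow external call in partition 0, only partition 0's queue stalls; the other partitions keep draining, so overall throughput scales with the partition count.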

0
votes

What you can do is set up an error pipeline: for a message that fails, you commit its offset anyway, push the message to an error queue, and process it later.
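The idea above can be sketched as follows, in plain Java (the class is illustrative, not a Kafka API; in a real deployment the error queue would be a dead-letter topic you produce to): a failing record is diverted, and its offset is treated as committable either way, so one bad or slow record never blocks the rest of the partition.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Illustrative error-pipeline sketch: failed records are parked in an error
// queue (stand-in for a dead-letter topic) and the offset advances regardless.
public class ErrorPipeline {
    private final Deque<String> errorQueue = new ArrayDeque<>();

    // Process one record; returns the offset that is now safe to commit
    // (in Kafka, the committed offset is one past the processed record).
    public long handle(long offset, String value, Consumer<String> processor) {
        try {
            processor.accept(value);
        } catch (RuntimeException e) {
            errorQueue.add(value); // park it for later reprocessing
        }
        return offset + 1;
    }

    public Deque<String> errors() { return errorQueue; }
}
```

The trade-off is that you give up strict ordering for failed messages: they are retried out of band from the error queue instead of blocking their partition.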