
I have turned off auto commit in my Kafka consumer. When processing fails, for example on three invalid messages, the manual ack is not sent and the lag increases to three.

TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
orders  0          35              38              3

If a new valid message then arrives and is processed successfully, it is manually acked, and the consumer group looks like this:

TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
orders  0          39              39              0

Why does the consumer set CURRENT-OFFSET to 39 when the messages with offsets 36, 37, and 38 were not processed successfully? They are never read again by the same consumer.

Can anyone please explain this behavior? Thanks in advance!


1 Answer


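In Kafka, consumers don't ack each message individually. Instead, they ack (commit) a single offset per partition: the position of the next message to read, which is the offset of the last processed message plus one.

For example, committing offset 15 implicitly means you have processed every message before it, from 0 to 14. Each commit also overwrites the previous one, so afterwards you cannot tell whether 13 or 14 was ever committed. This is why acking the one valid message moved the committed offset past the three failed ones.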
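To make the numbers from the question concrete, here is a minimal sketch of the bookkeeping in plain Python. It is a toy model, not the Kafka client API: `PartitionState` and its methods are hypothetical names invented for illustration, and it assumes a single partition and a single consumer group.

```python
class PartitionState:
    """Toy model of one partition's offsets for a single consumer group."""

    def __init__(self, log_end_offset, committed):
        self.log_end_offset = log_end_offset  # offset the next produced record will get
        self.committed = committed            # offset the group would resume from

    def append(self):
        """Simulate producing one record; return the offset it was stored at."""
        offset = self.log_end_offset
        self.log_end_offset += 1
        return offset

    def commit(self, next_offset):
        """Store a single number; any earlier commit is simply overwritten."""
        self.committed = next_offset

    @property
    def lag(self):
        return self.log_end_offset - self.committed


# State after the three failed messages: nothing was committed past 35.
state = PartitionState(log_end_offset=38, committed=35)
lag_before = state.lag

# The valid message arrives and is acked: commit "last processed + 1".
new_offset = state.append()
state.commit(new_offset + 1)
lag_after = state.lag
```

In this model the commit carries no record of which earlier offsets succeeded, so the lag drops to zero even though the three failed messages were never handled.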

I suggest you read the Consumer Position section in the docs, which goes over this concept.

Regarding reprocessing, you have a few options. When you hit a processing failure, you can retry the failed message before polling for more messages and processing new records. Another option is to skip it as invalid and carry on, which is what you are currently doing.
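The two options can be combined: retry a few times, then skip deliberately. The following is a rough sketch in plain Python with an in-memory list standing in for polled records. The names `consume`, `parse_order`, and `max_attempts` are hypothetical, and a real consumer would commit through the client library rather than by returning a number.

```python
def consume(records, process, max_attempts=3):
    """Process (offset, value) pairs in order.

    Returns the offset to commit and the offsets that were skipped
    after exhausting their retries.
    """
    committed = None
    skipped = []
    for offset, value in records:
        for attempt in range(1, max_attempts + 1):
            try:
                process(value)
                break  # success, stop retrying this record
            except ValueError:
                if attempt == max_attempts:
                    skipped.append(offset)  # explicit decision to give up
        # Advance only once the record has succeeded or been skipped on purpose.
        committed = offset + 1
    return committed, skipped


def parse_order(value):
    """Hypothetical processing step: accept only numeric payloads."""
    if not value.isdigit():
        raise ValueError(f"invalid order payload: {value!r}")
    return int(value)


records = [(35, "bad"), (36, "17"), (37, "oops"), (38, "42")]
committed, skipped = consume(records, parse_order)
```

Recording the skipped offsets makes the skip an explicit, auditable choice instead of a side effect of a later commit.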

Alternatively, you could guarantee the data is good by running a Streams job that pipes valid messages into a checked topic and forwards bad messages to a dead-letter queue (DLQ). Then consume from the checked topic, where you know you only have good messages. See "validation for kafka topic messages".
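The routing idea can be illustrated with a small plain-Python stand-in. This is not the Kafka Streams API: the two lists only play the role of the checked topic and the DLQ, and `split_valid` and the `str.isdigit` validity rule are assumptions made for the example.

```python
def split_valid(records, is_valid):
    """Route each (offset, value) pair to the checked list or the dead-letter list."""
    checked, dead_letters = [], []
    for offset, value in records:
        target = checked if is_valid(value) else dead_letters
        target.append((offset, value))
    return checked, dead_letters


raw = [(35, "bad"), (36, "17"), (37, "oops"), (38, "42")]
checked_topic, dlq_topic = split_valid(raw, str.isdigit)
```

A consumer reading only from the checked side never sees an invalid record, so it can commit after every message without silently passing over failures.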