4
votes

I'm having a hard time understanding how this could be solved, so I'm asking here in the hope that someone else has already faced the same problem. We are running our @KafkaListener with manual ack mode and a dead letter recoverer with a retry limit of 3. Manual ack mode is needed because of our business logic: under certain circumstances (external dependencies) we do not ack a message and pause consuming for 5 minutes.

We also need the dead letter queue for messages we cannot process for some reason.

The problem in manual ack mode is that our listener/consumer does not acknowledge the message when it hits the retry limit and moves the message to the DL queue.

If the consumer service is restarted, it consumes the same messages again and moves them to the DL queue again.

Any ideas how we could solve this issue?

Thanks and greetings from Hamburg!


2 Answers

2
votes

I would try to avoid using manual acks if possible; perhaps by increasing max.poll.interval.ms instead.
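For context, Kafka's default for max.poll.interval.ms is 300000 ms, which is exactly five minutes, so a listener that simply blocks for the five-minute wait would need a larger value. A minimal sketch of such an override, assuming a plain property map that you merge into your consumer configuration; the class name and the ten-minute value are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerTuning {

    // Hypothetical value: 10 minutes, comfortably above the 5-minute wait from the question
    static final int MAX_POLL_INTERVAL_MS = 10 * 60 * 1000;

    // Returns the overrides to merge into the consumer properties
    static Map<String, Object> consumerOverrides() {
        Map<String, Object> props = new HashMap<>();
        // Maximum allowed time between two poll() calls before the consumer is kicked out of the group
        props.put("max.poll.interval.ms", MAX_POLL_INTERVAL_MS);
        return props;
    }
}
```

These entries would go into the same map that is passed to the consumer factory.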

If you use AckMode.MANUAL_IMMEDIATE, it will be safe to perform the commit directly on the Consumer in the error handler.

Subclass the SeekToCurrentErrorHandler and override handle(). If super.handle() doesn't throw an exception, it means the retries are exhausted and the record has been recovered, so you can commit the offset on the Consumer.
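A rough sketch of that subclass, assuming spring-kafka 2.3 or later (where the constructor takes a BackOff; 2.2.x takes a maxFailures int instead) and a container running with AckMode.MANUAL_IMMEDIATE. The class name is made up for illustration:

```java
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.BackOff;

// Hypothetical name; commits the offset of a record once it has been recovered
public class CommittingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    public CommittingSeekToCurrentErrorHandler(DeadLetterPublishingRecoverer recoverer, BackOff backOff) {
        super(recoverer, backOff);
    }

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        // Throws while retries remain; returns normally only after the failed record was recovered
        super.handle(thrownException, records, consumer, container);
        if (records == null || records.isEmpty()) {
            return;
        }
        // The failed (now recovered) record is the first one in the list
        ConsumerRecord<?, ?> recovered = records.get(0);
        // Commit the next offset so the record is not redelivered after a restart
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(recovered.topic(), recovered.partition()),
                new OffsetAndMetadata(recovered.offset() + 1)));
    }
}
```

With new FixedBackOff(0L, 2L) as the BackOff, the record gets three delivery attempts in total before the recoverer is called, which should match a retry limit of 3.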

1
votes

commitRecovered can be set to true on the SeekToCurrentErrorHandler instance that is provided to the listener container factory (e.g. ConcurrentKafkaListenerContainerFactory).

From the documentation:

public void setCommitRecovered(boolean commitRecovered)

Set to true to commit the offset for a recovered record. The container must be configured with ContainerProperties.AckMode.MANUAL_IMMEDIATE. Whether or not the commit is sync or async depends on the container's syncCommits property.
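Wired into a container factory, this could look roughly like the following. It is only a sketch, assuming spring-kafka 2.3 to 2.7, String keys and values, and a KafkaTemplate bean for the dead letter publisher; the configuration class name is made up:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaListenerConfig { // hypothetical class name

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<Object, Object> template) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // commitRecovered requires MANUAL_IMMEDIATE, as stated in the Javadoc
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // No delay, 2 retries: 3 delivery attempts, then publish to the dead letter topic
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2L));
        // Commit the offset of the recovered record so it is not redelivered after a restart
        errorHandler.setCommitRecovered(true);
        factory.setErrorHandler(errorHandler);
        return factory;
    }
}
```

On 2.2.x the error handler constructor takes the maximum number of failures as an int rather than a BackOff, so that line would need adjusting.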