There is a Kafka topic to which messages arrive. I need to read a message, process it, and proceed to the next one. Processing can fail, and when it does, it has to be retried a few times (let's say, 10 times) before I can move on to the next message. If processing still fails after 10 attempts, the message needs to be dropped and we should continue with the next one.
We use `reactor-kafka`, and all the processing needs to be reactive.
Here is how I tried to solve this:
```java
Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .subscribe();
```
(Here `receiver` is a `KafkaReceiver<String, String>`.)
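For context, `processRecord()` is assumed to re-emit the record once its payload has been processed, so that the downstream commit step has access to the offset. A minimal sketch (the body is hypothetical; only its shape is implied by the pipeline above, and `handlePayload()` stands in for the actual business logic):

```java
// Hypothetical sketch: process the record's payload reactively, then
// re-emit the record so the next stage can commit its offset.
private Mono<ReceiverRecord<String, String>> processRecord(
        ReceiverRecord<String, String> record) {
    return handlePayload(record.value()) // assumed Mono<Void>-returning business step
            .thenReturn(record);
}
```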
This works for the case without any exceptions, and if there is an exception, `processRecord()` is retried 10 times. The problem is that if it still fails after the 10 allowed attempts, the offset is not committed (of course), so the same offset is read from Kafka next time, and processing effectively gets stuck forever on the 'faulty' offset.
I tried to implement the following obvious idea: if an exception 'passes further' than the `retryBackoff()` operator, commit the current offset. Committing an offset requires a `ReceiverRecord`, so I added wrapping of the exception together with the current record in an `ExceptionWithRecord`:
```java
// in processRecord()
.onErrorMap(ex -> new ExceptionWithRecord(record, ex))
```
and
```java
Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .onErrorResume(this::extractRecordAndMaybeCommit)
        .subscribe();
```
`extractRecordAndMaybeCommit()` extracts the `ReceiverRecord` from the given exception and commits it:

```java
return record.receiverOffset().commit();
```
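For completeness, here is a minimal sketch of the wrapper and the recovery handler; only their usage is shown above, so the bodies below are a paraphrase of what I actually have:

```java
// Carries the failed record alongside the original cause so the offset
// can be recovered once the retries are exhausted.
class ExceptionWithRecord extends RuntimeException {
    private final ReceiverRecord<String, String> record;

    ExceptionWithRecord(ReceiverRecord<String, String> record, Throwable cause) {
        super(cause);
        this.record = record;
    }

    ReceiverRecord<String, String> getRecord() {
        return record;
    }
}

// Unwraps the record (if present) and tries to commit its offset;
// any other exception is propagated as-is.
private Mono<Void> extractRecordAndMaybeCommit(Throwable ex) {
    if (ex instanceof ExceptionWithRecord) {
        ReceiverRecord<String, String> record = ((ExceptionWithRecord) ex).getRecord();
        return record.receiverOffset().commit();
    }
    return Mono.error(ex);
}
```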
This method of passing a record along and committing it once the retries are exhausted works, and the `commit()` method is indeed called, but it has no effect.
It turns out that as soon as any exception enters the reactive pipeline above, `DefaultKafkaReceiver.dispose()` is called, so any subsequent commit attempt is ignored. It is therefore simply not possible to commit an offset once an exception has been 'seen' by the publishers.
How can the 'commit after N errors' behavior be implemented while still using `reactor-kafka`?