
I've looked through the spring-cloud-stream 1.0.0.RELEASE documentation and can't find anything about error handling.

Based on observation with kafka 0.9, if my consumer throws a RuntimeException, I see 3 retries. After the 3 retries, I see this in the logs:

2016-05-17 09:35:59.216 ERROR 8983 --- [  kafka-binder-] o.s.i.k.listener.LoggingErrorHandler     : Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3731457175, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=130 cap=130]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='reservation', id=1]]

org.springframework.messaging.MessagingException: Exception thrown while invoking demo.sink.ReservationConsumer#handleReservation[1 args]; nested exception is java.lang.RuntimeException: no message

At this point, the consumer offset lags by 1, and if I restart the consumer, the message is retried another 3 times. However, if I then send another message to the same partition and the consumer processes it without throwing an exception, the consumer offset gets updated, and the original failed message is no longer retried after a restart.
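The behavior described above is what you would expect if the offset is committed only after a successful handler call, given that Kafka keeps a single committed position per consumer group and partition. Here is a minimal toy model of that, not the binder's actual code: the class name `CommittedOffsetModel` and its methods are made up for illustration, and the 3 retries are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical toy model; it does not use the Kafka client or the binder.
class CommittedOffsetModel {
    private long committed; // persisted position: the next offset to read after a restart
    private long position;  // in-memory position of the running consumer

    public CommittedOffsetModel(long committed) {
        this.committed = committed;
        this.position = committed;
    }

    // Delivers every offset in [position, logEndOffset) to the handler and
    // returns the offsets that were delivered.
    public List<Long> consume(long logEndOffset, LongPredicate handlerSucceeds) {
        List<Long> delivered = new ArrayList<>();
        while (position < logEndOffset) {
            long offset = position++;
            delivered.add(offset);
            if (handlerSucceeds.test(offset)) {
                // Assumption: commit only on success. The commit is a single
                // number, so it also covers any earlier failed offset.
                committed = offset + 1;
            }
        }
        return delivered;
    }

    // A restart resumes from the committed position.
    public void restart() {
        position = committed;
    }

    public long committed() {
        return committed;
    }

    public static void main(String[] args) {
        CommittedOffsetModel consumer = new CommittedOffsetModel(2);
        LongPredicate failsOnOffset2 = offset -> offset != 2;

        System.out.println(consumer.consume(3, failsOnOffset2)); // offset 2 fails
        consumer.restart();
        System.out.println(consumer.consume(3, failsOnOffset2)); // redelivered after restart
        System.out.println(consumer.consume(4, failsOnOffset2)); // offset 3 succeeds
        consumer.restart();
        System.out.println(consumer.consume(4, failsOnOffset2)); // nothing left to redeliver
    }
}
```

In this model the failed message at offset 2 is redelivered on restart only until a later message on the same partition succeeds, which matches what I observed.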

Is this documented somewhere that I haven't found? Is error handling binder-specific, or does s-c-s abstract it so that it is consistent across binders? I suspect this behavior is an unplanned consequence of how the kafka binder updates consumer offsets.

I see that an enableDlq kafka consumer property was added, and I'm about to test with it, but I'm not sure how we can deal with dead letters in kafka. I'm familiar with dead letter queues in rabbitmq, where we can use the rabbitmq shovel plugin to republish and retry dlq messages, which covers cases where the failure was due to a temporary service outage. I'm not aware of any similar functionality for kafka, short of writing such a utility ourselves.

UPDATE: Testing with the enableDlq kafka consumer property enabled shows the same consumer offset issue with error handling. When the consumer throws a RuntimeException, I see 3 retries, after which the error message is not logged, and I see a message published to error.<destination>.<group> as documented, but the consumer offset is not updated and lags by 1. If I restart the consumer, it tries to process the same failed message from the original topic partition again, retries 3 times and puts the same message on the error.<destination>.<group> topic again (duplicate dlq message). If I publish another message to the same topic partition for which the consumer does not throw a RuntimeException, the offset gets updated and the original failed message is no longer retried on restart.

I think the consumer should be updating the consumer offset in kafka when the consumer throws an error, regardless of whether enableDlq is true or not. That would at least make it consistent that a message that failed all retry attempts is either discarded (when enableDlq is false) or published to the dlq and never retried (when enableDlq is true).


1 Answer


Looks like a bug to me. The listener container has a property, autoCommitOnError (false by default), which the binder neither exposes nor sets. When that boolean is true, the container commits the offset after calling the error handler (which publishes the error).
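As a rough sketch of that decision only (this is not the container's source; `AutoCommitOnErrorSketch` and `handleFailure` are hypothetical names, and only the `autoCommitOnError` flag comes from the container), the logic after a failed delivery amounts to something like this:

```java
import java.util.function.Consumer;

// Hypothetical illustration of the flag's effect; not spring-integration-kafka code.
final class AutoCommitOnErrorSketch {

    // Returns the committed offset after a delivery at failedOffset has failed.
    static long handleFailure(long committed,
                              long failedOffset,
                              boolean autoCommitOnError,
                              Consumer<Long> errorHandler) {
        // The error handler always runs first (for example, to log or publish the error).
        errorHandler.accept(failedOffset);
        // Only when the flag is true does the commit move past the failed message.
        return autoCommitOnError ? failedOffset + 1 : committed;
    }
}
```

With the flag false, the committed offset stays where it was, so the failed message is redelivered after a restart; with it true, the failed message is skipped from then on.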

Please report it as an issue on GitHub.