
I'm writing a Kafka listener which should just forward the messages from a topic to a JMS queue. I need to stop processing new messages only for a custom exception, JmsBrokerConnectionException, but I want to continue processing new messages for any other exception (e.g. invalid data) and send the failing messages to a DLT. I am using spring-kafka 2.2.7 and cannot upgrade it.

I currently have a solution, sketched below, which uses:

  • SeekToCurrentErrorHandler (configured with 0 retries and a DeadLetterPublishingRecoverer)
  • a retry template in the @KafkaListener method, configured with Integer.MAX_VALUE attempts and retrying only on JmsBrokerConnectionException
  • MANUAL_IMMEDIATE ack
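
A minimal sketch of that configuration, assuming spring-kafka 2.2.x; the bean wiring and forwardToJms(...) are illustrative, not the actual code:

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory, KafkaTemplate<Object, Object> dltTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // maxFailures = 1 -> "0 retries": the first exception that escapes the
    // listener sends the record straight to the DLT.
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(dltTemplate), 1));
    return factory;
}

// Retries only JmsBrokerConnectionException, effectively forever.
private final RetryTemplate retryTemplate = jmsRetryTemplate();

private static RetryTemplate jmsRetryTemplate() {
    RetryTemplate template = new RetryTemplate();
    template.setRetryPolicy(new SimpleRetryPolicy(Integer.MAX_VALUE,
            Collections.singletonMap(JmsBrokerConnectionException.class, true)));
    return template;
}

@KafkaListener(topics = "input-topic")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
    // While the JMS broker is down this spins inside the listener, blocking
    // the poll loop -- which is what eventually triggers the rebalance.
    retryTemplate.execute(context -> {
        forwardToJms(record); // may throw JmsBrokerConnectionException
        return null;
    });
    ack.acknowledge();
}
```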

The solution seems to do the job, but it has the drawback that, for long outages of the JMS broker, it causes a rebalance every max.poll.interval.ms (i.e. every 5 minutes).

The question: Is it a good idea to let max.poll.interval.ms expire and have a group rebalance to handle error conditions for which you want to stop message consumption?

I don't have high-throughput requirements. The input topic has 10 partitions and I will have 2 consumers. I know there are other solutions using stateful retry or pausing/resuming the container, but I'd like to keep using the current solution unless I am missing a major drawback.


1 Answer


"I am using spring-kafka 2.2.7 and cannot upgrade it."

That version is no longer supported.

Version 2.3 added backoff and exception classification to the STCEH, eliminating the need for a retry template at the listener level.
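
For comparison only (this needs 2.3+, so it is not usable on 2.2.7), the later-style handler looks roughly like this; the FixedBackOff values are illustrative and the exact classification API varies across 2.x releases:

```java
import org.springframework.util.backoff.FixedBackOff;

// Unlimited, spaced redeliveries without a listener-level retry template.
// Each failed delivery seeks and re-polls, so max.poll.interval.ms is never
// exceeded as long as the back-off interval stays below it.
factory.setErrorHandler(new SeekToCurrentErrorHandler(
        new DeadLetterPublishingRecoverer(dltTemplate),
        new FixedBackOff(5000L, FixedBackOff.UNLIMITED_ATTEMPTS)));
// 2.3+ also classifies exceptions, so fatal ones (e.g. deserialization
// failures) can skip the retries and go straight to the recoverer.
```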

That said, you can use stateful retry (https://docs.spring.io/spring-kafka/docs/current/reference/html/#stateful-retry) with a STCEH that always retries, and do the dead-letter publishing in the RecoveryCallback at the listener level. The consumer record is available in the retry context under the RetryingMessageListenerAdapter.CONTEXT_RECORD key.
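
A sketch of that arrangement on 2.2.x (the wiring is illustrative; JmsBrokerConnectionException and the JMS-forwarding listener are the ones from the question):

```java
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory, KafkaTemplate<Object, Object> dltTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    // A STCEH with no recoverer and no max-failures limit always seeks back,
    // so a failed record is simply redelivered on the next poll.
    factory.setErrorHandler(new SeekToCurrentErrorHandler());

    // Stateful retry: the exception is rethrown to the container after every
    // attempt, so the thread returns to poll() between attempts and
    // max.poll.interval.ms never expires, even during a long broker outage.
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(Integer.MAX_VALUE,
            Collections.singletonMap(JmsBrokerConnectionException.class, true)));
    factory.setRetryTemplate(retryTemplate);
    factory.setStatefulRetry(true);

    // Any other exception is not retryable, so the recovery callback runs on
    // the first failure and publishes the record to the DLT.
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dltTemplate);
    factory.setRecoveryCallback(context -> {
        ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>)
                context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
        recoverer.accept(record, (Exception) context.getLastThrowable());
        // With manual acks the offset must also be committed here; see below.
        return null;
    });
    return factory;
}
```

With the retry template moved into the factory, the @KafkaListener method no longer needs its own retry loop: it just forwards to JMS and calls ack.acknowledge() on success.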

Since you are doing manual acks, you will also need to commit the offset, using the Consumer available in the retry context under the CONTEXT_CONSUMER key.
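
Something along these lines, slotting into the RecoveryCallback above (a sketch; commitSync is safe here because the callback runs on the consumer thread):

```java
// Additional imports: org.apache.kafka.clients.consumer.Consumer,
// org.apache.kafka.clients.consumer.OffsetAndMetadata,
// org.apache.kafka.common.TopicPartition.

// The listener threw, so ack.acknowledge() was never called; commit the
// recovered record's offset (offset + 1) through the consumer instead.
Consumer<?, ?> consumer = (Consumer<?, ?>)
        context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_CONSUMER);
consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));
```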