1
votes

I am using Spring Kafka to consume messages from a topic, together with Spring Retry's @Retryable, so that processing of a record is retried when an error occurs. While retrying, I want to avoid the exception message emitted by KafkaMessageListenerContainer and display a custom message instead. I tried adding an error handler to the ConcurrentKafkaListenerContainerFactory, but when I do so my retry is no longer invoked. How can I display a custom exception message while still retrying with @Retryable? My code snippets are below:

ConcurrentKafkaListenerContainerFactory Bean Config


@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> concurrentKafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(kafkaListenerContainerFactory, kafkaConsumerFactory);
    kafkaListenerContainerFactory.setConcurrency(1);

    // Set Error Handler (commented out because the retry stops working when it is enabled)
    /*kafkaListenerContainerFactory.setErrorHandler(((thrownException, data) -> {
        log.info("Retries exhausted");
    }));*/
    return kafkaListenerContainerFactory;
}

Kafka Consumer

@KafkaListener(
    topics = "${spring.kafka.reprocess-topic}",
    groupId = "${spring.kafka.consumer.group-id}",
    containerFactory = "concurrentKafkaListenerContainerFactory"
)
@Retryable(
    include = RestClientException.class,
    maxAttemptsExpression = "${spring.kafka.consumer.max-attempts}",
    backoff = @Backoff(delayExpression = "${spring.kafka.consumer.backoff-delay}")
)
public void onMessage(ConsumerRecord<String, String> consumerRecord) throws Exception {
    // Consume the record
    log.info("Consumed Record from topic : {} ", consumerRecord.topic());


    // process the record
    messageHandler.handleMessage(consumerRecord.value());
}

Below is the exception that I am getting (posted as an image, which is not reproduced here).

2 Answers

1
votes

You should not use @Retryable together with the SeekToCurrentErrorHandler (which has been the default error handler since version 2.5, so I presume you are using that version).

Instead, configure a custom SeekToCurrentErrorHandler with max attempts, back off, and retryable exceptions.

That error message is normal; it is logged by the container. Its logging level can be reduced from ERROR to INFO or DEBUG by setting the logLevel property on the SeekToCurrentErrorHandler. You can also add a custom recoverer to it to log your custom message after the retries are exhausted.
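As a rough illustration of the advice in this answer, the container factory from the question might be configured along these lines. This is a minimal sketch, assuming Spring Kafka 2.5 with Spring Boot; the class name KafkaRetryConfig, the 1000 ms interval, the 2 retries, and the log text are placeholder assumptions rather than values from the question. With something like this in place, the @Retryable annotation would be removed from the listener.

```java
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
import org.springframework.web.client.RestClientException;

@Configuration
public class KafkaRetryConfig { // hypothetical class name

    private static final Logger log = LoggerFactory.getLogger(KafkaRetryConfig.class);

    @Bean
    ConcurrentKafkaListenerContainerFactory<Object, Object> concurrentKafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setConcurrency(1);

        // Recoverer: called once retries are exhausted (or for non-retryable
        // exceptions); this is where the custom message is logged.
        // FixedBackOff(1000L, 2L): wait 1 s between attempts, 2 retries,
        // i.e. up to 3 delivery attempts (assumed values).
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                (record, exception) -> log.info(
                        "Retries exhausted for record at offset {} of topic {}",
                        record.offset(), record.topic()),
                new FixedBackOff(1000L, 2L));

        // Retry only RestClientException; everything else goes straight to the recoverer.
        Map<Class<? extends Throwable>, Boolean> retryable = new HashMap<>();
        retryable.put(RestClientException.class, true);
        errorHandler.setClassifications(retryable, false);

        // Lower the level of the container's own log entry for failed deliveries.
        errorHandler.setLogLevel(KafkaException.Level.INFO);

        factory.setErrorHandler(errorHandler);
        return factory;
    }
}
```

The interval and retry count could instead be injected from the same properties the question used for @Retryable; they are hard-coded here only to keep the sketch short.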

0
votes

My event retry template:

@Bean(name = "eventRetryTemplate")
public RetryTemplate eventRetryTemplate() {
    RetryTemplate template = new RetryTemplate();

    ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
    Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
    policyMap.put(NonRecoverableException.class, new NeverRetryPolicy());
    policyMap.put(RecoverableException.class, new AlwaysRetryPolicy());
    retryPolicy.setPolicyMap(policyMap);

    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(backoffInitialInterval);
    backOffPolicy.setMaxInterval(backoffMaxInterval);
    backOffPolicy.setMultiplier(backoffMultiplier);

    template.setRetryPolicy(retryPolicy);
    template.setBackOffPolicy(backOffPolicy);
    return template;
}

My Kafka listener, which uses the retry template:

@KafkaListener(
    groupId = "${kafka.consumer.group.id}",
    topics = "${kafka.consumer.topic}",
    containerFactory = "eventContainerFactory")
public void eventListener(ConsumerRecord<String, String> events,
        Acknowledgment acknowledgment) {
    eventRetryTemplate.execute(retryContext -> {
        retryContext.setAttribute(EVENT, "my-event");
        eventConsumer.consume(events, acknowledgment);
        return null;
    });
}

My Kafka listener container factory configuration:

private ConcurrentKafkaListenerContainerFactory<String, String> getConcurrentKafkaListenerContainerFactory(
        KafkaProperties kafkaProperties) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.getContainerProperties().setAckOnError(Boolean.TRUE);
    kafkaErrorEventHandler.setCommitRecovered(Boolean.TRUE);
    factory.setErrorHandler(kafkaErrorEventHandler);
    factory.setConcurrency(1);
    factory.setConsumerFactory(getEventConsumerFactory(kafkaProperties));
    return factory;
}

kafkaErrorEventHandler is my custom error handler; it extends SeekToCurrentErrorHandler and overrides the handle method, somewhat like this:

@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
        Consumer<?, ?> consumer, MessageListenerContainer container) {

    log.info("Non recoverable exception. Publishing event to Database");
    super.handle(thrownException, records, consumer, container);

    ConsumerRecord<String, String> consumerRecord =
        (ConsumerRecord<String, String>) records.get(0);

    FailedEvent event = createFailedEvent(thrownException, consumerRecord);

    failedEventService.insertFailedEvent(event);

    log.info("Successfully Published eventId {} to Database...", event.getEventId());
}

Here, failedEventService is again my own class; it puts these failed events into a queryable relational DB (I chose this to be my DLQ).
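The control flow described in this answer (retry recoverable failures, send everything else to a database-backed dead-letter store) can be modeled without Spring or Kafka. The following is a plain-Java sketch under stated assumptions, not the answer's actual implementation: the class RetryThenDeadLetterSketch, the process method, and the in-memory failedEvents list are hypothetical stand-ins, a bounded attempt count replaces AlwaysRetryPolicy so that the example terminates, and back-off delays are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetryThenDeadLetterSketch {

    /** Hypothetical stand-in for the answer's RecoverableException. */
    static class RecoverableException extends RuntimeException {
        RecoverableException(String message) { super(message); }
    }

    /** Hypothetical stand-in for the answer's NonRecoverableException. */
    static class NonRecoverableException extends RuntimeException {
        NonRecoverableException(String message) { super(message); }
    }

    /** In-memory stand-in for the relational table used as a DLQ. */
    static final List<String> failedEvents = new ArrayList<>();

    /**
     * Runs the handler for one event. RecoverableException is retried until
     * maxAttempts deliveries have been made; NonRecoverableException is never
     * retried. Failed events end up in failedEvents. Returns the number of
     * attempts made.
     */
    static int process(String event, Consumer<String> handler, int maxAttempts) {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                handler.accept(event);
                return attempt; // success
            } catch (RecoverableException e) {
                if (attempt >= maxAttempts) {
                    failedEvents.add(event + ": retries exhausted: " + e.getMessage());
                    return attempt;
                }
                // otherwise loop and try again (a real setup would back off here)
            } catch (NonRecoverableException e) {
                failedEvents.add(event + ": non-recoverable: " + e.getMessage());
                return attempt;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice with a recoverable error, then succeeds on the third attempt.
        int attempts = process("event-1", e -> {
            if (++calls[0] < 3) {
                throw new RecoverableException("timeout");
            }
        }, 5);
        System.out.println("event-1 attempts: " + attempts);

        // Fails immediately with a non-recoverable error and is stored.
        process("event-2", e -> {
            throw new NonRecoverableException("bad payload");
        }, 5);
        System.out.println("failed events: " + failedEvents);
    }
}
```

In the answer's real setup the two halves live in different places: the RetryTemplate inside the listener decides whether to retry, and the custom error handler on the container writes the failed event to the database once the exception escapes the listener.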