0
votes

I have a requirement to consume from a kafka topic,do some work on records and produce to another topic with spring-kafka 2.1.7.Other requiremenrs are transactional for once only semantics,retry and error handling.On failure to commit a record I should do 3 retries ,log each of the retry messaage to retry topic anf on failure of all retries send the record to a dead letter topic . I looked at https://github.com/spring-projects/spring-kafka/issues/575 and it has excellent details on solving the problem. The thing that I am struggling with is how to log each of the retry message with details like consumer offset ,topic it was trying to commit,etc .Is there a way to get these from retry call back ?. The retrylistener snippet below is registered with a org.springframework.kafka.listener.LoggingErrorHandler that is set as container property to ConcurrentKafkaListenerContainerFactory ?

         @Bean
         public RetryListener retryListener(KafkaTemplate<String,SpecificRecord> kafkaTemplate) {
             return new RetryListenerSupport() {

                public void onError(RetryContext context, RetryCallback callback, Throwable throwable) {
                    int retryCount =context.getRetryCount();
                    kafkaTemplate) .send(new ProducerRecord<String,SpecificRecord>("topic_name",record));
                }
             };
         }
1

1 Answers

0
votes

The RetryContext is populated with some useful info in the RetryingMessageListenerAdapter:

context.setAttribute(CONTEXT_RECORD, record);
switch (RetryingMessageListenerAdapter.this.delegateType) {
    case ACKNOWLEDGING_CONSUMER_AWARE:
        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
        context.setAttribute(CONTEXT_CONSUMER, consumer);
        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
        break;
    case ACKNOWLEDGING:
        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
        break;
    case CONSUMER_AWARE:
        context.setAttribute(CONTEXT_CONSUMER, consumer);
        RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
        break;
    case SIMPLE:
        RetryingMessageListenerAdapter.this.delegate.onMessage(record);
}