I have an application using Spring Cloud Stream and Spring Kafka, which processes Avro messages. The application works fine, but now I'd like to add some error handling.

The Goal: I would like to catch deserialization exceptions, build a new object with the exception details + original Kafka message + custom context info, and push this object to a dedicated Kafka topic. Basically a DLQ, but the original message will be intercepted and decorated.

The Problem: While I can intercept the exception, I can't figure out how to acquire the original message from Kafka (TODO 1, below). I've been all through the data object returned in ConsumerAwareErrorHandler.handle and I don't see it there.

Below is the code I have:

@EnableBinding(EventStream.class)
@SpringBootApplication
@Slf4j
public class SpringcloudApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringcloudApplication.class, args);
    }

    /* Configure custom exception handler */
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
        return (container, destination, group) -> {
            container.setErrorHandler(new ConsumerAwareErrorHandler() {
                @Override
                public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
                    log.info("Got error with data: {}", data);
                    // TODO 1 - How to get original message?
                    // TODO 2 - Send to dedicated (DLQ) topic
                }
            });
        };
    }

    @StreamListener(EventStream.INBOUND)
    public void consumeEvent(@Payload Message message) {
        log.info("Consuming event --> {}", message.toString());
        produceEvent(message);
    }

    @Autowired private EventStream eventStream;
    public Boolean produceEvent(Message message) {
        log.info("Producing event --> {}", message.toString());
        return eventStream
                .producer()
                .send(MessageBuilder.withPayload(message)
                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                    .build());
    }

}

And the properties files:

spring:
  cloud:
    stream:
      default-binder: kafka
      default:
        consumer:
          useNativeEncoding: true
        producer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: "http://localhost:8081"
          consumer-properties:
            key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            schema.registry.url: "http://localhost:8081"
            specific.avro.reader: true
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
      bindings:
        event-consumer:
          destination: data_stream_in # incoming topic
          contentType: application/*+avro
          group: data_stream_consumer
        event-producer:
          destination: data_stream_out
          contentType: application/*+avro

I am using the following versions:

  • Spring Boot 2.3.2.RELEASE
  • Spring Cloud: Hoxton.SR8
  • spring-cloud-stream-binder-kafka 3.0.8.RELEASE
  • spring-kafka 2.5.12

Any help is appreciated!

1 Answer

The second argument of the handle method is the ConsumerRecord, which is the original Kafka record. If you want the record to be sent to a DLQ automatically, you can do the following:

// Install the error handler on each listener container the binder creates.
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
    return (container, dest, group) -> {
        container.setErrorHandler(errorHandler);
    };
}

// Hands failed records to the recoverer once the handler gives up on them.
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}

// Publishes the failed record to a dead-letter topic using the given template.
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
    return new DeadLetterPublishingRecoverer(bytesTemplate);
}

Essentially, you are setting up a SeekToCurrentErrorHandler that is capable of sending the failed record to the DLQ. See the Spring for Apache Kafka reference docs for more details on how DeadLetterPublishingRecoverer works: https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

More info on SeekToCurrentErrorHandler: https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current

You also need to configure an ErrorHandlingDeserializer:

spring.cloud.stream.kafka.binder.configuration.value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.binder.configuration.spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
...
The value delegate class is configured in the same way.

More info on ErrorHandlingDeserializer: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

If you want to modify the record and add a custom message to the DLQ, you can do that by overriding the handle method, where you have access to the ConsumerRecord, and then calling the superclass method.
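
For the "decorated" object itself, here is a minimal sketch of what such an envelope might look like. DlqEnvelope and all of its fields are hypothetical names used for illustration; they are not part of Spring or Kafka. It assumes you already have the raw bytes of the failed message. With ErrorHandlingDeserializer the value of the failed record is usually null, so one place to look for them is DeserializationException.getData() somewhere in the cause chain of the thrown exception; please verify that against your spring-kafka version.

```java
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical DLQ envelope: error details + original message + custom context. */
final class DlqEnvelope {

    private final String topic;
    private final int partition;
    private final long offset;
    private final String errorType;
    private final String errorMessage;
    private final String originalPayloadBase64;
    private final Map<String, String> context;

    private DlqEnvelope(String topic, int partition, long offset, String errorType,
                        String errorMessage, String originalPayloadBase64,
                        Map<String, String> context) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.errorType = errorType;
        this.errorMessage = errorMessage;
        this.originalPayloadBase64 = originalPayloadBase64;
        this.context = context;
    }

    /** Builds an envelope from the record coordinates, the error, and the raw bytes. */
    static DlqEnvelope of(String topic, int partition, long offset, Throwable error,
                          byte[] originalPayload, Map<String, String> context) {
        // Walk to the root cause, since listener exceptions are often wrapped.
        Throwable root = error;
        while (root.getCause() != null && root.getCause() != root) {
            root = root.getCause();
        }
        // Base64 keeps undecodable bytes intact; null means the raw bytes were unavailable.
        String payload = originalPayload == null
                ? null
                : Base64.getEncoder().encodeToString(originalPayload);
        // Defensive copy so later changes to the caller's map do not leak in.
        Map<String, String> contextCopy = new LinkedHashMap<>(context);
        return new DlqEnvelope(topic, partition, offset, root.getClass().getName(),
                root.getMessage(), payload, Collections.unmodifiableMap(contextCopy));
    }

    String getTopic() { return topic; }
    int getPartition() { return partition; }
    long getOffset() { return offset; }
    String getErrorType() { return errorType; }
    String getErrorMessage() { return errorMessage; }
    String getOriginalPayloadBase64() { return originalPayloadBase64; }
    Map<String, String> getContext() { return context; }
}
```

Inside the overridden handle method you could then build the envelope from data.topic(), data.partition(), data.offset() and thrownException, serialize it (as JSON, for example), and send it to your dedicated topic with a template of your choice.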