
I have an application using Spring Cloud Stream and Spring Kafka, which processes Avro messages. The application works fine, but now I'd like to add some error handling.

The Goal: I would like to catch deserialization exceptions, build a new object with the exception details + original Kafka message + custom context info, and push this object to a dedicated Kafka topic. Basically a DLQ, but the original message will be intercepted and decorated.

The Problem: While I can intercept the exception, I can't figure out how to acquire the original message from Kafka (TODO 1, below). I've been all through the data object returned in ConsumerAwareErrorHandler.handle and I don't see it there.

Below is the code I have:

@SpringBootApplication
public class SpringcloudApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringcloudApplication.class, args);
    }

    /* Configure custom exception handler */
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
        return (container, destination, group) -> {
            container.setErrorHandler(new ConsumerAwareErrorHandler() {
                @Override
                public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
                    log.info("Got error with data: {}", data);
                    // TODO 1 - How to get original message?
                    // TODO 2 - Send to dedicated (DLQ) topic
                }
            });
        };
    }
}

    public void consumeEvent(@Payload Message message) {
        log.info("Consuming event --> {}", message.toString());
    }

    @Autowired private EventStream eventStream;

    public Boolean produceEvent(Message message) {
        log.info("Producing event --> {}", message.toString());
        return eventStream
                    // ... (channel lookup and message builder call not shown)
                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                    // ... (build and send not shown)
                    ;
    }


And the properties files:

spring:
  cloud:
    stream:
      default-binder: kafka
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: "http://localhost:8081"
          consumer-properties:
            key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            schema.registry.url: "http://localhost:8081"
            specific.avro.reader: true
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
      bindings:
        input: # placeholder binding name
          destination: data_stream_in # incoming topic
          contentType: application/*+avro
          group: data_stream_consumer
        output: # placeholder binding name
          destination: data_stream_out
          contentType: application/*+avro

I am using the following versions:

  • Spring Boot 2.3.2.RELEASE
  • Spring Cloud: Hoxton.SR8
  • spring-cloud-stream-binder-kafka 3.0.8.RELEASE
  • spring-kafka 2.5.12

Any help is appreciated!


The second argument of the handle method is the ConsumerRecord, which is the original Kafka record. If you want the failed record to be sent to a DLQ automatically, you can do the following.

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }

Essentially, you are setting up a SeekToCurrentErrorHandler that hands the failed record to a DeadLetterPublishingRecoverer, which publishes it to the DLQ. See the Spring for Apache Kafka reference docs for more details on how DeadLetterPublishingRecoverer works: https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

More info on SeekToCurrentErrorHandler: https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current
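
As a rough illustration of where the record ends up: by default DeadLetterPublishingRecoverer publishes to a topic named after the original one with a ".DLT" suffix, on the same partition, and a destination resolver passed to its constructor can change that. The sketch below models only that routing decision in plain Java so it runs without Spring or Kafka on the classpath. Destination, DlqRouting, and the topic name data_stream_errors are hypothetical stand-ins for illustration, not Spring Kafka types.

```java
import java.util.function.BiFunction;

/** Hypothetical stand-in for Kafka's TopicPartition: a topic name plus a partition number. */
final class Destination {
    final String topic;
    final int partition;

    Destination(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class DlqRouting {
    /** Mirrors the documented default: "<original topic>.DLT", same partition. */
    static Destination defaultDestination(String topic, int partition) {
        return new Destination(topic + ".DLT", partition);
    }

    /**
     * Custom rule (an assumption for illustration): every failure goes to one
     * dedicated topic, keeping the partition of the source record.
     */
    static BiFunction<Destination, Exception, Destination> dedicatedTopic(String dlqTopic) {
        return (source, error) -> new Destination(dlqTopic, source.partition);
    }
}
```

Because both rules keep the original partition, the dead-letter topic needs at least as many partitions as the source topic.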

You also need to configure an ErrorHandlingDeserializer:

spring.cloud.stream.kafka.binder.configuration.value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.cloud.stream.kafka.binder.configuration.spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

Set key.deserializer and spring.deserializer.value.delegate.class in the same way.

More info on ErrorHandlingDeserializer: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer
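
This also explains why the record value looks empty inside the error handler: roughly speaking, ErrorHandlingDeserializer catches the exception thrown by its delegate, returns null as the value, and stores a DeserializationException in a record header; that exception keeps the raw bytes (getData()). The plain-Java sketch below imitates the pattern without any Spring dependency. SafeDeserializer and Result are hypothetical names used only for illustration, and the real class passes the failure through headers rather than a return value.

```java
import java.util.function.Function;

/** Outcome of one deserialization attempt: a value, or the failure plus the untouched bytes. */
final class Result<T> {
    final T value;            // null when deserialization failed
    final Exception failure;  // null when deserialization succeeded
    final byte[] rawData;     // original payload, kept only on failure

    Result(T value, Exception failure, byte[] rawData) {
        this.value = value;
        this.failure = failure;
        this.rawData = rawData;
    }
}

/** Wraps a delegate so that a bad payload never throws out of deserialize(). */
final class SafeDeserializer<T> {
    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    /** A failing delegate yields a null value with the error and raw bytes attached. */
    Result<T> deserialize(byte[] data) {
        try {
            return new Result<>(delegate.apply(data), null, null);
        } catch (RuntimeException e) {
            return new Result<>(null, e, data);
        }
    }
}
```

The point of the pattern is that the consumer keeps polling instead of failing on the same bad record forever, while the original bytes stay available to whatever handles the error.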

If you want to modify the record and add a custom message to the DLQ, you can do that by overriding the handle method, where you have access to the ConsumerRecord, and then calling the superclass method.
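
One way to picture the decoration step, as a minimal sketch: the exception passed to the handler may wrap the deserialization failure, so the code walks the cause chain, takes the raw bytes from the first exception that carries them, and builds the object destined for the DLQ. RawPayloadException is a hypothetical stand-in for Spring Kafka's DeserializationException (which exposes the bytes through getData()); DlqEnvelope and its fields are assumptions about what the decorated message could look like.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for an exception that carries the undeserialized payload. */
final class RawPayloadException extends RuntimeException {
    private final byte[] data;

    RawPayloadException(String message, byte[] data, Throwable cause) {
        super(message, cause);
        this.data = data;
    }

    byte[] getData() {
        return data;
    }
}

/** Hypothetical DLQ payload: error details + original bytes + caller-supplied context. */
final class DlqEnvelope {
    final String errorMessage;
    final byte[] originalPayload;        // null if no raw payload was found
    final Map<String, String> context;

    private DlqEnvelope(String errorMessage, byte[] originalPayload, Map<String, String> context) {
        this.errorMessage = errorMessage;
        this.originalPayload = originalPayload;
        this.context = context;
    }

    /** Walks the cause chain looking for the first exception that carries raw bytes. */
    static DlqEnvelope from(Throwable thrown, Map<String, String> context) {
        byte[] payload = null;
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            if (t instanceof RawPayloadException) {
                payload = ((RawPayloadException) t).getData();
                break;
            }
        }
        Map<String, String> copy = Collections.unmodifiableMap(new LinkedHashMap<>(context));
        return new DlqEnvelope(String.valueOf(thrown.getMessage()), payload, copy);
    }
}
```

In a real handler the context map could hold the topic, partition, and offset taken from the ConsumerRecord, and the envelope would be serialized and sent with a KafkaTemplate before or instead of calling the superclass method.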