I have an application using Spring Cloud Stream and Spring Kafka that processes Avro messages. The application works fine, but now I'd like to add some error handling.
The Goal: I would like to catch deserialization exceptions, build a new object containing the exception details, the original Kafka message, and some custom context info, and push this object to a dedicated Kafka topic. Basically a DLQ, but with the original message intercepted and decorated first.
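For concreteness, this is roughly the shape of the object I want to publish to that topic. All of the names here are mine (there's no framework type involved); it's just a plain DTO using Lombok, which I already have on the classpath:

import java.util.Map;
import lombok.Value;

// Hypothetical envelope for the DLQ topic; every field name is my own invention.
@Value
public class FailedEventEnvelope {
    String exceptionClass;       // e.g. the SerializationException type
    String exceptionMessage;
    String sourceTopic;          // topic the failed record came from
    int partition;
    long offset;
    byte[] rawPayload;           // the original (undeserializable) Kafka value
    Map<String, String> context; // my custom context info
}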
The Problem: While I can intercept the exception, I can't figure out how to get at the original Kafka message (TODO 1 below). I've been all through the data object passed to ConsumerAwareErrorHandler.handle and I don't see the original payload there.
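The closest I've gotten: from the ErrorHandlingDeserializer Javadoc, I'd expect the failed value to be attached to the record as a java-serialized DeserializationException header. The header constant is real (spring-kafka 2.5), but the decoding step below is my guess, and I'm not sure it's the intended mechanism:

// imports this snippet needs:
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

// inside handle(): try to recover the raw value bytes from the exception header
Header header = data.headers().lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
if (header != null) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(header.value()))) {
        DeserializationException dex = (DeserializationException) ois.readObject();
        byte[] originalBytes = dex.getData(); // the undeserializable message value
        log.info("Recovered {} raw bytes from the failed record", originalBytes.length);
    }
    catch (IOException | ClassNotFoundException e) {
        log.warn("Could not decode the deserialization-exception header", e);
    }
}

Is this the right approach, or is there a cleaner way to get the original record?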
Below is the code I have:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;

@EnableBinding(EventStream.class)
@SpringBootApplication
@Slf4j
public class SpringcloudApplication {

    @Autowired
    private EventStream eventStream;

    public static void main(String[] args) {
        SpringApplication.run(SpringcloudApplication.class, args);
    }

    /* Configure a custom error handler on the underlying listener container */
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> cust() {
        return (container, destination, group) -> {
            container.setErrorHandler(new ConsumerAwareErrorHandler() {
                @Override
                public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
                    log.info("Got error with data: {}", data);
                    // TODO 1 - How to get the original message?
                    // TODO 2 - Send it to a dedicated (DLQ) topic
                }
            });
        };
    }

    // 'Message' is my Avro-generated payload class, not org.springframework.messaging.Message
    @StreamListener(EventStream.INBOUND)
    public void consumeEvent(@Payload Message message) {
        log.info("Consuming event --> {}", message);
        produceEvent(message);
    }

    public Boolean produceEvent(Message message) {
        log.info("Producing event --> {}", message);
        return eventStream
                .producer()
                .send(MessageBuilder.withPayload(message)
                        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                        .build());
    }
}
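For TODO 2, my current plan is to publish the envelope with a separate, plain KafkaTemplate rather than through the binder. A minimal sketch of what I mean, assuming a JSON-serialized value and a made-up topic name (data_stream_dlq):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Sketch only: a dedicated template for the decorated DLQ records.
@Bean
public KafkaTemplate<String, FailedEventEnvelope> dlqTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}

// ...and then, from the error handler, once TODO 1 is solved:
// dlqTemplate.send("data_stream_dlq", envelope);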
And the properties file (YAML):
spring:
  cloud:
    stream:
      default-binder: kafka
      default:
        consumer:
          useNativeDecoding: true # consumer side is "decoding"
        producer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers: localhost:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: "http://localhost:8081"
          consumer-properties:
            key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            schema.registry.url: "http://localhost:8081"
            specific.avro.reader: true
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
      bindings:
        event-consumer:
          destination: data_stream_in # incoming topic
          contentType: application/*+avro
          group: data_stream_consumer
        event-producer:
          destination: data_stream_out # outgoing topic
          contentType: application/*+avro
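For completeness: I'm aware the Kafka binder has built-in DLQ support via the consumer properties below, but as far as I can tell it republishes the failed record as-is (plus some exception headers), with no hook to decorate it the way I described, which is why I'm going the custom-handler route. The dlqName value here is a hypothetical topic name:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          event-consumer:
            consumer:
              enableDlq: true
              dlqName: data_stream_dlq # hypothetical topic name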
I am using the following versions:
- Spring Boot 2.3.2.RELEASE
- Spring Cloud Hoxton.SR8
- spring-cloud-stream-binder-kafka 3.0.8.RELEASE
- spring-kafka 2.5.12
Any help is appreciated!