I am implementing Exceptional handling for spring kafka project, i have my own DeadLetterPublishingRecoverer which handles the exceptions occurred in the Kafka listener , the whole flow is perfect i.e., when i throw any exception in the logic i can able to log and send it to DLQ topic as per the framework implementation. The problem occurs with the consumer record, if i change any content of in the consumer record value , same consumer record will be published to DLQ topic with changes which is actually wrong in my senairo , i want to log the original message.
...
@KafkaListener(topics = "${message.topic.name}",containerFactory = "kafkaListenerContainerFactory")
public void listenGroupFoo(ConsumerRecord<String, MyEvent> consumerRecord) throws InterruptedException, ExecutionException {
System.out.println("Received Message" + consumerRecord.value());
MyEvent consumedEvent=consumerRecord.value();
consumedEvent.setQuantity(1000);
throw new InvalidProductCodeException();
}
...
The actual message i sent to topic contains only 10 quantities and i am changing some content like changing quantity to 1000 and throwing some exception (like any exception occurrs while processsing) ...
Received MessageOrderObject CreatedDate=Mon Sep 14 19:38:15 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002
...
After i throw the error my record will be
...
Received MessageOrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002]
Error Record OrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=1000, customerId=0002]
and this is my DLQ sub class
...
@Component
public class DeadLetterSubclass extends DeadLetterPublishingRecoverer {
@Value(value="${error.topic.name}")
static
String errorTopicName;
BusinessUtils utils=new BusinessUtils();
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
MY_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition("stock-order-error-topic", cr.partition());
public DeadLetterSubclass(KafkaOperations<? extends Object, ? extends Object> template) {
super(template,MY_DESTINATION_RESOLVER);
this.setHeadersFunction((consumerRecord,exception)->{
System.out.println("Error Record "+consumerRecord.value());
return consumerRecord.headers();
});
}
...
I want to log the original event object(ConsumerRecord) to be published when the exception occurs .In my case my actual orderquantity is with 10 but order with 1000 quantity is logging , which is not the actual order.