
I am implementing exception handling for a Spring Kafka project. I have my own DeadLetterPublishingRecoverer that handles exceptions thrown in the Kafka listener. The overall flow works: when I throw an exception in my logic, it is logged and the record is sent to the DLQ topic as the framework intends. The problem is with the consumer record: if I change anything in the consumer record's value, that same modified record is published to the DLQ topic, which is wrong in my scenario. I want to log the original message.

...

@KafkaListener(topics = "${message.topic.name}", containerFactory = "kafkaListenerContainerFactory")
public void listenGroupFoo(ConsumerRecord<String, MyEvent> consumerRecord) throws InterruptedException, ExecutionException {

    System.out.println("Received Message" + consumerRecord.value());

    MyEvent consumedEvent = consumerRecord.value();
    consumedEvent.setQuantity(1000);

    throw new InvalidProductCodeException();
}

...

The message I actually sent to the topic has a quantity of 10. In the listener I change the quantity to 1000 and then throw an exception (standing in for any exception that occurs during processing).

  Received MessageOrderObject CreatedDate=Mon Sep 14 19:38:15 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002

...

After I throw the exception, the record looks like this:

...

Received MessageOrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002]
Error  Record  OrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=1000, customerId=0002]

This is my DeadLetterPublishingRecoverer subclass:

...

@Component
public class DeadLetterSubclass extends DeadLetterPublishingRecoverer {

    // Note: Spring does not inject @Value into static fields, so this field stays null;
    // the resolver below uses a hard-coded topic name instead.
    @Value(value = "${error.topic.name}")
    static String errorTopicName;

    BusinessUtils utils = new BusinessUtils();

    private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
            MY_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition("stock-order-error-topic", cr.partition());

    public DeadLetterSubclass(KafkaOperations<? extends Object, ? extends Object> template) {
        super(template, MY_DESTINATION_RESOLVER);
        this.setHeadersFunction((consumerRecord, exception) -> {
            // Logs the record value as it is when recovery runs, i.e. after the listener has mutated it.
            System.out.println("Error  Record  " + consumerRecord.value());
            return consumerRecord.headers();
        });
    }
}

...

I want the original event object (the ConsumerRecord as received) to be logged and published when the exception occurs. In my case the actual order quantity is 10, but the order with quantity 1000 is what gets logged, which is not the actual order.

1 Answer

Since you have a hard reference to the original ConsumerRecord you should not mutate it if you need it unchanged in the error handler.

The framework cannot anticipate that you might mutate the value in your listener. It would be too much overhead to make a copy "just in case".

You should not mutate the original value - clone it instead, and mutate the clone.
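
As a minimal sketch of that idea, the plain-Java example below copies the event before changing it. The `MyEvent` class here is a hypothetical stand-in with only two fields, and the copy constructor is an assumption: the question does not show whether the real `MyEvent` has one, so you may need to add it or copy the fields another way.

```java
// Hypothetical, cut-down stand-in for the question's MyEvent class.
class MyEvent {
    private final String productCode;
    private int quantity;

    MyEvent(String productCode, int quantity) {
        this.productCode = productCode;
        this.quantity = quantity;
    }

    // Copy constructor (an assumption; not shown in the question).
    MyEvent(MyEvent other) {
        this(other.productCode, other.quantity);
    }

    String getProductCode() { return productCode; }
    int getQuantity() { return quantity; }
    void setQuantity(int quantity) { this.quantity = quantity; }
}

class CopyBeforeMutate {
    // Stands in for the listener body: it changes a copy and leaves the
    // object it was given untouched.
    static MyEvent process(MyEvent original) {
        MyEvent working = new MyEvent(original);
        working.setQuantity(1000);
        return working;
    }

    public static void main(String[] args) {
        MyEvent original = new MyEvent("TES", 10);
        MyEvent working = process(original);
        System.out.println("original quantity = " + original.getQuantity()); // prints 10
        System.out.println("working quantity = " + working.getQuantity());   // prints 1000
    }
}
```

In the listener from the question, the equivalent change would be to build the working object from `consumerRecord.value()` (for example `new MyEvent(consumerRecord.value())`, if such a constructor exists) and call `setQuantity` on that copy, so the record passed to the recoverer still holds the value that was consumed.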