My consumer configuration uses a Kafka batch listener, so my @KafkaListener method receives a list of messages. I have a ConsumerInterceptor in which I want to set a unique id for each record and store its value in the Mapped Diagnostic Context (MDC). When my Kafka listener consumes a single message, the unique id is correct. But when the listener consumes a list of messages, MDC.get("id") returns only the last value that was set, not one id per record. How can I handle this? My interceptor:
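public class KafkaConsumerInterceptor implements ConsumerInterceptor<String, String> {

    // Called once per poll with the whole batch, before the listener sees the records.
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        // Only the first record of the batch is inspected here.
        ConsumerRecord<String, String> record = consumerRecords.iterator().next();
        setId(record.headers().headers("id"));
        return consumerRecords;
    }

    // Uses the "id" header if present, otherwise a random UUID, and stores it in the MDC.
    private void setId(Iterable<Header> idHeader) {
        String id = UUID.randomUUID().toString();
        if (idHeader.iterator().hasNext()) {
            Header header = idHeader.iterator().next();
            id = new String(header.value(), StandardCharsets.UTF_8);
        }
        // The MDC holds one value per key per thread, so each call overwrites the previous id.
        MDC.put("id", id);
    }
}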
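
A minimal sketch of one possible direction, not a confirmed fix: since the MDC keeps a single value per key per thread, the id could be set per record inside the loop that processes the batch, instead of once in the interceptor. To keep the example runnable without Kafka or a logging library, `Message` and `CONTEXT` are hypothetical stand-ins for `ConsumerRecord` and the MDC, and `BatchIdSketch`, `resolveId`, and `handleBatch` are illustrative names only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class BatchIdSketch {

    // Hypothetical stand-in for a Kafka record: an optional "id" header plus a payload.
    static final class Message {
        final byte[] idHeader;
        final String payload;

        Message(byte[] idHeader, String payload) {
            this.idHeader = idHeader;
            this.payload = payload;
        }
    }

    // Hypothetical stand-in for the MDC: one map per thread, one value per key at a time.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    // Same rule as setId in the interceptor: header value if present, else a fresh UUID.
    static String resolveId(byte[] idHeader) {
        if (idHeader == null) {
            return UUID.randomUUID().toString();
        }
        return new String(idHeader, StandardCharsets.UTF_8);
    }

    // Sets the id for each message in turn and returns the id visible while it was handled.
    static List<String> handleBatch(List<Message> batch) {
        List<String> seen = new ArrayList<>();
        for (Message message : batch) {
            CONTEXT.get().put("id", resolveId(message.idHeader));
            try {
                // Real processing and logging for this message would go here.
                seen.add(CONTEXT.get().get("id"));
            } finally {
                // Clear the id so it does not leak into the next message.
                CONTEXT.get().remove("id");
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        List<Message> batch = List.of(
                new Message("a-1".getBytes(StandardCharsets.UTF_8), "first"),
                new Message("b-2".getBytes(StandardCharsets.UTF_8), "second"),
                new Message(null, "third")); // no header: falls back to a random UUID
        System.out.println(handleBatch(batch));
    }
}
```

In an actual batch listener the same loop would presumably run over the received list, with MDC.put("id", ...) before handling each record and MDC.remove("id") in the finally block.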