0
votes

my consumer configuration has kafka batch listener configs and @KafkaListener consume list of messages. I have a ConsumerInterceptor and I want to set unique id for each record and i store its value in Mapped Diagnostic Context (MDC). If my kafka listener consume a single message, the unique id is correct. But my kafka listener consume list of messages so MDC.get("id") gets only last value. How can handle it? My interceptor;

public class KafkaConsumerInterceptor implements ConsumerInterceptor<String, String>{

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        ConsumerRecord<String, String> record = consumerRecords.iterator().next();
        setId(record.headers().headers("id"));
        return consumerRecords;
    }

    private void setId(Iterable<Header> idHeader) {
        String id = UUID.randomUUID().toString();
        if (idHeader.iterator().hasNext()) {
            Header header = idHeader.iterator().next();
            id = new String(header.value(), StandardCharsets.UTF_8);
        }
        MDC.put("id", id);
    }
  }
1

1 Answers

0
votes

You need to move the MDC.put() to wherever you are processing each record, you can't set it for the whole batch.