I'm using Spring Kafka (KafkaTemplate) to send String messages. To stay compatible with old code, I need to attach an extra CorrelationId to each message, so I create a ProducerRecord with my message as its value and set the CorrelationId in a RecordHeader.
producerRecord = new ProducerRecord<>(
        kafkaTemplate_.getDefaultTopic(),  // topic
        null,                              // partition
        null,                              // timestamp
        null,                              // key
        myStringMessage,                   // value
        Collections.singletonList(new RecordHeader("CorrelationID", someIdAsBytes)));
kafkaTemplate_.sendDefault(producerRecord);
The key and value serializers are both set to StringSerializer, but the above code fails with an error saying something like ProducerRecord is not of type String or StringSerializer.
If I call toString() as shown below, the send works. But on the MessageListener side, the received ConsumerRecord does not carry the CorrelationId as a RecordHeader, because the header was attached to the ProducerRecord, which has been flattened into the string payload. So in MessageListener onMessage(Object msg) I have to cast from Object to ConsumerRecord (whose value is the string form of the ProducerRecord), parse the ProducerRecord back out of ConsumerRecord.value(), and then read the CorrelationId from the ProducerRecord header and myStringMessage from its value. This looks cumbersome. Is my logic for sending and receiving OK?
kafkaTemplate_.sendDefault(producerRecord.toString());
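
For comparison, here is a minimal sketch of the header-based round trip I would expect to avoid the toString() workaround. sendDefault(...) treats its argument as the message value, whereas KafkaTemplate.send(ProducerRecord) sends the record itself, headers included. The class name CorrelationIdSketch, the helper sendWithCorrelationId, and the Listener class are hypothetical names for illustration. The sketch also assumes the CorrelationId is UTF-8 text, that the template is typed as KafkaTemplate<String, String> with a default topic configured, and that brokers and clients are recent enough to support record headers (Kafka 0.11 or later).

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListener;

// Hypothetical holder class, for illustration only.
public class CorrelationIdSketch {

    static final String CORRELATION_HEADER = "CorrelationID";

    // Producer side: pass the ProducerRecord to send(...), not sendDefault(...),
    // so the String value and the header are serialized separately.
    static void sendWithCorrelationId(KafkaTemplate<String, String> template,
                                      String message,
                                      String correlationId) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(template.getDefaultTopic(), message);
        // Assumption: the id is text, encoded here as UTF-8 bytes.
        record.headers().add(CORRELATION_HEADER,
                correlationId.getBytes(StandardCharsets.UTF_8));
        template.send(record);
    }

    // Consumer side: the header arrives on the ConsumerRecord itself,
    // so the value stays the plain message and needs no parsing.
    static class Listener implements MessageListener<String, String> {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            // lastHeader returns null when no header with that key is present.
            Header header = record.headers().lastHeader(CORRELATION_HEADER);
            String correlationId = header == null
                    ? null
                    : new String(header.value(), StandardCharsets.UTF_8);
            String message = record.value();
            System.out.println(correlationId + " -> " + message);
        }
    }
}
```

With this shape, the listener receives a typed ConsumerRecord<String, String>, so there is no cast from Object and no need to parse a stringified ProducerRecord.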