So I am using Spring Boot 2.1.6 and integrating a Kafka consumer to consume any type of message published on a topic. For reference I am following https://docs.spring.io/spring-boot/docs/2.1.6.RELEASE/reference/htmlsingle/
So I have this dependency in my pom:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
I am configuring it in application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            value:
              default:
                type: java.lang.Object
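If it helps, this is how I understand the YAML maps onto the consumer configuration in plain Java. I am not actually declaring this bean myself (Boot auto-configures it from the properties above), so treat it as a sketch:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Rough Java equivalent of the YAML above; only for illustration, not code I actually have.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // "spring.json.value.default.type": the type JsonDeserializer falls back to
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Object.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}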
And finally, here is my listener code:
@KafkaListener(topics = "videoEnrichedEvents")
public void consume(@Payload VideoEnrichedEvents videoEnrichedEvents) {
    LOGGER.debug("Consumed message: " + videoEnrichedEvents);
    System.out.println("Consumed message: " + videoEnrichedEvents);
}
Since I have different topics and different consumers for them, I want the consumer configuration to be generic enough that I can read any object and then delegate it to a processing handler. In the error logs I could see:
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.calamp.connect.vs.model.VideoEnrichedEvents] for GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}], failedMessage=GenericMessage [payload={anyotherjson={groups=null, id=0, driverName=from Kusum's console, deviceIdType=null, assetId=null, operatorId=null, avlEventTime=null, videoLink=null, tripId=null, avlEventUuid=null, deviceId=null, appMessageUuid=null, parentAccountList=null, appmsgEventTime=null, enrichedMessage=null, accountId=null}}, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@18213932, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=videoEnrichedEvents, kafka_receivedTimestamp=1590218109430}]
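From the error, my understanding is that the JsonDeserializer falls back to java.lang.Object and produces a LinkedHashMap, which Spring then cannot convert into my POJO. I believe pinning the default type to the concrete class would make the typed listener work, but I do not want a fixed type per consumer. Something like this is what I mean (the properties attribute on @KafkaListener exists in spring-kafka 2.2+, if I am reading the docs right; just a sketch, not what I want):

// Not what I want (a fixed type per listener), but I believe this would avoid the
// conversion error by letting JsonDeserializer build the POJO directly.
@KafkaListener(topics = "videoEnrichedEvents",
        properties = "spring.json.value.default.type=com.calamp.connect.vs.model.VideoEnrichedEvents")
public void consumeTyped(@Payload VideoEnrichedEvents videoEnrichedEvents) {
    LOGGER.debug("Consumed typed message: " + videoEnrichedEvents);
}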
After a little googling I found that ConsumerRecord was used as the listener parameter everywhere, instead of LinkedHashMap.
And my new code looks like:
@KafkaListener(topics = "videoEnrichedEvents")
public void consume(@Payload ConsumerRecord<String, Object> consumerRecord) {
    LOGGER.debug("Consumed message, full record: " + consumerRecord);
    System.out.println("Consumed message, actual object: " + (LinkedHashMap) consumerRecord.value());
}
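For completeness, this is roughly how I plan to delegate from the generic listener to per-topic handlers. The GenericEventListener class, the MessageHandler interface, and the handler registry are my own sketch, not anything from spring-kafka:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

// Sketch of the delegation I have in mind; all names here are hypothetical.
public class GenericEventListener {

    // Hypothetical per-topic handler contract.
    public interface MessageHandler {
        void handle(Object payload);
    }

    // Topic name -> handler; filled in wherever I wire things up.
    private final Map<String, MessageHandler> handlers = new HashMap<>();

    public void registerHandler(String topic, MessageHandler handler) {
        handlers.put(topic, handler);
    }

    @KafkaListener(topics = {"videoEnrichedEvents"})
    public void consume(ConsumerRecord<String, Object> record) {
        // record.value() is a LinkedHashMap when JsonDeserializer falls back to java.lang.Object
        MessageHandler handler = handlers.get(record.topic());
        if (handler != null) {
            handler.handle(record.value());
        }
    }
}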
This technically handles any object sent to me, so it solves my purpose. But my question is: why ConsumerRecord and not LinkedHashMap? Is there any specific reason?
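In case it matters, my rough plan inside the handler is to map the LinkedHashMap payload back onto the POJO with Jackson, something like this:

import com.fasterxml.jackson.databind.ObjectMapper;

// Convert the generic LinkedHashMap payload back into the concrete type with Jackson.
ObjectMapper mapper = new ObjectMapper();
VideoEnrichedEvents event = mapper.convertValue(consumerRecord.value(), VideoEnrichedEvents.class);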