I am trying to read the last message available in a Kafka topic using ConsumerSeekAware. The message type is a List of Avro objects. Seeking to the last message works, but deserialization of that message fails.
The message was produced using the spring-cloud-stream-kafka framework. It carries the content type contentType=application/x-java-object;type=java.util.ArrayList.
I know a single Avro message can be deserialized like this:
DatumReader<GenericRecord> datumReader =
    new SpecificDatumReader<>(targetType.newInstance().getSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
result = (T) datumReader.read(null, decoder);
But it doesn't work here. I suspect one of two causes.
First, the message is a List of Avro objects, but I am creating the DatumReader with the Avro schema of a single object. I also tried creating an array schema with Schema.createArray(UserDTO.class), but that doesn't work either.
Second, I think the content type expected for an Avro message is application/avro, but when the message is produced by s-c-s (spring-cloud-stream) it is
contentType=application/x-java-object;
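On the first point, Schema.createArray takes the element Schema rather than a Class, so an array schema would be built from the element's schema. The following is only a minimal sketch, assuming the bytes really are a plain Avro-binary array; whether the s-c-s payload with application/x-java-object is encoded that way is not confirmed here. The class AvroListSketch, its helper methods, and the one-field UserDTO schema are hypothetical stand-ins for the real generated class (for which UserDTO.getClassSchema() would normally supply the element schema). It needs the Avro library on the classpath.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

public class AvroListSketch {

    // Hypothetical element schema, standing in for the real UserDTO schema.
    static final Schema USER_SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"UserDTO\","
        + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

    // createArray expects the element Schema, not a Class.
    static final Schema LIST_SCHEMA = Schema.createArray(USER_SCHEMA);

    // Helper: build one record for the hypothetical schema.
    static GenericRecord user(String name) {
        GenericRecord record = new GenericData.Record(USER_SCHEMA);
        record.put("name", name);
        return record;
    }

    // Encode a list as a single Avro-binary array (used here only to produce test bytes).
    static byte[] encode(List<GenericRecord> users) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<List<GenericRecord>>(LIST_SCHEMA).write(users, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Decode Avro-binary array bytes back into a list, using the array schema.
    static List<GenericRecord> decode(byte[] data) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        return new GenericDatumReader<List<GenericRecord>>(LIST_SCHEMA).read(null, decoder);
    }
}
```

If decoding the real topic bytes this way still fails, that would suggest the payload is not a plain Avro array at all, which points back to the content-type question.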
I am trying to create a deserializer by implementing org.apache.kafka.common.serialization.Deserializer and use it to construct the KafkaConsumerFactory. Can someone help?