I am trying to read the last message available in a Kafka topic using ConsumerSeekAware. The message type is a List of Avro objects. Seeking to the message and reading it works, but deserialization fails. The message was produced using the spring-cloud-stream-kafka framework and carries the content type contentType=application/x-java-object;type=java.util.ArrayList.

I know an Avro message can be deserialized like this:

    DatumReader<GenericRecord> datumReader =
        new SpecificDatumReader<>(targetType.newInstance().getSchema());
    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

    result = (T) datumReader.read(null, decoder);

But it doesn't work, possibly for two reasons.

  1. The message is a List of Avro objects, but I am creating the DatumReader with the Avro object's schema. I also tried creating an array schema with Schema.createArray(UserDTO.class), but that doesn't work either.

  2. I think the content type expected for an Avro message is application/avro, but a message produced by s-c-s has contentType=application/x-java-object.

I am trying to create a deserializer by implementing org.apache.kafka.common.serialization.Deserializer and use it to construct the KafkaConsumerFactory. Can someone help?

1 Answer

See the ...producer.useNativeEncoding property, as described under Producer Properties.

useNativeEncoding

When set to true, the outbound message is serialized directly by client library, which must be configured correspondingly (e.g. setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use appropriate decoder (ex: Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding/decoding is used the headerMode property is ignored and headers will not be embedded into the message.

Default: false.
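
With native encoding on both sides, the producer's serializer and the consumer's deserializer only have to agree on a format. One option for a List of Avro objects is to write it as a single Avro array. Below is a minimal sketch of that round trip, not a definitive implementation. The class AvroListCodec and the one-field schema are hypothetical stand-ins; if UserDTO is an Avro-generated class, its schema would come from UserDTO.getClassSchema() and the Specific reader/writer would replace the Generic ones. Note that Schema.createArray takes the element Schema, not a Class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Hypothetical helper: encodes and decodes a List of Avro records as one Avro array.
final class AvroListCodec {

    // Assumed stand-in for the real UserDTO schema (a single string field).
    static final Schema USER_SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"UserDTO\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"}]}");

    // Array schema built from the element schema.
    static final Schema LIST_SCHEMA = Schema.createArray(USER_SCHEMA);

    // Producer side: what a custom Kafka value serializer could do.
    static byte[] encode(List<GenericRecord> users) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<List<GenericRecord>>(LIST_SCHEMA).write(users, encoder);
        encoder.flush(); // the encoder buffers, so flush before reading the bytes
        return out.toByteArray();
    }

    // Consumer side: what a custom Kafka value deserializer could do.
    static List<GenericRecord> decode(byte[] data) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        return new GenericDatumReader<List<GenericRecord>>(LIST_SCHEMA).read(null, decoder);
    }
}
```

The decode body could go into the deserialize(String topic, byte[] data) method of your Deserializer implementation. It can only read payloads that were written the same way, so it will not decode messages already produced with contentType=application/x-java-object.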