I have a Kafka topic whose existing messages were produced without an Avro schema. We recently started pushing Avro-encoded messages (with a schema) to the same topic.
The topic now contains messages both with and without a schema.
I have a consumer which consumes from this topic.
-> If I set value.deserializer in the consumer config to "KafkaAvroDeserializer.class", no messages are consumed at all.
-> If I set value.deserializer in the consumer config to "StringDeserializer.class", I can consume the messages, but the ones that were produced with an Avro schema come out garbled, as if they were encrypted.
For example: ConsumerRecord(topic = sample-events, partition = 2, offset = 1089595, CreateTime = 1544116093932, checksum = 2421249481, serialized key size = -1, serialized value size = 159, key = null, value = ���test_impressLbhpb_extranet_opportunity_cleaning_fecron�����YH00756f54-ba55-11e7-8df0-fdb86cefa6ed$abcde).
I have generated Java classes from the Avro schema, and I would like to convert every message from the consumer, with or without a schema, into the generated Avro class. For the messages without a schema this already works: I map them to the Avro class with objectMapper.
For the messages with an Avro schema, which look garbled as in the example above, I am trying the code snippet below:
    SpecificDatumReader<SampleEvents> reader = new SpecificDatumReader<SampleEvents>(SampleEvents.getClassSchema());
    Decoder decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(record), null);
    SampleEvents event = reader.read(null, decoder);
But this doesn't work. It fails with the error:
    java.lang.ArrayIndexOutOfBoundsException: -1
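One thing I have not ruled out yet: if the producer uses Confluent's KafkaAvroSerializer (an assumption on my side), each value is framed as one magic byte (0x0), a 4-byte big-endian schema ID, and only then the Avro binary payload. Also, reading binary data through StringDeserializer replaces invalid byte sequences, so the original bytes may already be lost by the time they reach my decoder; consuming with ByteArrayDeserializer would keep them intact. Below is a minimal sketch of how the framing could be detected and stripped before the bytes are handed to the SpecificDatumReader. The class and method names (ConfluentFraming, hasConfluentHeader, schemaId, avroPayload) are hypothetical helpers, and checking only the first byte is a heuristic that works here only because my schemaless messages are text and never start with a zero byte.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of Kafka or Avro.
// Assumes the Confluent wire format: [magic byte 0x0][4-byte schema ID][Avro binary payload].
class ConfluentFraming {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    // Heuristic: a value is treated as schema-framed if it starts with the magic byte.
    static boolean hasConfluentHeader(byte[] value) {
        return value != null && value.length >= HEADER_SIZE && value[0] == MAGIC_BYTE;
    }

    // Reads the schema ID stored big-endian in bytes 1..4.
    static int schemaId(byte[] value) {
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }

    // Returns only the Avro-encoded bytes, without the 5-byte header.
    static byte[] avroPayload(byte[] value) {
        return Arrays.copyOfRange(value, HEADER_SIZE, value.length);
    }

    public static void main(String[] args) {
        // Made-up sample: header with schema ID 42, followed by a few payload bytes.
        byte[] framed = {0, 0, 0, 0, 42, 8, 116, 101, 115, 116};
        if (hasConfluentHeader(framed)) {
            System.out.println("schema id: " + schemaId(framed));
            System.out.println("payload length: " + avroPayload(framed).length);
        }
    }
}
```

With something like this, the consumer would read byte[] values, pass avroPayload(value) to the binaryDecoder when hasConfluentHeader(value) is true, and fall back to the objectMapper path otherwise.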
How can I deserialise this message?