
I have a Kafka topic whose messages were originally produced without an Avro schema. We recently started pushing messages with an Avro schema to this topic as well.

Now the topic contains both kinds of messages, with and without a schema.

I have a consumer which consumes from this topic.

-> If I set value.deserializer in the consumer config to KafkaAvroDeserializer.class, I don't see any messages being consumed.

-> If I set value.deserializer in the consumer config to StringDeserializer.class, I am able to consume the messages, but the messages that were produced with an Avro schema come out as unreadable binary (they look encrypted).

eg : ConsumerRecord(topic = sample-events, partition = 2, offset = 1089595, CreateTime = 1544116093932, checksum = 2421249481, serialized key size = -1, serialized value size = 159, key = null, value = ���test_impressLbhpb_extranet_opportunity_cleaning_fecron�����YH00756f54-ba55-11e7-8df0-fdb86cefa6ed$abcde).

I have generated Java classes for the Avro schema, and I would like to map the messages coming from the consumer, with or without a schema, to this generated Avro Java class. For the messages without a schema, mapping to the Avro Java class with an ObjectMapper works.

But for the messages with an Avro schema, which look garbled as in the example above, I tried the code snippet below:

SpecificDatumReader<SampleEvents> reader = new SpecificDatumReader<SampleEvents>(SampleEvents.getClassSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(record), null);
SampleEvents event = reader.read(null, decoder);

but this doesn't work. I get the error:

java.lang.ArrayIndexOutOfBoundsException: -1

How can I deserialise this message?


1 Answer


If I set value.deserializer in consumer config as "KafkaAvroDeserializer.class", I don't see any messages being consumed.

Hmm, you should at least be getting either an HTTP error or a deserialization error...


First, you should be using BytesDeserializer, or a variant of it such as ByteBufferDeserializer

Then, you need to become familiar with the ByteBuffer API and how to get a byte[] out of one...
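For example, a minimal sketch (plain JDK, no Kafka dependencies) of copying the readable bytes of a ByteBuffer into a byte[]; duplicate() is used so the original buffer's position is left untouched:

```java
import java.nio.ByteBuffer;

public class ByteBufferToBytes {
    // Copy the remaining (readable) bytes of a ByteBuffer into a new array.
    static byte[] toBytes(ByteBuffer buf) {
        ByteBuffer copy = buf.duplicate(); // independent position/limit, shared content
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {0x0, 1, 2, 3, 4});
        byte[] bytes = toBytes(buf);
        System.out.println(bytes.length); // 5
        System.out.println(bytes[0]);     // 0 -> would be the magic byte in a registry-framed message
    }
}
```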

If you have Schema Registry encoded Avro messages, then those have a well-defined wire format: a 0x0 magic byte, a 4-byte schema ID, then the Avro-encoded payload

So, you could do something like the following, but in the end it requires some inference about what data could possibly be in the topic.

// consumerConfig.put("value.deserializer", ByteBufferDeserializer.class)

ByteBuffer buf = record.value();

if (buf == null) {
    System.err.println("Tombstoned record");
} else {
    byte[] bytes = new byte[buf.remaining()];
    buf.get(bytes); // copy the payload out of the buffer

    if (bytes[0] == 0x0) { // magic byte: Schema Registry Avro framing
        int schemaId = ByteBuffer.wrap(bytes, 1, 4).getInt(); // if you wanted it

        KafkaAvroDeserializer d = new KafkaAvroDeserializer();
        Map<String, String> config = new HashMap<>();
        config.put("schema.registry.url", "http://..."); // address of the registry
        boolean isKey = false;
        d.configure(config, isKey);

        Object v = d.deserialize(record.topic(), bytes); // full payload, magic byte included
        // TODO: Handle record
    } else {
        try {
            StringDeserializer d = new StringDeserializer();
            String s = d.deserialize(record.topic(), bytes);
            // TODO: Handle record
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
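That wire format is also why the SpecificDatumReader attempt in the question fails: the first five bytes of a registry-framed message are not Avro data, so the decoder misreads them and throws. A runnable sketch of splitting off that header with plain JDK classes (the fake 42 schema ID and the trailing bytes are made-up example data; the actual SampleEvents decode, shown in the comment, still needs the Avro library):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormat {
    public static void main(String[] args) {
        // Fake registry-framed payload: magic byte 0x0, schema ID 42 (big-endian),
        // then whatever the Avro-encoded body happens to be.
        byte[] record = {0x0, 0, 0, 0, 42, 0x10, 0x20};

        ByteBuffer buf = ByteBuffer.wrap(record);
        byte magic = buf.get();      // must be 0x0 for registry-framed messages
        int schemaId = buf.getInt(); // the 4-byte schema ID
        byte[] avroBody = Arrays.copyOfRange(record, 5, record.length);

        System.out.println(magic);           // 0
        System.out.println(schemaId);        // 42
        System.out.println(avroBody.length); // 2

        // The question's snippet should then read avroBody instead of record:
        //   Decoder decoder = DecoderFactory.get().binaryDecoder(avroBody, null);
        //   SampleEvents event = reader.read(null, decoder);
    }
}
```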

Takeaway: do not produce both Avro and non-Avro data into the same topic. Otherwise, you are forced to consume raw bytes and handle the branching logic yourself.