3
votes

I receive binary Avro files from a Kafka topic and I must deserialize them. In the messages received from Kafka, I can see a schema at the start of every message. I know it's better practice not to embed the schema and to keep it separate from the actual Avro data, but I don't have control over the producer and I can't change that.

My code runs on top of Apache Storm. First I create a reader:

mDatumReader = new GenericDatumReader<GenericRecord>();

And later I try to deserialize the message without declaring schema:

Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
GenericRecord payload = mDatumReader.read(null, decoder);

But then I get an error when a message arrives:

Caused by: java.lang.NullPointerException: writer cannot be null!
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:77) ~[stormjar.jar:?]
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:46) ~[stormjar.jar:?]
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:122) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:137) ~[stormjar.jar:?]

All the answers I've seen involve using other formats, changing the messages delivered to Kafka, or something else I don't have control over.

My question is: given a message as a byte[] with the schema embedded in the binary payload, how do I deserialize that Avro data without declaring a schema, so I can read it?


2 Answers

2
votes

With the DatumReader/Writer, there is no such thing as an embedded schema. That was my misunderstanding too when I first looked at Avro and Kafka. The source code of the Avro serializer clearly shows that no schema is embedded when the GenericDatumWriter is used.

It is the DataFileWriter that writes a schema at the beginning of the file and then appends GenericRecords using the GenericDatumWriter.

Since you said there is a schema at the beginning, I assume you can read it, turn it into a Schema object, and pass that into the GenericDatumReader(schema) constructor. It would be interesting to know how the message is serialized. Maybe the DataFileWriter is used to write into a byte[] instead of an actual file; then you could use the DataFileReader to deserialize the data.
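If the producer really does use the Avro container format (i.e. a DataFileWriter writing into a byte[]), a round trip can be sketched like this. The schema, class name, and field name here are made up for illustration; only the deserialize side applies to your consumer:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class EmbeddedSchemaRoundTrip {

    // Hypothetical schema, only for this demo.
    static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":"
        + "[{\"name\":\"Id\",\"type\":\"string\"}]}");

    // Simulates the producer: DataFileWriter embeds the schema in the header.
    static byte[] serialize(String id) throws IOException {
        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("Id", id);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(SCHEMA))) {
            writer.create(SCHEMA, out);
            writer.append(record);
        }
        return out.toByteArray();
    }

    // No schema declared: DataFileReader reads it from the container header.
    static GenericRecord deserialize(byte[] message) throws IOException {
        try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                 new SeekableByteArrayInput(message),
                 new GenericDatumReader<GenericRecord>())) {
            return reader.next();
        }
    }

    public static void main(String[] args) throws IOException {
        GenericRecord payload = deserialize(serialize("42"));
        System.out.println(payload.get("Id")); // prints 42
    }
}
```

If the bytes instead start with a bare JSON schema followed by a binary-encoded record (a custom format, not the container format), you would parse that JSON with new Schema.Parser().parse(...) and pass the resulting Schema to GenericDatumReader as described above.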

0
votes
  1. Add the Avro Maven dependency

    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.9.1</version>
    </dependency>

    and, in the <build><plugins> section, the avro-maven-plugin, which generates Java classes from .avsc files (it is a build plugin, not a regular dependency):

    <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.9.1</version>
    </plugin>
    
  2. Create a file like below

     {"namespace": "tachyonis.space",
       "type": "record",
       "name": "Avro",
       "fields": [
          {"name": "Id", "type": "string"}
       ]
     }
    
  3. Save the above as Avro.avsc in src/main/resources.

  4. In Eclipse or any IDE, run Maven generate-sources, which creates Avro.java in the package tachyonis.space. Then configure the consumer to use the generated class:

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 
    KafkaConsumer<String, Avro> consumer = new KafkaConsumer<>(props);
    
  5. The consumer and producer have to run on the same machine. Otherwise you need to configure the hosts file on Windows/Linux and change all component configuration properties from localhost to the actual IP address, so the broker is reachable by producers and consumers. Otherwise you get network connection errors like:

    Connection to node -3 (/127.0.0.1:9092) could not be established. Broker may not be available
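For the remote case, the relevant broker settings in server.properties look roughly like the sketch below; the IP address is a placeholder for your broker's actual address:

```
# Bind on all interfaces so remote clients can connect
listeners=PLAINTEXT://0.0.0.0:9092
# Hostname/IP the broker advertises to clients; must be
# resolvable and reachable from the producer/consumer machines
advertised.listeners=PLAINTEXT://192.168.1.10:9092
```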