
I have written a Kafka Avro deserializer using Java source code generated from an Avro schema file. The requirement is not to use the generated POJOs. How can I change the code below so that it avoids POJOs and uses a generic schema transformation instead?

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import com.example.org.model.Person;

    public class AvroDeserializer implements Deserializer<GenericRecord> {

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }

        @Override
        public GenericRecord deserialize(String topic, byte[] data) {
            try {
                GenericRecord result = null;

                if (data != null) {
                    // Bound to the generated Person class -- this is the POJO dependency
                    DatumReader<Person> reader = new SpecificDatumReader<>(Person.getClassSchema());
                    Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                    result = (GenericRecord) reader.read(null, decoder);
                }
                return result;
            } catch (Exception ex) {
                throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
            }
        }
    }

How do I make this code work without the POJOs?


2 Answers

0 votes

If you use a POJO class in the serializer, you are storing the schema along with the data, which slows down message parsing and also takes extra space at the storage level. You have to make changes in your serializer as well as your deserializer.
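Independent of any registry, the POJO dependency itself can be removed by switching from SpecificDatumReader to GenericDatumReader. A minimal sketch, assuming the writer schema is available as an .avsc file on the classpath (the /person.avsc path and the class name are placeholders, not part of the original code):

    import java.io.InputStream;
    import java.util.Arrays;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    public class GenericAvroDeserializer implements Deserializer<GenericRecord> {

        private Schema schema;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Placeholder path: load the writer schema from an .avsc file on the classpath
            try (InputStream in = getClass().getResourceAsStream("/person.avsc")) {
                schema = new Schema.Parser().parse(in);
            } catch (Exception ex) {
                throw new SerializationException("Can't load Avro schema", ex);
            }
        }

        @Override
        public GenericRecord deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                // GenericDatumReader yields GenericRecord instances -- no generated POJO involved
                DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                return reader.read(null, decoder);
            } catch (Exception ex) {
                throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
            }
        }

        @Override
        public void close() {
        }
    }

Fields are then accessed by name, e.g. record.get("firstName") for a hypothetical field, instead of through generated getters.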

To fix this problem, a Schema Registry is used.

The basic idea of a schema registry is that producers and consumers refer to a registered Avro schema when writing and reading data to a topic.

We don't want to write the schema with every record, as you imply - often the schema is bigger than the data itself! That would be a waste of time parsing it on every read, and a waste of resources (network, disk, CPU).
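For illustration, a consumer configured against Confluent's Schema Registry might look like the sketch below. The broker URL, registry URL, and group id are placeholders; with the default specific.avro.reader=false, values come back as GenericRecord rather than POJOs:

    import java.util.Properties;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class RegistryConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "person-consumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            // Each message carries only a small schema ID; the schema itself is
            // fetched from (and cached by) the registry
            props.put("schema.registry.url", "http://localhost:8081");

            KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
            // ... subscribe and poll as usual
        }
    }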

I suggest you go through the following link for the code as well as a detailed description of this topic.

https://blog.cloudera.com/blog/2018/07/robust-message-serialization-in-apache-kafka-using-apache-avro-part-1/

0 votes

There are several options for doing that. You could pass the target type into the constructor:

    protected final Class<T> targetType;

    public AvroDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

And deserialize using targetType:

    SpecificDatumReader<GenericRecord> datumReader =
        new SpecificDatumReader<>(targetType.newInstance().getSchema());

Then, from the client code that uses the deserializer:

    AvroDeserializer<Test> avroDeserializer = new AvroDeserializer<>(Test.class);

    final KafkaConsumer<String, Test> consumer =
        new KafkaConsumer<>(props, stringDeserializer, avroDeserializer);

Note that with this approach you can't configure the deserializer through the consumer's deserializer config property, because that mechanism instantiates the class via its no-argument constructor.
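Putting the fragments above together, a minimal sketch of the full class might look like this; the SpecificRecord bound, the null check, and typing it as Deserializer<T> (to match the KafkaConsumer<String, Test> usage above) are my additions:

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;
    import org.apache.avro.specific.SpecificRecord;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;

    public class AvroDeserializer<T extends SpecificRecord> implements Deserializer<T> {

        protected final Class<T> targetType;

        public AvroDeserializer(Class<T> targetType) {
            this.targetType = targetType;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Intentionally empty: the target type comes in through the constructor,
            // which is exactly why the config-property route doesn't work here
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                // Look up the schema from the generated class supplied by the caller
                SpecificDatumReader<T> reader =
                        new SpecificDatumReader<>(targetType.newInstance().getSchema());
                Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
                return reader.read(null, decoder);
            } catch (Exception ex) {
                throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
            }
        }

        @Override
        public void close() {
        }
    }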