
I have been trying to serialize an Avro GenericRecord and generate Avro-serialized data to send to Kafka. The main goal is to avoid using the Confluent Schema Registry for storing the schema, and instead send the schema along with the serialized data so it can be extracted from the Kafka topic and used for deserialization.

Below is the relevant part of the AvroSerializer that generates the Avro data.


  @Override
  public byte[] serialize(String topic, T data) {
    try {
      byte[] result = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);


        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
            EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.setSchema(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close(); 
        result = byteArrayOutputStream.toByteArray();


      }

      return result;
    } catch (IOException ex) {
      throw new SerializationException(
          "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }

The serialized data present in Kafka looks like this.

(screenshot of the console consumer output: the raw serialized bytes, with no schema visible)

The AvroDeserializer part looks like this.

  @Override
  public T deserialize(String topic, byte[] data) {


    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

        Schema schema = new Schema.Parser().parse(schemaString);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

 
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);
        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);
      }

      return result;

    } catch (Exception ex) {
      throw new SerializationException(
          "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }


The producer is shown below.

public class KafkaAvroProducerUtil {


    public  Future<RecordMetadata> produceTokafka(GenericRecord object) throws IOException {


        Properties properties = new Properties();
        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");
        // avro part

        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", AvroSerializer.class.getName());
 


        String topic = "avro";

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties);
        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<String, GenericRecord>(
                topic, object
        );

        Future<RecordMetadata> data = producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata);
                } else {
                    exception.printStackTrace();
                }
            }
        });

        producer.flush();
        producer.close();


        return data;
    }
}

When I try to deserialize this, it says a schema is needed. The problem, as I understand it, is that (as you can see in the image above, a snapshot of the consumer running on cmd) the schema is not sent along with the data. How can I send the schema along with the data so that I can deserialize using the schema sent with it?

Please show a minimal reproducible example that includes your Kafka producer and the data definition. – OneCricketeer
Note: the code I linked to before never calls datumWriter.setSchema. – OneCricketeer
@OneCricketeer I have edited the question to add more details. Yes, that is the code I had previously. I tried the code you suggested, but it produced the same output. – pacman
"When I try to deserialize this it says schema is needed" – 1) What is saying this? 2) Yes, as answered before, Avro requires a "reader schema", regardless of whether a "writer schema" is part of the data. – OneCricketeer
Schema Registry is not "inevitable". I'm saying the deserializer will always need to use a schema (but there's no easy way I'm aware of to extract it from the incoming byte array)... You might have better luck with the Jackson Avro library. – OneCricketeer

1 Answer


EDIT: I approached the answer in two ways, following the suggestions of @OneCricketeer and @ChinHuang.

Both approaches are explained below, but code is shown only for the header approach.

APPROACH 1: Sending schema along with data

In this approach I serialized the Avro schema as a string and sent it to the Kafka topic, followed by a delimiter and then the serialized data.

When deserializing, after reading the bytes from the Kafka topic, I split the byte array into schema and data at the delimiter, convert the schema bytes back into a Schema, and then use that schema to deserialize the data.

Cons of the approach, as @OneCricketeer pointed out:

  1. It is definitely not performant.
  2. The whole approach breaks if the delimiter ever appears inside the schema.
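As a rough illustration, the framing can be done with plain byte-array helpers. This is a minimal sketch: the `SchemaFraming` class, the helper names, and the `|||` delimiter are my own inventions, and the delimiter-collision con above applies to any choice of delimiter.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SchemaFraming {
    // hypothetical delimiter; the approach breaks if this ever occurs in the schema JSON
    private static final byte[] DELIMITER = "|||".getBytes(StandardCharsets.UTF_8);

    // schemaJson + delimiter + avroPayload -> one byte[] to use as the Kafka record value
    public static byte[] frame(String schemaJson, byte[] avroPayload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.writeBytes(schemaJson.getBytes(StandardCharsets.UTF_8));
        out.writeBytes(DELIMITER);
        out.writeBytes(avroPayload);
        return out.toByteArray();
    }

    // split at the first delimiter occurrence: [0] = schema JSON bytes, [1] = Avro payload
    // (assumes the delimiter is present; a real implementation should check for -1)
    public static byte[][] split(byte[] framed) {
        int at = indexOf(framed, DELIMITER);
        return new byte[][] {
            Arrays.copyOfRange(framed, 0, at),
            Arrays.copyOfRange(framed, at + DELIMITER.length, framed.length)
        };
    }

    // naive first-occurrence search of needle inside haystack
    private static int indexOf(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i <= haystack.length - needle.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (haystack[i + j] != needle[j]) continue outer;
            }
            return i;
        }
        return -1;
    }
}
```

A length prefix (e.g. a 4-byte schema length before the schema bytes) would avoid the delimiter-collision problem entirely, at the cost of slightly more bookkeeping.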

APPROACH 2: Sending schema in the header

Here, rather than sending the schema along with the data, the schema is sent in a record header.

The methods of the Serializer class are shown below.


  @Override
  public byte[] serialize(String topic, T data) {
    // not used: Kafka calls the Headers-aware overload below
    return null;
  }

  @Override
  public byte[] serialize(String topic, Headers headers, T data) {
    try {
      byte[] payload = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);

        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
                EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);

        // the schema travels in a record header instead of the payload
        byte[] schemaBytes = data.getSchema().toString().getBytes();

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close();

        payload = byteArrayOutputStream.toByteArray();
        headers.add("schema", schemaBytes);
      }

      LOGGER.info("headers added");
      return payload;
    } catch (IOException ex) {
      throw new SerializationException(
              "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }
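For context on why returning null from the two-argument serialize is safe here: Kafka's Serializer interface gives the three-argument, Headers-aware overload a default implementation that simply delegates to the two-argument one, and the producer calls the three-argument variant. Overriding it therefore diverts all real work to the headers path. A simplified model of that dispatch pattern (my own toy interface, not the real org.apache.kafka.common.serialization.Serializer):

```java
import java.util.HashMap;
import java.util.Map;

public class DefaultMethodDispatch {
    // toy stand-in for Kafka's Serializer interface
    interface Serializer<T> {
        byte[] serialize(String topic, T data);

        // default overload delegates, so the framework can always call this form
        default byte[] serialize(String topic, Map<String, byte[]> headers, T data) {
            return serialize(topic, data);
        }
    }

    // like the answer's class: real work happens only in the headers-aware overload
    static class HeaderAwareSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return null; // never reached when the caller passes headers
        }

        @Override
        public byte[] serialize(String topic, Map<String, byte[]> headers, String data) {
            headers.put("schema", "{...}".getBytes());
            return data.getBytes();
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        byte[] out = new HeaderAwareSerializer().serialize("avro", headers, "payload");
        // prints: 7 bytes, header present: true
        System.out.println(out.length + " bytes, header present: " + headers.containsKey("schema"));
    }
}
```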

The Deserializer methods are shown below.



  @Override
  public T deserialize(String topic, byte[] data) {
    // not used: Kafka calls the Headers-aware overload below
    return null;
  }
  @Override
  public T deserialize(String topic, Headers headers, byte[] data) {
    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

        // read the writer schema back out of the record header
        Header header = headers.lastHeader("schema");
        String schemaString2 = new String(header.value());

        Schema schema = new Schema.Parser().parse(schemaString2);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        result = (T) datumReader.read(null, decoder);

        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);
      }

      return result;
    } catch (Exception ex) {
      throw new SerializationException(
              "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }
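On the consuming side, the deserializer is wired in like any other. This is a sketch of the consumer configuration; the class name com.example.AvroDeserializer is a placeholder for wherever your AvroDeserializer class actually lives.

```properties
bootstrap.servers=127.0.0.1:9092
group.id=avro-consumer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# assumption: fully-qualified name of the headers-aware AvroDeserializer shown above
value.deserializer=com.example.AvroDeserializer
```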