I have been trying to serialize an Avro GenericRecord and produce the Avro-serialized data to Kafka. The main goal is to avoid using the Confluent Schema Registry for storing the schema and instead send the schema along with the serialized data, so that it can be extracted from the Kafka topic and used for deserialization.
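For illustration in the sketches further down, assume a minimal record schema along these lines (the actual schema is not shown in the question, so this Person schema is only a placeholder):

String schemaString =
        "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
      + "{\"name\":\"name\",\"type\":\"string\"},"
      + "{\"name\":\"age\",\"type\":\"int\"}]}";
Schema schema = new Schema.Parser().parse(schemaString);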
Below is the relevant part of the AvroSerializer that generates the Avro data:
@Override
public byte[] serialize(String topic, T data) {
    try {
        byte[] result = null;
        if (data != null) {
            LOGGER.debug("data='{}'", data);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            BinaryEncoder binaryEncoder =
                    EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
            // redundant: the constructor above already set the schema
            datumWriter.setSchema(data.getSchema());
            // only the binary-encoded record body is written; the schema itself is not part of these bytes
            datumWriter.write(data, binaryEncoder);
            binaryEncoder.flush();
            byteArrayOutputStream.close();
            result = byteArrayOutputStream.toByteArray();
        }
        return result;
    } catch (IOException ex) {
        throw new SerializationException(
                "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
}
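One way to carry the schema inside every Kafka message (without the Schema Registry) is to serialize to the Avro object container format instead of a bare binary encoding: DataFileWriter writes the schema JSON into a header in front of the record bytes. The following is only a sketch of that idea against the same serialize signature; names such as out and fileWriter are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

@Override
public byte[] serialize(String topic, T data) {
    if (data == null) {
        return null;
    }
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataFileWriter<GenericRecord> fileWriter =
                new DataFileWriter<>(new GenericDatumWriter<>(data.getSchema()));
        fileWriter.create(data.getSchema(), out);  // writes the schema into the container header
        fileWriter.append(data);                   // writes the binary-encoded record body
        fileWriter.close();                        // flushes the data block and sync marker
        return out.toByteArray();
    } catch (IOException ex) {
        throw new SerializationException(
                "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
}

The trade-off is that every message then carries the full schema header, so each record is noticeably larger than with the bare binary encoding.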
The serialized data present in the Kafka topic looks like this (snapshot of the console consumer on the command line, not reproduced here).
The AvroDeserializer part looks like this.
@Override
public T deserialize(String topic, byte[] data) {
    try {
        T result = null;
        if (data != null) {
            LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            // the schema has to be supplied here, because the bytes alone carry no schema
            Schema schema = new Schema.Parser().parse(schemaString);
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            result = (T) datumReader.read(null, decoder);
            LOGGER.debug(result.getSchema().toString());
            LOGGER.debug("deserialized data='{}'", result);
        }
        return result;
    } catch (Exception ex) {
        throw new SerializationException(
                "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
}
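The matching deserializer could then read the schema back out of the container header instead of relying on a schemaString field. Again only a sketch, under the assumption that the producer used the container format shown above:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

@Override
public T deserialize(String topic, byte[] data) {
    if (data == null) {
        return null;
    }
    try (DataFileStream<GenericRecord> stream = new DataFileStream<>(
            new ByteArrayInputStream(data), new GenericDatumReader<GenericRecord>())) {
        LOGGER.debug("writer schema: {}", stream.getSchema());  // schema recovered from the payload itself
        return (T) stream.next();                               // first (and only) record in the container
    } catch (IOException ex) {
        throw new SerializationException(
                "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
}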
The producer is shown below:
public class KafkaAvroProducerUtil {

    public Future<RecordMetadata> produceTokafka(GenericRecord object) throws IOException {
        Properties properties = new Properties();
        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");
        // avro part
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", AvroSerializer.class.getName());

        String topic = "avro";
        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties);
        ProducerRecord<String, GenericRecord> producerRecord =
                new ProducerRecord<String, GenericRecord>(topic, object);

        Future<RecordMetadata> data = producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata);
                } else {
                    exception.printStackTrace();
                }
            }
        });
        producer.flush();
        producer.close();
        return data;
    }
}
When I try to deserialize this, it says a schema is needed. The problem, as I understand it, is that (as you can see in the consumer snapshot referenced above) the schema is not sent along with the data. How can I send the schema along with the data so that it can be deserialized using the schema that accompanies it?