
My Kafka Avro serializer and deserializer are not working. I tried consuming the messages with the Kafka console consumer and I could see that the messages were published.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.PropertyAccessorFactory;

public class AvroProducer<T> {

    private static Properties props;
    static {
        props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");
    }

    private static KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    public byte[] createRecords(T pojo) throws IOException {
        // Parse the schema from the classpath; let the IOException propagate
        // instead of swallowing it and continuing with a null schema (NPE)
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc"));

        // Copy each schema field from the POJO into a GenericRecord by direct field access
        final GenericData.Record record = new GenericData.Record(schema);
        schema.getFields().forEach(f -> record.put(f.name(),
                PropertyAccessorFactory.forDirectFieldAccess(pojo).getPropertyValue(f.name())));

        // A GenericRecord should be written with a GenericDatumWriter,
        // not a SpecificDatumWriter
        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
            writer.write(record, encoder);
            encoder.flush();
            return os.toByteArray();
        }
    }

    public static void sendMessage(byte[] bytemessage) {
        // The type parameters are the key/value types (String, byte[]),
        // not the serializer classes
        ProducerRecord<String, byte[]> precord = new ProducerRecord<>("jason", bytemessage);
        producer.send(precord);
    }
}
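For reference, this is a minimal sketch of how I invoke the producer. The Syslog POJO and the main class here are placeholders; the POJO's fields must be named exactly like the schema fields (partyID, applianceID, message, ...), because createRecords copies them by direct field access:

import java.io.IOException;

public class ProducerMain {
    public static void main(String[] args) throws IOException {
        // Hypothetical POJO whose field names match the schema fields
        Syslog log = new Syslog();
        // ... populate partyID, applianceID, message, severity, etc. ...

        AvroProducer<Syslog> avroProducer = new AvroProducer<>();
        byte[] payload = avroProducer.createRecords(log);
        AvroProducer.sendMessage(payload);
    }
}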



import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AvroConsumer {

    private static Properties kafkaProps;

    static {
        kafkaProps = new Properties();
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroConsumer-GroupOne");
    }

    public static void recieveRecord() throws IOException {
        try (KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
            kafkaConsumer.subscribe(Arrays.asList("jason"));
            while (true) {
                ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100);
                Schema.Parser parser = new Schema.Parser();
                final Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc"));
                records.forEach(record -> {
                    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(schema);
                    ByteArrayInputStream is = new ByteArrayInputStream(record.value());
                    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(is, null);
                    try {
                        // The ClassCastException from the stack trace is thrown here
                        Syslog log = (Syslog) datumReader.read(null, binaryDecoder);
                        System.out.println("Value: " + log);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
}

The stack trace is below. Can someone guide me to the right implementation? The problem seems to be with casting the record: how do I access the value, and how do I read the data using the SpecificDatumReader?

Exception in thread "Thread-1" java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.cisco.daas.kafka.Syslog
at com.cisco.daas.kafka.AvroConsumer.lambda$0(AvroConsumer.java:46)
at java.lang.Iterable.forEach(Unknown Source)
at com.cisco.daas.kafka.AvroConsumer.recieveRecord(AvroConsumer.java:41)
at com.cisco.daas.kafka.MainApp$1.run(MainApp.java:32)
at java.lang.Thread.run(Unknown Source)

This is the schema I am trying to parse:

{
    "namespace": "com.example.syslogmessage",
    "type": "record",
    "name": "SysLogMessage",
    "fields": [{
            "name": "partyID",
            "type": "long"
        },
        {
            "name": "applianceID",
            "type": "string"
        },
        {
            "name": "message",
            "type": "string"
        },
        {
            "name": "severity",
            "type": "long"
        },
        {
            "name": "messageType",
            "type": "string"
        },
        {
            "name": "eventtime",
            "type": "long",
            "logicalType": "timestamp-millis"
        },
        {
            "name": "ipaddress",
            "type": "string"
        },
        {
            "name": "hostname",
            "type": "string"
        }
    ]
}
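One side note on the schema, unrelated to the exception: Avro ignores a logicalType attribute that is placed on the field rather than on the type, so for eventtime to actually carry timestamp-millis the attribute would have to be nested inside the type, i.e. "type": {"type": "long", "logicalType": "timestamp-millis"}.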

1 Answer


The issue is resolved by reading the message as a GenericRecord and accessing its fields by name. The ClassCastException occurs because a SpecificDatumReader constructed with only a Schema falls back to producing GenericData$Record when it cannot resolve a generated class matching the schema's full name; here the schema is named com.example.syslogmessage.SysLogMessage while the generated class is com.cisco.daas.kafka.Syslog, so the cast to Syslog fails. Since we only need a GenericRecord, the reader is switched to a GenericDatumReader, which is the reader type meant for generic records:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AvroConsumer {

    private static Properties kafkaProps;

    static {
        kafkaProps = new Properties();
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroConsumer-GroupOne");
    }

    public void receiveRecord() throws IOException {
        // Parse the schema once instead of on every poll
        Schema.Parser parser = new Schema.Parser();
        final Schema schema = parser.parse(AvroConsumer.class.getClassLoader().getResourceAsStream("syslog.avsc"));

        try (KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
            kafkaConsumer.subscribe(Arrays.asList("jason"));
            while (true) {
                ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100);
                records.forEach(record -> {
                    // A GenericDatumReader returns GenericRecord directly,
                    // so no cast to a generated class is needed
                    GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
                    ByteArrayInputStream is = new ByteArrayInputStream(record.value());
                    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(is, null);
                    try {
                        GenericRecord log = datumReader.read(null, binaryDecoder);
                        // Access individual fields by name
                        System.out.println("message: " + log.get("message"));
                        System.out.println("Value: " + log);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
}
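If you do want a typed Syslog back instead of a GenericRecord, a SpecificDatumReader can be built from the generated class itself. The sketch below assumes Syslog was generated (e.g. by the avro-maven-plugin) from the same syslog.avsc, so that the schema's full name matches the generated class; with the schema shown above (com.example.syslogmessage.SysLogMessage) that is not the case, which is exactly why the original cast failed:

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

public class SpecificSyslogReader {

    // Deserialize one Kafka message payload into the generated Syslog class
    public static Syslog read(byte[] payload) throws IOException {
        // Building the reader from the class lets SpecificData take both the
        // reader schema and the target type from the generated code
        SpecificDatumReader<Syslog> datumReader = new SpecificDatumReader<>(Syslog.class);
        ByteArrayInputStream is = new ByteArrayInputStream(payload);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(is, null);
        return datumReader.read(null, decoder);
    }
}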