
We are working on integrating Apache Storm with Confluent's Kafka platform, using a Python wrapper for Storm called "Pyleus".

We set up a Confluent Kafka JDBC connector monitoring a database table; whenever there is a change in the database, the new record is sent as a Kafka message in Avro format.

In the Pyleus bolt we are able to get the Kafka message; however, we are not able to de-serialize it into JSON.

We are using two Python Avro modules, "avro_json_serializer" and "avro". They worked when I tried them on simple Avro files that I put together myself.

The Avro schema for the data in the Kafka message is obtained from Confluent's Schema Registry via HTTP GET. I put the schema and the Avro payload of the Kafka message into two files, and here is my test program:

import json

import avro.schema
import avro_json_serializer as ajs

# Avro schema fetched from Confluent's Schema Registry via HTTP GET
schema_string = open("realAvroSchemaFromKK.avsc").read()

schema_dict = json.loads(schema_string)
avro_schema = avro.schema.make_avsc_object(schema_dict, avro.schema.Names())

serializer = ajs.AvroJsonSerializer(avro_schema)

# Avro payload of the Kafka message - I wrote it into this file
avro_file = open("realAvroFromKK.avro", "rb")
avro_bytes = avro_file.read()

json_data = serializer.to_json(avro_bytes)  # <-- where the code errors out

print(json_data)
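One thing worth noting about the HTTP GET against the Schema Registry: the registry returns the schema wrapped in a JSON envelope, as an escaped string under a "schema" key, so it needs unwrapping before it can be saved as a usable .avsc document. A minimal sketch (the registry URL in the comment is a hypothetical example):

```python
import json

def unwrap_registry_response(response_text):
    # A response from e.g. GET http://localhost:8081/subjects/<subject>/versions/latest
    # looks like: {"subject": ..., "id": ..., "schema": "{\"type\": \"record\", ...}"}
    # The "schema" value is itself a JSON string, so it is parsed twice.
    envelope = json.loads(response_text)
    return json.loads(envelope["schema"])
```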

The way I interpret the error message is that my Avro schema does not match my Avro data:

avro.io.AvroTypeException: The datum �bankbankHoward �����THoward �����T� is not an example of the schema {
  "namespace": "example.avro",
  "type": "record",
  "connect.name": "TABLE_NAME",
  "fields": [
    {
      "type": "int",
      "name": "Column_1"
    },
    ... (omitting the rest of the schema)

I read here that a Kafka message in Avro format from the Confluent framework has 4 extra bytes at the beginning of the message indicating the schema ID. I tried stripping out the first 4 bytes of the Avro data before sending it to serializer.to_json(), but still no luck.
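For what it's worth, the Confluent wire format actually prepends five bytes, not four: one magic byte (0x00) followed by a 4-byte big-endian schema ID, which may explain why stripping four bytes was not enough. A minimal sketch of splitting off that header:

```python
import struct

def split_confluent_header(raw):
    # Confluent wire format: 1 magic byte (0x00), then a 4-byte
    # big-endian schema ID, then the binary-encoded Avro payload.
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError("missing Confluent magic byte")
    return schema_id, raw[5:]
```

After splitting, `raw[5:]` is plain binary-encoded Avro rather than a complete Avro file, so it still has to be decoded against the schema rather than passed to the JSON serializer as-is.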

HELP!


1 Answer


I ran into a very similar issue reading Confluent Kafka data through the Storm Kafka spout. Here is the equivalent Java code that worked for me:

    ByteBuffer input = ByteBuffer.wrap(data);
    // getInt() consumes 4 bytes, and start adds one more, so decoding
    // begins after the 5-byte Confluent prefix (magic byte + schema ID)
    int id = input.getInt();
    int start = input.position() + 1;
    MyAvroObject obj = null;
    try {
        obj = datum_reader.read(null,
                DecoderFactory.get().binaryDecoder(input.array(), start, input.limit() - start, null));
    } catch (IOException e) {
        e.printStackTrace();
    }
    return obj;

getInt() plus the extra byte from position() + 1 move the read offset past the 5-byte Confluent prefix (magic byte and schema ID). Hope this helps.