We are working on integrating Apache Storm with Kafka through the Confluent framework. We are using a Python wrapper for Storm called "Pyleus".
We set up a Confluent Kafka JDBC connector to monitor a database table; whenever a record changes in the database, the new record is sent as a Kafka message in Avro format.
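In case it helps, below is a rough sketch of how a JDBC source connector like ours can be registered through the Kafka Connect REST API; the connection URL, table, column names, and ports are placeholders rather than our real configuration.

import json
import requests

# Placeholder connector definition - our real connection URL, table and
# column names differ; this only illustrates the kind of setup we use.
connector = {
    "name": "jdbc-source-example",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://dbhost:3306/mydb?user=me&password=secret",
        "table.whitelist": "TABLE_NAME",
        "mode": "timestamp+incrementing",
        "timestamp.column.name": "modified_at",
        "incrementing.column.name": "id",
        "topic.prefix": "db-",
    },
}

# Register the connector with the Kafka Connect REST API (default port 8083).
resp = requests.post("http://localhost:8083/connectors",
                     data=json.dumps(connector),
                     headers={"Content-Type": "application/json"})
resp.raise_for_status()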
In the Pyleus bolt we are able to receive the Kafka message; however, we are not able to deserialize it into JSON.
We are using two Python Avro modules, "avro_json_serializer" and "avro". They work when I try them on simple Avro files that I put together myself.
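For context, this is roughly the kind of simple case that works for me, using a made-up one-column schema (not our real schema): I write a small Avro container file and then turn each record back into JSON.

import avro.datafile
import avro.io
import avro.schema
import avro_json_serializer as ajs

# Made-up one-column schema, only for illustration.
simple_schema_dict = {
    "namespace": "example.avro",
    "type": "record",
    "name": "SimpleRecord",
    "fields": [{"name": "Column_1", "type": "int"}],
}
simple_schema = avro.schema.make_avsc_object(simple_schema_dict, avro.schema.Names())

# Write a tiny Avro container file with one record.
writer = avro.datafile.DataFileWriter(open("simple.avro", "wb"),
                                      avro.io.DatumWriter(), simple_schema)
writer.append({"Column_1": 42})
writer.close()

# Read it back and serialize each record to JSON - this works fine.
serializer = ajs.AvroJsonSerializer(simple_schema)
reader = avro.datafile.DataFileReader(open("simple.avro", "rb"), avro.io.DatumReader())
for record in reader:
    print(serializer.to_json(record))
reader.close()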
The Avro schema for the Avro data in the Kafka message is obtained from Confluent's Schema Registry via HTTP GET. I put the schema and the Avro payload of the Kafka message into two files, and here is my testing program:
import avro.schema
import avro_json_serializer as ajs
import json
# Avro schema from Confluent's schema registry using HTTP GET
schema_string = open("realAvroSchemaFromKK.avsc").read()
schema_dict = json.loads(schema_string)
avro_schema = avro.schema.make_avsc_object(schema_dict, avro.schema.Names())
serializer = ajs.AvroJsonSerializer(avro_schema)
# Avro payload from the Kafka message - I wrote it into this file
avrofile = open("realAvroFromKK.avro", "rb")  # open in binary mode
avro_payload = avrofile.read()  # renamed so it doesn't shadow the avro module
jsonData = serializer.to_json(avro_payload)  # this is where the code errors out
print(jsonData)
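For reference, this is roughly how I fetch the schema from the Schema Registry before writing it to the file above; the registry URL and subject name here are placeholders, not our real ones.

import requests

REGISTRY_URL = "http://localhost:8081"   # placeholder registry address
SUBJECT = "TABLE_NAME-value"             # placeholder subject name

# GET the latest registered schema version for the subject.
resp = requests.get("%s/subjects/%s/versions/latest" % (REGISTRY_URL, SUBJECT))
resp.raise_for_status()

# The registry response is JSON; the Avro schema itself is the "schema" field.
schema_string = resp.json()["schema"]
open("realAvroSchemaFromKK.avsc", "w").write(schema_string)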
The way I interpret the error message is that my Avro schema does not match my Avro data:
avro.io.AvroTypeException: The datum �bankbankHoward �����THoward �����T� is not an example of the schema {
"namespace": "example.avro",
"type": "record",
"connect.name": "TABLE_NAME",
"fields": [
{
"type": "int",
"name": "Column_1"
},
... (omitting the rest of the schema)
I read here that Avro-formatted Kafka messages from the Confluent framework have 4 extra bytes at the beginning of the message indicating the schema ID. I tried stripping out the first 4 bytes of the Avro data before sending it to serializer.to_json(), but still no luck.
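In case it matters, this is roughly what that stripping attempt looked like, reusing the serializer from the testing program above:

# Same schema and serializer as in the testing program above.
raw_payload = open("realAvroFromKK.avro", "rb").read()

# Drop the first 4 bytes, which I understand to be the schema ID that
# Confluent prepends to each Avro message.
stripped_payload = raw_payload[4:]

jsonData = serializer.to_json(stripped_payload)  # still raises AvroTypeException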
HELP!