I have set up a Kafka cluster with a Kafka Connect node that has a Postgres sink configuration.

Avro schema:

{
    "namespace": "example.avro",
    "type": "record",
    "name": "topicname",
    "fields": [
        {"name": "deviceid", "type": "string"},
        {"name": "longitude", "type": "float"},
        {"name": "latitude",  "type": "float"}
    ]
}

My Python code to publish data is:

import io
import avro.io
import avro.schema
from avro.io import DatumWriter

# Path to user.avsc avro schema
SCHEMA_PATH = "user.avsc"
SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())

# Encode one record as plain Avro binary
writer = DatumWriter(SCHEMA)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write({"deviceid": "9098", "latitude": 90.34, "longitude": 334.4}, encoder)
raw_bytes = bytes_writer.getvalue()

# PRODUCER and TOPIC are created elsewhere (not shown)
PRODUCER.send_messages(TOPIC, raw_bytes)

I am getting the following error in the Kafka Connect logs:

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n","id":0,"worker_id":"0.0.0.0:8083"}],"type":"sink"}

What might be the problem? Or what would be the proper Avro schema for the JSON data shown above?

1 Answer

I haven't done much with the various Python clients, but that magic byte error is almost certainly because what you are sending, while it may be valid Avro, is not in the format the Schema Registry integration expects. The payload needs additional header information in front of the Avro data: a magic byte followed by the ID of the schema in the registry. This is documented here: https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html (search for "wire format" or "magic byte"). I would personally try Confluent's Python Kafka client, https://github.com/confluentinc/confluent-kafka-python, which has examples of working with Avro and the Schema Registry.
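
To make the header concrete, here is a minimal sketch of the framing, assuming the wire format described on that page: one magic byte (0), then the schema ID as a 4-byte big-endian integer, then the Avro binary data. The function names and the schema ID 42 are hypothetical. In practice the ID comes from registering the schema with the Schema Registry, and the Confluent client takes care of all of this for you.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent wire format


def frame_confluent(schema_id, avro_payload):
    """Prefix Avro binary data with the magic byte and a 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe_confluent(message):
    """Split a framed message into (schema_id, avro_payload); reject unframed data."""
    if len(message) < 5:
        raise ValueError("Message too short to contain the 5-byte header")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    return schema_id, message[5:]


# Start of the plain Avro encoding of deviceid "9098":
# a zigzag-encoded length (4 -> 0x08) followed by the UTF-8 bytes.
raw_avro = b"\x08" + b"9098"

# Hypothetical schema ID; a real one is assigned by the Schema Registry.
framed = frame_confluent(42, raw_avro)
```

Calling unframe_confluent(raw_avro) on the unframed bytes raises ValueError, because the first byte is 0x08 rather than 0. That is roughly the situation the sink connector reports when it receives plain Avro without the header.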