I have set up a Kafka cluster with a Kafka Connect node running a Postgres sink connector.
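For context, the sink connector was registered through the Connect REST API roughly like this (the connector name, connection URL, and credentials below are placeholders, not my exact config):

import json
import requests

# Rough sketch of the Postgres (JDBC) sink registration; values are placeholders
connector = {
    "name": "postgres-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/mydb",
        "connection.user": "postgres",
        "connection.password": "postgres",
        "topics": "topicname",
        "auto.create": "true",
    },
}
requests.post(
    "http://localhost:8083/connectors",  # Connect worker REST endpoint
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)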
Avro schema:
{
    "namespace": "example.avro",
    "type": "record",
    "name": "topicname",
    "fields": [
        {"name": "deviceid", "type": "string"},
        {"name": "longitude", "type": "float"},
        {"name": "latitude", "type": "float"}
    ]
}
My Python code to publish data is:
import io

import avro.io
import avro.schema
from avro.io import DatumWriter

# Path to the user.avsc Avro schema
SCHEMA_PATH = "user.avsc"
SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())

# Serialize one record to raw Avro bytes
writer = DatumWriter(SCHEMA)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write({"deviceid": "9098", "latitude": 90.34, "longitude": 334.4}, encoder)
raw_bytes = bytes_writer.getvalue()
# PRODUCER and TOPIC are created elsewhere with kafka-python
PRODUCER.send_messages(TOPIC, raw_bytes)
I am getting the following error in the Kafka Connect logs:
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
(reported in the sink task status on worker 0.0.0.0:8083)
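I suspect the raw DatumWriter output is missing the framing that Connect's AvroConverter expects: Confluent's wire format prefixes the Avro payload with a zero magic byte and a 4-byte schema ID from the Schema Registry. A minimal sketch of that framing is below (the schema ID of 1 is just a placeholder; real IDs are assigned by the registry):

import io
import struct

import avro.io
import avro.schema
from avro.io import DatumWriter

SCHEMA = avro.schema.parse(open("user.avsc").read())
SCHEMA_ID = 1  # placeholder; the Schema Registry assigns the real ID

bytes_writer = io.BytesIO()
# Confluent framing: magic byte 0, then the schema ID as a big-endian int32
bytes_writer.write(struct.pack(">bI", 0, SCHEMA_ID))
# The Avro-encoded record follows the 5-byte header
encoder = avro.io.BinaryEncoder(bytes_writer)
DatumWriter(SCHEMA).write({"deviceid": "9098", "latitude": 90.34, "longitude": 334.4}, encoder)
framed_bytes = bytes_writer.getvalue()

As far as I understand, the confluent-kafka Python client's Avro serializer handles this framing and the schema registration automatically.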
What might be the problem?
Or what should the proper Avro schema be for this JSON data?