I'm unable to produce data for a specific schema and I'm unable to understand why. The Example data inlcuded as dictionary in the code was created directly from using the confluent "avro-random-generator", so the example data must be correct, since it's directly derived from the schema. Both, Schema Registry, and Avro Random Generator are Confluent Tools, so it can't be that there their tools produces example data that does not work with the their schema registry.
This is the Schema:
{
"type": "record",
"name": "schemaV1",
"namespace": "net.avro.schemaV1",
"doc": "",
"fields": [
{
"name": "orderId",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "offerId",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "redeemerId",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
},
{
"name": "eventCancellationType",
"type": "int",
"doc": ""
},
{
"name": "ruleIds",
"type": {
"type": "array",
"items": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
}
},
{
"name": "eventOriginator",
"type": {
"type": "record",
"name": "AvroEventPartnerV1",
"doc": "",
"fields": [
{
"name": "partnerShortName",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "businessUnitShortName",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
},
{
"name": "branchShortName",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
}
]
}
},
{
"name": "roundedDelta",
"doc": "",
"type": {
"type": "record",
"name": "AvroAmountV1",
"doc": "Amount with a currency",
"fields": [
{
"name": "amount",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 21,
"scale": 3
},
"doc": "The amount as a decimal number"
},
{
"name": "currency",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
}
]
}
},
{
"name": "rewardableLegalDelta",
"type": [
"null",
"AvroAmountV1"
],
"doc": "",
"default": null
},
{
"name": "receiptNumber",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": ""
},
{
"name": "referenceReceiptNumber",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "",
"default": null
},
{
"name": "eventEffectiveTime",
"type": {
"type": "long"
},
"doc": ""
}
]
}
This is my script:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer, ClientError, ValueSerializerError
BOOTSTRAP_SERVER = 'localhost:9092'
SCHEMA_REGISTRY = 'http://localhost:8081'
TOPIC = 'topicV1'
SCHEMA_PATH = 'schemas/schemaV1.avsc'
def schemaReader(SCHEMA_PATH):
with open(SCHEMA_PATH, 'r') as file:
data = file.read()
return data
def main():
kafka_config = {
'bootstrap.servers': BOOTSTRAP_SERVER,
'schema.registry.url': SCHEMA_REGISTRY
}
value_schema = avro.loads( schemaReader(SCHEMA_PATH) )
null = None
value = {
"orderId": "a9bcc55f-e2c0-43d6-b793-ff5f295d051d",
"offerId": "119475017578242889",
"redeemerId": "1176a01b-b2dc-45a9-91cc-232361e14f99",
"eventCancellationType": 0,
"ruleIds": ["ID-IPM00001"],
"eventOriginator": {"partnerShortName":
"partner","businessUnitShortName": null,"branchShortName": null},
"roundedDelta": {"amount": "\u8463","currency": "PTS"},
"rewardableLegalDelta": {"amount": "\u8463","currency": "EUR"},
"receiptNumber": "19b2ff68-ed06-48f0-9ce9-d697c0eadc19",
"referenceReceiptNumber": null,
"eventEffectiveTime": 1569494696656
}
avroProducer = AvroProducer(kafka_config, default_value_schema=value_schema )
avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
avroProducer.flush()
if __name__== "__main__":
main()
This is the traceback I'm receiving:
File "producer.py", line 64, in <module>
main()
File "producer.py", line 60, in main
avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 115, in encode_record_with_schema
return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 149, in encode_record_with_schema_id
writer(record, outf)
File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 86, in <lambda>
return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
File "/apps/python/python2.7/lib/python2.7/site-packages/avro/io.py", line 1042, in write
raise AvroTypeException(self.writers_schema, datum)
avro.io.AvroTypeException: The datum {'..'} is not an example of the schema { ..}