
I'm unable to produce data for a specific schema, and I can't understand why. The example data, included as a dictionary in the code, was generated directly with Confluent's avro-random-generator, so it should be correct, since it's derived straight from the schema. Both the Schema Registry and the Avro Random Generator are Confluent tools, so it can't be that one of their tools produces example data that doesn't work with their own Schema Registry.

This is the Schema:

{
  "type": "record",
  "name": "schemaV1",
  "namespace": "net.avro.schemaV1",
  "doc": "",
  "fields": [
    {
      "name": "orderId",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": ""
    },
    {
      "name": "offerId",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": ""
    },
    {
      "name": "redeemerId",
      "type": [
        "null",
        {
          "type": "string",
          "avro.java.string": "String"
        }
      ],
      "doc": "",
      "default": null
    },
    {
      "name": "eventCancellationType",
      "type": "int",
      "doc": ""
    },
    {
      "name": "ruleIds",
      "type": {
        "type": "array",
        "items": {
          "type": "string",
          "avro.java.string": "String"
        },
        "doc": ""
      }
    },
    {
      "name": "eventOriginator",
      "type": {
        "type": "record",
        "name": "AvroEventPartnerV1",
        "doc": "",
        "fields": [
          {
            "name": "partnerShortName",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            },
            "doc": ""
          },
          {
            "name": "businessUnitShortName",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ],
            "doc": "",
            "default": null
          },
          {
            "name": "branchShortName",
            "type": [
              "null",
              {
                "type": "string",
                "avro.java.string": "String"
              }
            ],
            "doc": "",
            "default": null
          }
        ]
      }
    },
    {
      "name": "roundedDelta",
      "doc": "",
      "type": {
        "type": "record",
        "name": "AvroAmountV1",
        "doc": "Amount with a currency",
        "fields": [
          {
            "name": "amount",
            "type": {
              "type": "bytes",
              "logicalType": "decimal",
              "precision": 21,
              "scale": 3
            },
            "doc": "The amount as a decimal number"
          },
          {
            "name": "currency",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            },
            "doc": ""
          }
        ]
      }
    },
    {
      "name": "rewardableLegalDelta",
      "type": [
        "null",
        "AvroAmountV1"
      ],
      "doc": "",
      "default": null
    },
    {
      "name": "receiptNumber",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      },
      "doc": ""
    },
    {
      "name": "referenceReceiptNumber",
      "type": [
        "null",
        {
          "type": "string",
          "avro.java.string": "String"
        }
      ],
      "doc": "",
      "default": null
    },
    {
      "name": "eventEffectiveTime",
      "type": {
        "type": "long"
      },
      "doc": ""
    }
  ]
}

This is my script:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer, ClientError, ValueSerializerError

    BOOTSTRAP_SERVER = 'localhost:9092'
    SCHEMA_REGISTRY = 'http://localhost:8081'
    TOPIC = 'topicV1'
    SCHEMA_PATH = 'schemas/schemaV1.avsc'


    def schemaReader(SCHEMA_PATH):
        with open(SCHEMA_PATH, 'r') as file:
            data = file.read()

        return data


    def main():
        kafka_config = {
            'bootstrap.servers': BOOTSTRAP_SERVER,
            'schema.registry.url': SCHEMA_REGISTRY
        }

        value_schema = avro.loads(schemaReader(SCHEMA_PATH))

        value = {
            "orderId": "a9bcc55f-e2c0-43d6-b793-ff5f295d051d",
            "offerId": "119475017578242889",
            "redeemerId": "1176a01b-b2dc-45a9-91cc-232361e14f99",
            "eventCancellationType": 0,
            "ruleIds": ["ID-IPM00001"],
            "eventOriginator": {
                "partnerShortName": "partner",
                "businessUnitShortName": None,
                "branchShortName": None
            },
            "roundedDelta": {"amount": "\u8463", "currency": "PTS"},
            "rewardableLegalDelta": {"amount": "\u8463", "currency": "EUR"},
            "receiptNumber": "19b2ff68-ed06-48f0-9ce9-d697c0eadc19",
            "referenceReceiptNumber": None,
            "eventEffectiveTime": 1569494696656
        }

        avroProducer = AvroProducer(kafka_config, default_value_schema=value_schema)
        avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
        avroProducer.flush()

    if __name__ == "__main__":
        main()

This is the traceback I'm receiving:

  File "producer.py", line 64, in <module>
    main()
  File "producer.py", line 60, in main
    avroProducer.produce(topic=TOPIC, value=value, value_schema=value_schema)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/__init__.py", line 80, in produce
    value = self._serializer.encode_record_with_schema(topic, value_schema, value)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 115, in encode_record_with_schema
    return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 149, in encode_record_with_schema_id
    writer(record, outf)
  File "/apps/python/python2.7/lib/python2.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 86, in <lambda>
    return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
  File "/apps/python/python2.7/lib/python2.7/site-packages/avro/io.py", line 1042, in write
    raise AvroTypeException(self.writers_schema, datum)

avro.io.AvroTypeException: The datum {'..'} is not an example of the schema { ..}

1 Answer


It seems that the problem is that amount is declared as a bytes type, but you are passing an ordinary string, "\u8463". The avro-random-generator library you used to generate the random data creates its byte strings by encoding with the Java default charset: https://github.com/confluentinc/avro-random-generator/blob/master/src/main/java/io/confluent/avro/random/generator/Generator.java#L373
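As an illustration (this snippet is mine, not taken from either library), the character U+8463 in your sample isn't even representable in ISO-8859-1, which shows how fragile a charset-dependent byte string is:

    # The same character yields different raw bytes depending on the charset.
    s = u'\u8463'
    print(repr(s.encode('utf-8')))   # '\xe8\x91\xa3' -- three UTF-8 bytes
    try:
        s.encode('iso-8859-1')       # U+8463 has no ISO-8859-1 mapping...
    except UnicodeEncodeError as err:
        print(err)                   # ...so this raises UnicodeEncodeError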

However, that default charset may not be ISO-8859-1, which is what the Java implementation (the reference implementation) uses: https://github.com/apache/avro/blob/bf47ec97e0b7f5701042fac067b73b421a9177b7/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java#L220
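If the goal is simply to produce a record that validates, one workaround is to build the bytes for amount yourself. Per the Avro specification, the decimal logical type stores the unscaled integer (value * 10**scale) as two's-complement, big-endian bytes. Here is a minimal sketch, assuming your avro library validates the field as plain bytes; the helper decimal_to_avro_bytes and the example amount 12.345 are mine, and newer avro releases can also accept a decimal.Decimal directly:

    from decimal import Decimal

    def decimal_to_avro_bytes(value, scale=3):
        # Encode the unscaled integer (value * 10**scale) as
        # two's-complement, big-endian bytes, per the Avro decimal spec.
        unscaled = int(value.scaleb(scale))
        length = (unscaled.bit_length() + 8) // 8   # +8 leaves room for the sign bit
        if unscaled < 0:
            unscaled += 1 << (8 * length)           # two's-complement wrap-around
        return bytes(bytearray((unscaled >> (8 * i)) & 0xff
                               for i in reversed(range(length))))

    # Replace the charset-dependent strings with real decimal bytes:
    value["roundedDelta"]["amount"] = decimal_to_avro_bytes(Decimal("12.345"))
    value["rewardableLegalDelta"]["amount"] = decimal_to_avro_bytes(Decimal("12.345"))

This sidesteps the charset question entirely, because the datum then matches the declared bytes type regardless of how the random generator encoded its sample.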