
I have an Avro schema registered for a Kafka topic and am trying to send data to it. The schema has nested records, and I'm not sure how to correctly send data to it using the confluent_kafka Python library.

Example schema (please ignore any typos in the schema; the real one is very large, this is just an example):

{
  "namespace": "company__name",
  "name": "our_data",
  "type": "record",
  "fields": [
    {
      "name": "datatype1",
      "type": ["null", {
        "type": "record",
        "name": "datatype1_1",
        "fields": [
          {"name": "site", "type": "string"},
          {"name": "units", "type": "string"}
        ]
      }],
      "default": null
    },
    {
      "name": "datatype2",
      "type": ["null", {
        "type": "record",
        "name": "datatype2_1",
        "fields": [
          {"name": "site", "type": "string"},
          {"name": "units", "type": "string"}
        ]
      }],
      "default": null
    }
  ]
}
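A quick way to sanity-check that the example schema above is well-formed Avro (assuming fastavro is installed, and assuming the schema is saved as our_data.avsc) is to run it through parse_schema, which raises on malformed schemas:

# Sanity check: parse the example schema with fastavro (raises if it is malformed).
import json
from fastavro import parse_schema

with open("our_data.avsc") as f:   # assumed filename for the schema shown above
    parse_schema(json.load(f))
print("schema parses OK")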

I am trying to send data to this topic using the confluent_kafka Python library. When I have done this before, the records were not nested, so I would build a typical dictionary of key/value pairs and serialize it. How can I structure nested data so that it matches this schema?

What I tried so far...

message = {'datatype1': 
            {'site': 'sitename',
             'units': 'm'
            }
           }

This version does not cause any Kafka errors, but all of the columns show up as null.

and...

message = {'datatype1': 
            {'datatype1_1':
              {'site': 'sitename',
               'units': 'm'
              }
            }
           }

This version produced a Kafka serialization error saying the value does not match the schema (the full error is in the comments below).
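For reference, the producer setup is something like this (a sketch, assuming confluent_kafka's SerializingProducer with AvroSerializer; the broker/registry URLs, topic name, and schema file name are placeholders):

# Sketch of the producer setup; not the exact code, URLs/topic/file are placeholders.
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})

with open("our_data.avsc") as f:   # the schema shown above
    schema_str = f.read()

avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": avro_serializer,
})

# The value is passed as a Python dict and serialized against the registered schema.
message = {"datatype1": {"site": "sitename", "units": "m"}}

producer.produce(topic="our_topic", key="some-key", value=message)
producer.flush()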

You are missing quotes on datatype1 and datatype1_1 in your Python dict, so that is not valid code, but what was the error from the registry or Kafka? – OneCricketeer
Sorry, that was just a typo when writing the question; it's fixed now. The error is: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="{'datatype1_1':{'site': 'sitename', 'units': 'm'}" (type <class 'dict'>) do not match (the schema) – rayyyyychul
Is this JSON or binary? – Marius Soutier
And have you tried whether it works when datatype1_1 is not nullable? – Marius Soutier

1 Answer


If you use namespaces, you don't have to worry about naming collisions and you can properly structure your optional records: for example, both

{
  "meta": {
    "instanceID": "something"
  }
}

And

{}

are valid instances of:

{
  "doc": "Survey",
  "name": "Survey",
  "type": "record",
  "fields": [
    {
      "name": "meta",
      "type": [
        "null",
        {
          "name": "meta",
          "type": "record",
          "fields": [
            {
              "name": "instanceID",
              "type": [
                "null",
                "string"
              ],
              "namespace": "Survey.meta"
            }
          ],
          "namespace": "Survey"
        }
      ],
      "namespace": "Survey"
    }
  ]
}
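
As a sketch of what this looks like on the Python side (not tested against your exact setup; it assumes fastavro, which confluent_kafka's AvroSerializer builds on, and a slightly simplified copy of the schema above): the optional nested record is just a plain nested dict, or None, with no extra key for the record name.

# Sketch: round-tripping both payload shapes with fastavro.
import io
from fastavro import parse_schema, schemaless_writer, schemaless_reader

survey_schema = parse_schema({
    "name": "Survey",
    "type": "record",
    "fields": [
        {
            "name": "meta",
            "type": ["null", {
                "name": "meta",
                "type": "record",
                "namespace": "Survey",
                "fields": [
                    {"name": "instanceID", "type": ["null", "string"]}
                ]
            }]
        }
    ]
})

def roundtrip(record):
    # Serialize and deserialize one record with the schema above.
    buf = io.BytesIO()
    schemaless_writer(buf, survey_schema, record)
    buf.seek(0)
    return schemaless_reader(buf, survey_schema)

# Nested optional record: a plain nested dict, no union-branch key.
print(roundtrip({"meta": {"instanceID": "something"}}))
# Absent record: pass None (the "null" branch of the union).
print(roundtrip({"meta": None}))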