0
votes

I am try to understand Avro Serialization on Confluent Kafka along with Schema Registry usage. It was all going well till the end but the final expectations from AVRO made lots of Confusions to me. As per my reading and understanding, Avro Serialization gives us the flexibility that when we have a change in schema, we can simply manage that without impacting the older producer/consumer.

Following the same, I have developed a python producer which will Check for a Schema existence in Schema-Registry, if absent, create it and start Producing the json messages show below. When I need to change schema, I simply update it in my producer and this produces messages with new schema.

My Old Schema :

data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'

Sample Data from Producer-1 :

{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'[email protected]', u'principal': u'[email protected]'}

My New Schema:

data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"new_passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"new_passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'

Sample Data from Producer-2 :

{u'mobile': 9800647004, u'new_passport_make_date': u'2011-05-22', u'lname': u'Reed', u'fname': u'Paul', u'new_passport_expiry_date': u'2021-05-22', u'ipaddress': u'134.124.7.28', u'email': u'[email protected]', u'principal': u'[email protected]'}

Case 1: when I have 2 producers with above 2 schemas running together, I can successfully consume message with below code. All is well till here.

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        xxxxx 
        break
    print msg.value()

Case 2: When I go little deeper in JSON fields, things mixes up and breaks.

At first, say I have one producer running with ‘My Old Schema’ above and one consumer consuming these messages successfully.

print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]

When I run 2nd producer with ‘My New Schema’ mentioned above, my Old Consumers breaks as there is No Field passport_expiry_date and passport_make_date which is True.

Question:

Sometime I think, this is expected as it’s me(Developer) who is using the field names which are Not in the Message. But how Avro can help here? Shouldn't the missing field be handled by Avro? I saw examples in JAVA where this situation was handled properly but did not find any example in Python. For example, below github has perfect example of handling this scenario. When the field is not present, Consumer simply prints 'None'.

https://github.com/LearningJournal/ApacheKafkaTutorials

Case 3: When I run the combinations like Old Producer with Old Consumer and then in another terminals New Producer with New Consumer, Producers/Consumers mixes up and things break saying no json field.

Old Consumer ==>

print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]

New Consumer ==>

print msg.value()["fname"] , msg.value()["lname"] , msg.value()["new_passport_make_date"], msg.value()["new_passport_expiry_date"]

Question:

Again I think, this is expected. But, then Avro makes me think the right Consumer should get the right message with right schema. If I use msg.value() and always parse the fields at consumer side using programming without any role of Avro, then Where is the benefit of using avro? What is the benefit of sending schema with the messages/storing in SR?

Lastly, is there any way to check the schema attached to a message? I understand, in Avro, schema ID is attached with the message which is used further with Schema Registry while Reading and Writing messages. But I never see it with the messages.

Thanks much in Advance.

1

1 Answers

0
votes

It's not clear what compatibility setting you're using on the registry, but I will assume backwards, which means you would have needed to add a field with a default.

Sounds like you're getting a Python KeyError because those keys don't exist.

Instead of msg.value()["non-existing-key"], you can try

option 1: treat it like a dict()

msg.value().get("non-existing-key", "Default value")

option 2: check individually for all the keys that might not be there

some_var = None  # What you want to parse
val = msg.value()
if "non-existing-key" not in val:
    some_var = "Default Value"

Otherwise, you must "project" the newer schema over the older data, which is what the Java code is doing by using a SpecificRecord subclass. That way, the older data would be parsed with the newer schema, which has the newer fields with their defaults.

If you used GenericRecord in Java instead, you would have similar problems. I'm not sure in Python there is an equivalent to Java's SpecificRecord.

By the way, I don't think the string "None" can be applied for a logicalType=timestamp