
I am using kafka-python 2.0.1 to consume Avro data. Here is the code I have tried:

from kafka import KafkaConsumer
import avro.schema
from avro.io import DatumReader, BinaryDecoder
import io

schema_path = "schema.avsc"
with open(schema_path) as f:
    schema = avro.schema.parse(f.read())
reader = DatumReader(schema)

consumer = KafkaConsumer(
    bootstrap_servers='xxx.xxx.xxx.xxx:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='GSSAPI',
    auto_offset_reset='latest',
    ssl_check_hostname=False,
    api_version=(1, 0, 0))

consumer.subscribe(['test'])

for message in consumer:
    message_val = message.value
    print(message_val)
    bytes_reader = io.BytesIO(message_val)
    # Skip the 5-byte Confluent wire-format header (magic byte + 4-byte schema ID)
    bytes_reader.seek(5)
    decoder = BinaryDecoder(bytes_reader)
    record = reader.read(decoder)
    print(record)

I am getting the following error:

avro.io.SchemaResolutionException: Can't access branch index 55 for union with 2 branches Writer's Schema: [ "null", "int" ] Reader's Schema: [ "null", "int" ]

Can anyone suggest what might cause this error? I have already followed this thread and skip the initial 5 bytes:

How to decode/deserialize Avro with Python from Kafka
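Assuming the producer used Confluent's Avro serializer, the 5 skipped bytes are a magic byte (always 0) followed by a 4-byte big-endian schema ID, and the Avro record starts right after them. A minimal sketch that splits the header off and checks it, rather than blindly seeking past it, could look like this (split_confluent_header is a hypothetical helper name, not part of kafka-python or avro):

```python
import struct
from typing import Tuple

# Confluent wire format (assumed; matches the "skip 5 bytes" advice):
#   byte 0    : magic byte, always 0
#   bytes 1-4 : schema ID as a big-endian unsigned 32-bit int
#   bytes 5.. : Avro binary-encoded record
MAGIC_BYTE = 0
HEADER_SIZE = 5


def split_confluent_header(message_value: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) for a Confluent-framed message value."""
    if len(message_value) < HEADER_SIZE:
        raise ValueError(
            f"value too short for a Confluent header: {len(message_value)} bytes")
    # ">BI" = big-endian, unsigned char + unsigned int, 5 bytes total, no padding
    magic, schema_id = struct.unpack(">BI", message_value[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(
            f"unexpected magic byte {magic}; value may not be Confluent-framed")
    return schema_id, message_value[HEADER_SIZE:]
```

In the loop above, schema_id, payload = split_confluent_header(message.value) followed by BinaryDecoder(io.BytesIO(payload)) would replace the seek(5). Printing schema_id shows which registered schema the producer actually wrote with, which is worth comparing against schema.avsc.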

Can anyone help with this issue? I don't understand what this error actually means: avro.io.SchemaResolutionException: Can't access branch index 55 for union with 2 branches - sgmbd
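As for what the error means: in Avro's binary encoding, a union value is written as the zero-based index of the chosen branch (itself encoded as a zigzag varint long), followed by the value for that branch. For [ "null", "int" ] only 0 and 1 are valid, so index 55 means the decoder was reading bytes that were never a union index; the field layout it expected does not line up with the bytes in the message. Also note that in the code above DatumReader(schema) uses schema.avsc as both the writer's and the reader's schema, which is why the message prints the same union twice; it says nothing about the schema the producer really used. The sketch below decodes such an index by hand in pure Python (read_avro_long and read_union_branch are hypothetical names, not the avro library's API):

```python
from typing import Tuple


def read_avro_long(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode one Avro long (zigzag-encoded varint) at buf[pos].

    Returns (value, position_after_the_varint)."""
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise EOFError("truncated varint")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift  # low 7 bits carry data
        if not byte & 0x80:               # high bit clear: last byte
            break
        shift += 7
    # Undo zigzag encoding: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    return (result >> 1) ^ -(result & 1), pos


def read_union_branch(buf: bytes, pos: int, num_branches: int) -> Tuple[int, int]:
    """Read a union branch index and check it against the union's size."""
    index, pos = read_avro_long(buf, pos)
    if not 0 <= index < num_branches:
        raise ValueError(
            f"Can't access branch index {index} for union with {num_branches} branches")
    return index, pos
```

For example, a single byte 0x6e (decimal 110) zigzag-decodes to 55, so any byte of that value landing where a union index is expected yields exactly this error, whereas b"\x02\x54" decodes as branch 1 ("int") followed by the int 42.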

1 Answer


I got it working. The issue was that the wrong schema was being referenced. Thanks.
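Since the fix was pointing at the correct schema, one way to avoid this kind of mismatch is to resolve the writer schema from the schema ID embedded in each message instead of hard-coding schema.avsc. The sketch below assumes the topic was produced with Confluent's serializer and that a Schema Registry is reachable; it uses the registry's GET /schemas/ids/{id} REST endpoint. fetch_schema_from_registry, WriterSchemaCache and the registry URL are hypothetical, and no authentication is handled (a secured cluster like the SASL_SSL one in the question may well require it).

```python
import json
import struct
import urllib.request
from typing import Callable, Dict


def fetch_schema_from_registry(registry_url: str, schema_id: int) -> str:
    """Fetch the schema JSON string for schema_id via GET /schemas/ids/{id}."""
    url = f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"
    with urllib.request.urlopen(url) as resp:
        # The registry responds with {"schema": "<schema JSON as a string>"}
        return json.load(resp)["schema"]


class WriterSchemaCache:
    """Look up (and cache) the writer schema named by each message's header."""

    def __init__(self, fetch: Callable[[int], str]):
        self._fetch = fetch
        self._cache: Dict[int, str] = {}

    def schema_for(self, message_value: bytes) -> str:
        if len(message_value) < 5 or message_value[0] != 0:
            raise ValueError("value does not start with a Confluent wire-format header")
        (schema_id,) = struct.unpack(">I", message_value[1:5])
        if schema_id not in self._cache:
            self._cache[schema_id] = self._fetch(schema_id)
        return self._cache[schema_id]


# Example wiring (registry URL is a placeholder):
# cache = WriterSchemaCache(
#     lambda sid: fetch_schema_from_registry("http://registry.example:8081", sid))
```

Inside the consumer loop, DatumReader(avro.schema.parse(cache.schema_for(message.value))) would then decode each message (after the 5-byte header) with the schema it was actually written with; caching the parsed schema as well would avoid re-parsing it per message. The fetcher is passed in as a callable so the lookup can be exercised without a live registry.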