Kafka Connect events from the Debezium connector are Avro encoded.
I specified the following in the connect-standalone.properties file passed to the Kafka Connect standalone service:
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
schema.registry.url=http://ip_address:8081
internal.key.converter.schema.registry.url=http://ip_address:8081
internal.value.converter.schema.registry.url=http://ip_address:8081
I configured the Kafka consumer code with these properties:
Properties props = new Properties();
props.put("bootstrap.servers", "ip_address:9092");
props.put("zookeeper.connect", "ip_address:2181");
props.put("group.id", "test-consumer-group");
props.put("auto.offset.reset","smallest");
//Setting auto commit to false so that on processing failure we retry the read
props.put("auto.commit.offset", "false");
props.put("key.converter.schema.registry.url", "ip_address:8081");
props.put("value.converter.schema.registry.url", "ip_address:8081");
props.put("schema.registry.url", "ip_address:8081");
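For comparison, here is a minimal sketch of how the same settings might look with the property names used by the newer Java consumer (`enable.auto.commit`, and `earliest` in place of `smallest`). It assumes the Confluent `KafkaAvroDeserializer` is on the classpath and that the registry URL carries the `http://` scheme, as in the Connect configuration. The class and method names (`ConsumerConfigSketch`, `newConsumerProps`) are hypothetical, and the sketch only builds the `Properties` object, so it runs with the JDK alone.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties using the newer consumer's key names.
class ConsumerConfigSketch {

    static Properties newConsumerProps(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "test-consumer-group");
        // The newer consumer accepts earliest/latest/none rather than smallest/largest.
        props.put("auto.offset.reset", "earliest");
        // Disable auto commit so a failed record can be re-read after a retry.
        props.put("enable.auto.commit", "false");
        // Assumption: Confluent's Avro deserializer is available on the classpath.
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // The deserializer looks up writer schemas through this URL.
        props.put("schema.registry.url", registryUrl);
        return props;
    }

    public static void main(String[] args) {
        Properties props = newConsumerProps("ip_address:9092", "http://ip_address:8081");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With this variant the deserializer would fetch the schema itself, so the manual REST lookup would not be needed. Whether it applies depends on which consumer API is in use.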
In the consumer implementation, the following code reads the key and value components. I fetch the schemas for the key and value from Schema Registry over REST.
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
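One detail that may be relevant: as far as I understand, messages written through Confluent's Avro converter are not bare Avro binary. Each one is framed with a magic byte (0) and a 4-byte big-endian schema ID ahead of the Avro payload. The sketch below uses only the JDK to split that header off. The names `ConfluentWireFormat`, `schemaId` and `avroPayload` are hypothetical, and I have not confirmed that this framing is the cause of the exception here.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for the Confluent framing: [magic byte 0][4-byte schema id][Avro binary].
class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4 bytes of schema id

    // Returns the schema id stored in bytes 1..4 (ByteBuffer reads big-endian by default).
    static int schemaId(byte[] message) {
        checkHeader(message);
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    // Returns the bytes after the header, which is what an Avro binary decoder should see.
    static byte[] avroPayload(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    private static void checkHeader(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Message is not in the expected framed format");
        }
    }

    public static void main(String[] args) {
        // Made-up sample: magic byte, schema id 42, then the Avro encoding of the string "foo".
        byte[] message = {0, 0, 0, 0, 42, 6, 102, 111, 111};
        System.out.println("schema id = " + schemaId(message));
        System.out.println("payload bytes = " + avroPayload(message).length);
    }
}
```

Under that assumption, the result of `avroPayload(byteData)` would be passed to `DecoderFactory.get().binaryDecoder(...)` instead of the raw `byteData`, and `schemaId` would identify which schema to request from Schema Registry.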
Parsing the key worked fine. While parsing the value part of the message, I get an ArrayIndexOutOfBoundsException.
I downloaded the Avro source code and debugged it. I found that the GenericDatumReader.readInt method returns a negative value. This value is used as an index into an array (symbols), so it should have been non-negative.
I also tried consuming the events with the kafka-avro-standalone-consumer, but it threw an ArrayIndexOutOfBoundsException too. So my guess is that the message is improperly encoded by Kafka Connect (the producer) and that the issue lies in the configuration.
My questions are:
- Is there anything wrong with the configuration passed to the producer or the consumer?
- Why does deserialization work for the key but not for the value?
- Is there anything else that needs to be done for this to work (such as specifying a character encoding somewhere)?
- Can Debezium with Avro be used in production, or is it an experimental feature for now? The post on Debezium Avro specifically says that examples involving Avro will be included in the future.
There have been many posts where Avro deserialization threw an ArrayIndexOutOfBoundsException, but I could not relate them to the problem I am facing.