
I'm trying to consume Avro records from Kafka using NiFi. I have 3 topics, filled by an AWS Lambda function and 2 Spark Streaming jobs, all of which use the Hortonworks Schema Registry to get the Avro schema.

I tried both ConsumeKafkaRecord_0_10 and ConsumeKafkaRecord_2_0 and got the same error: Consume Kafka Record Error

I tried an AvroReader with the schema given as plain text, to be sure which schema is being used, and got the same error. When I use an AvroReader with the Hortonworks Schema Registry parameter, I get this error: Consume kafka horton schema error

That could make sense, because NiFi reads the first byte of the record as a version parameter for the schema, and here the first byte is 3. But it doesn't explain why I'm getting an ArrayIndexOutOfBoundsException when providing the schema as plain text.

Finally, I can consume those topics just fine using Spark Streaming and the Schema Registry. Has anyone already encountered such an issue between NiFi and the AvroReader when consuming from Kafka?

Stack: Hortonworks HDP 3.4.1 // NiFi 1.9.0 // Spark 2.3 // Schema Registry 0.7

Yes, using this processor I still need to deserialize the Avro message later, and I get the same error. I think it has something to do with the Avro serializer/deserializer NiFi is using. – Vincent

1 Answer


The issue is related to how NiFi interprets the first bytes of your Avro message. Those bytes contain the following information:

  • Protocol Id - 1 byte
  • Schema Metadata Id - 8 bytes
  • Schema Version - 4 bytes
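As a quick diagnostic, you can peek at that header in a raw record value yourself. This is a minimal sketch following the layout above; describeHeader is a hypothetical helper, not a registry API, and the 0x3 branch assumes the default protocol carries the schema version id as a 4-byte int (see below).

  import java.nio.ByteBuffer

  def describeHeader(payload: Array[Byte]): String = {
    val buf = ByteBuffer.wrap(payload)
    buf.get().toInt match {                // Protocol Id - 1 byte
      case 0x1 =>
        val metadataId = buf.getLong()     // Schema Metadata Id - 8 bytes
        val version = buf.getInt()         // Schema Version - 4 bytes
        s"METADATA_ID_VERSION_PROTOCOL: metadataId=$metadataId, version=$version"
      case 0x3 =>
        s"VERSION_ID_AS_INT_PROTOCOL: versionId=${buf.getInt()}"
      case other =>
        f"unrecognized protocol id 0x$other%02X"
    }
  }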

Going through the code of the Hortonworks Schema Registry, we can find that different protocol IDs can be used to serialize your message with the AvroSerDe:

public static final byte CONFLUENT_VERSION_PROTOCOL = 0x0;
public static final byte METADATA_ID_VERSION_PROTOCOL = 0x1;
public static final byte VERSION_ID_AS_LONG_PROTOCOL = 0x2;
public static final byte VERSION_ID_AS_INT_PROTOCOL = 0x3;
public static final byte CURRENT_PROTOCOL = VERSION_ID_AS_INT_PROTOCOL;

Source

The default one is VERSION_ID_AS_INT_PROTOCOL, which means the first byte of your Avro messages is going to be 03.
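Concretely, under that default protocol the serialized value starts with the protocol byte, then the schema version id as a 4-byte int, then the Avro payload (a sketch with a made-up version id of 42; the exact layout is an assumption based on the registry's SerDe code):

  03 | 00 00 00 2A | <Avro-encoded record bytes>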

Going through the NiFi code, we see that it only supports METADATA_ID_VERSION_PROTOCOL: it expects a 01 as the first byte and does not take anything else into account. A message starting with 03 therefore gets its header misread before the Avro payload is even reached, which is consistent with the ArrayIndexOutOfBoundsException you see.

You have to force Spark to use METADATA_ID_VERSION_PROTOCOL when creating your SchemaRegistryConfig:

  // Imports assumed from the Hortonworks registry serdes and
  // spark-schema-registry packages; adjust to your library versions.
  import com.hortonworks.registries.schemaregistry.serdes.SerDesProtocolHandlerRegistry
  import com.hortonworks.registries.schemaregistry.serdes.avro.AbstractAvroSnapshotSerializer
  import com.hortonworks.spark.registry.util.SchemaRegistryConfig

  val config = Map[String, Object](
    "schema.registry.url" -> ConfigManager.config.getProperty("schemaregistry.default.url"),
    // Force the 0x1 protocol byte that NiFi expects.
    AbstractAvroSnapshotSerializer.SERDES_PROTOCOL_VERSION -> SerDesProtocolHandlerRegistry.METADATA_ID_VERSION_PROTOCOL.asInstanceOf[Object]
  )
  implicit val srConfig: SchemaRegistryConfig = SchemaRegistryConfig(config)
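With this implicit config in scope (assuming you are serializing through the Hortonworks spark-schema-registry helpers, as the snippet suggests), outgoing records are written with the 01 protocol byte, and NiFi's AvroReader backed by the Hortonworks Schema Registry can then deserialize them.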