
I have a Schema Registry and a Kafka broker from which I pull Avro (v1.8.1) records. For deserialization I've been using Confluent's KafkaAvroDeserializer. Now I want to refactor my code to use the Elasticsearch API provided by Alpakka, but unfortunately this breaks deserialization and results in NullPointerExceptions:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1030)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1250)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
    at de.adesso.fds.connectors.dpa.news.NewsConsumer.main(MyConsumer.java:58)

I've been using Alpakka's ConsumerSettings API, as described in this example:

val system = ActorSystem.create();

// necessary to convert timestamps correctly in Avro Version 1.8.1 to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());

val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), new KafkaAvroDeserializer())
    .withBootstrapServers(kafkaBootstrapServerUrl)
    .withClientId(InetAddress.getLocalHost().getHostName())
    .withGroupId("" + new Random().nextInt())
    .withProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
    .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withStopTimeout(Duration.ofSeconds(5));

These settings result in the NullPointerExceptions above, while this vanilla Kafka consumer setup works fine:

val props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); 
props.put(ConsumerConfig.GROUP_ID_CONFIG, "" + new Random().nextInt());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerUrl);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// necessary to convert timestamps correctly in newer Avro Versions and to avoid ClassCastExceptions
SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
val consumer = new KafkaConsumer<String, MyClass>(props);

In the working example, the values of the ConsumerRecords are successfully deserialized into the classes generated from my schema by the avro-maven-plugin.

Any hints are appreciated!


1 Answer


I think you need to pull new KafkaAvroDeserializer() out into its own variable, then call the .configure() method on that instance to pass in a non-null Schema Registry URL. When the instance is created with the no-arg constructor and never configured, its Schema Registry client is still null when deserialize() is called, which is what produces the NullPointerException.

Then pass the configured instance to ConsumerSettings.create, along these lines:
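A minimal sketch of that, assuming the same system, schemaRegistryUrl and kafkaBootstrapServerUrl variables as in your question:

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

// Build the same config map the deserializer would otherwise receive
// from the consumer properties
Map<String, Object> avroConfig = new HashMap<>();
avroConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
avroConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

// Configure the instance explicitly; isKey = false because it deserializes record values
val deserializer = new KafkaAvroDeserializer();
deserializer.configure(avroConfig, false);

// Pass the configured instance to Alpakka
val consumerSettings = ConsumerSettings.create(system, new StringDeserializer(), deserializer)
    .withBootstrapServers(kafkaBootstrapServerUrl)
    .withClientId(InetAddress.getLocalHost().getHostName())
    .withGroupId("" + new Random().nextInt())
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withStopTimeout(Duration.ofSeconds(5));

The two withProperty calls for the Schema Registry settings are no longer needed, since the instance is now configured directly.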

FWIW, depending on your needs, Kafka Connect works fine for loading data from Kafka into Elasticsearch; see the sketch below.
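If you go that route, a sink configuration along these lines is the usual starting point (just a sketch, assuming Confluent's Elasticsearch sink connector; the connector name, topic and URLs are placeholders for your setup):

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=topic
connection.url=http://localhost:9200
key.ignore=true
# read Avro values via the Schema Registry
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081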