3
votes

How can I make Reactive Kafka (https://github.com/akka/reactive-kafka) work with Avro schemas stored in the Confluent Schema Registry? Here is my sample code:

def create(groupId: String)(implicit system: ActorSystem): Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = {
    
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers(bootstrap)
      .withGroupId(groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty("schema.registry.url", schemaRegistryUrl)
      .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
      .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName)
      .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    
    Consumer.committableSource(consumerSettings, Subscriptions.topics(createDataSetJobRequestTopic))
}

1 Answer

1
votes

You just need to configure the consumer to use the Confluent deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

Set it as the class for the following properties, using it only for the side (key or value) that is actually Avro-encoded; a plain string key keeps StringDeserializer:

  • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
  • ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG

In addition, you have to add the property "schema.registry.url" in order to specify at least one address pointing to your schema registry instance.
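
One caveat with Reactive Kafka: when deserializer instances are passed to ConsumerSettings, as in the question, the underlying Kafka consumer uses those instances as they are, so the deserializer class properties are not consulted and configure is not called on them. A minimal sketch of one way around this, assuming string keys and Avro values, is to configure a KafkaAvroDeserializer by hand and pass it in. The helper names registryConfig and avroConsumerSettings are hypothetical; the parameters stand in for the bootstrap and schemaRegistryUrl values from the question.

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper: the settings the Confluent deserializer itself needs.
def registryConfig(schemaRegistryUrl: String): java.util.Map[String, String] = {
  val config = new java.util.HashMap[String, String]()
  config.put("schema.registry.url", schemaRegistryUrl)
  // Return generated SpecificRecord classes instead of GenericRecord.
  config.put("specific.avro.reader", "true")
  config
}

// Hypothetical helper: consumer settings with String keys and Avro values.
def avroConsumerSettings(bootstrap: String, schemaRegistryUrl: String, groupId: String)
                        (implicit system: ActorSystem): ConsumerSettings[String, AnyRef] = {
  // Configure the deserializer manually, because Kafka does not call
  // configure() on deserializer instances that are handed to the consumer.
  val valueDeserializer = new KafkaAvroDeserializer()
  valueDeserializer.configure(registryConfig(schemaRegistryUrl), false) // false = value, not key

  ConsumerSettings(system, new StringDeserializer, valueDeserializer)
    .withBootstrapServers(bootstrap)
    .withGroupId(groupId)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}
```

The resulting source emits values typed as AnyRef, so downstream stages would need to cast or pattern match to the expected record class.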

Finally, you have to add the following dependency to your project:

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>${io.confluent.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
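
Since the question uses Scala, the same dependency might look roughly like this in an sbt build. This is only a sketch: confluentVersion and its value are placeholders to be replaced with the release that matches your platform, and the resolver is included on the assumption that the Confluent artifacts are fetched from Confluent's own Maven repository rather than Maven Central.

```scala
// build.sbt fragment
// Placeholder version, chosen only for illustration.
val confluentVersion = "5.3.0"

// Assumption: Confluent artifacts are resolved from Confluent's repository.
resolvers += "confluent" at "https://packages.confluent.io/maven/"

// Same artifact and exclusion as the Maven snippet above.
libraryDependencies += ("io.confluent" % "kafka-avro-serializer" % confluentVersion)
  .exclude("org.slf4j", "slf4j-log4j12")
```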

Confluent Documentation