How to make Reactive Kafka (https://github.com/akka/reactive-kafka) work with Confluent Schema Registry Avro Schema? Here is my sample code:
def create(groupId: String)(implicit system: ActorSystem): Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = {
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(bootstrap)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty("schema.registry.url", schemaRegistryUrl)
.withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[String].getName)
.withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName)
.withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
Consumer.committableSource(consumerSettings, Subscriptions.topics(createDataSetJobRequestTopic))
}