
How can I use Spring-Kafka to read Avro messages with the Confluent Schema Registry? Is there a sample? I can't find one in the official reference documentation.

You have to use KafkaAvroSerializer and KafkaAvroDeserializer, and set schema.registry.url to your Schema Registry URL. For local development you can use Landoop fast-data-dev: github.com/Landoop/fast-data-dev
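As a rough illustration of the producer side of that setup, here is a minimal sketch that only assembles the properties. The class name AvroProducerConfigSketch, the helper producerProperties, and the addresses localhost:9092 and http://localhost:8081 are assumptions for a local setup, not values from the question. The serializer class names are given as strings so the snippet compiles without the Kafka or Confluent jars on the classpath.

```java
import java.util.Properties;

final class AvroProducerConfigSketch {

    // Hypothetical helper: builds producer properties for a String key and an Avro value.
    static Properties producerProperties(String bootstrapServers, String schemaRegistryUrl) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // Keys are plain strings, values are Avro records registered in the Schema Registry
        properties.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // The Avro serializer looks up and registers schemas at this URL
        properties.setProperty("schema.registry.url", schemaRegistryUrl);
        return properties;
    }

    public static void main(String[] args) {
        // Assumed local addresses; replace them with your own broker and registry
        Properties properties = producerProperties("localhost:9092", "http://localhost:8081");
        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object would then be passed to a KafkaProducer. The consumer side mirrors it with the deserializer classes.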

The code below reads messages from the customer-avro topic. Here is the Avro schema I defined for the value:

{
     "type": "record",
     "namespace": "com.example",
     "name": "Customer",
     "version": "1",
     "fields": [
       { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
       { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
       { "name": "age", "type": "int", "doc": "Age at the time of registration" },
       { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
       { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
       { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
     ]
}

Below is a complete code snippet that reads messages with this schema and commits offsets manually.

import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Calendar;
import java.util.Collections;
import java.util.Properties;

public class KafkaAvroJavaConsumerV1Demo {

    public static void main(String[] args) {
        Properties properties = new Properties();
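        // normal consumer
        // Assumption: a broker running locally; replace with your own bootstrap servers
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "customer-consumer-group-v1");
        // disable auto commit so that offsets are committed manually
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "earliest");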

        // avro part (deserializer)
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
        // left empty here; set this to your Schema Registry URL
        properties.setProperty("schema.registry.url", "");
        properties.setProperty("specific.avro.reader", "true");

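        KafkaConsumer<String, Customer> kafkaConsumer = new KafkaConsumer<>(properties);
        String topic = "customer-avro";
        // subscribe before polling, otherwise the consumer has nothing to read
        kafkaConsumer.subscribe(Collections.singleton(topic));

        System.out.println("Waiting for data...");

        while (true) {
            System.out.println("Polling at " + Calendar.getInstance().getTime().toString());
            // poll(long) is deprecated in newer clients, so a Duration is used here
            ConsumerRecords<String, Customer> records = kafkaConsumer.poll(java.time.Duration.ofMillis(1000));

            for (ConsumerRecord<String, Customer> record : records) {
                Customer customer = record.value();
                System.out.println(customer);
            }

            // manual commit of the offsets returned by the last poll
            kafkaConsumer.commitSync();
        }
    }
}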

This does not differ between Spring Kafka and the native Java client for Kafka. Assuming you want to consume messages from a topic whose key and value are both Avro records, add the properties below to your consumer properties.

consumerProperties.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProperties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProperties.put("schema.registry.url", KAFKA_SCHEMA_REGISTRY_URL);

If you are consuming a specific record type from the topic (a generated class rather than a GenericRecord), you need to add the following line as well:

consumerProperties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
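Putting the properties above together, a minimal sketch of the consumer configuration as a plain map might look like the following. The class name AvroConsumerConfigSketch, the helper consumerConfig, and the addresses in main are assumptions for illustration. The key "specific.avro.reader" is the string behind KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, and the deserializer class name is given as a string so the snippet compiles without the Confluent jars.

```java
import java.util.HashMap;
import java.util.Map;

final class AvroConsumerConfigSketch {

    // Hypothetical helper: collects the consumer settings for Avro keys and values.
    static Map<String, Object> consumerConfig(String bootstrapServers,
                                              String schemaRegistryUrl,
                                              String groupId,
                                              boolean specificReader) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("group.id", groupId);
        // Both key and value are Avro records, so both use the Avro deserializer
        config.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        config.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        config.put("schema.registry.url", schemaRegistryUrl);
        // true: deserialize into generated classes; false: into GenericRecord
        config.put("specific.avro.reader", specificReader);
        return config;
    }

    public static void main(String[] args) {
        // Assumed local addresses and group id; replace them with your own
        Map<String, Object> config = consumerConfig(
                "localhost:9092", "http://localhost:8081", "customer-consumer-group-v1", true);
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With Spring Kafka, a map like this can be handed to a DefaultKafkaConsumerFactory, which accepts a Map<String, Object> of consumer configs. With the native client, the same map can be passed to the KafkaConsumer constructor.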