6 votes

I have an event-sourced application built on top of Kafka. Currently I have one topic that holds multiple message types, all serialized/deserialized with JSON.

Confluent's Schema Registry looks like a good approach to maintaining the message types, and with Avro's FULL compatibility mode it also provides a mechanism for message versioning in my event-sourced app.

With a recent patch to Confluent 4.1.1 (described in a blog post), you can have multiple different types of messages in one topic with the Avro serializer/deserializer.

However, I haven't seen a single working example of this.

My question is: does the above patch really work without having to use Avro union types (putting all the different message types into one single schema and using a union)?

And how would this approach work with a Kafka Streams application, where you need to specify a key and value Serde?

Should I just forget about Avro and go with Protobuf instead?

1

After more research and running a POC, I went with Protobuf 3 (proto3). I created a single Protobuf message schema with one property of type Any; that Any field carries the payload. Through this I was able to create a generic topic, plus a generic serializer and deserializer that I can use in my code to reconstruct the different message types serialized into the Any field. – J S
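
For illustration, a minimal Java sketch of that envelope pattern. EventEnvelope and EmployeeCreated are hypothetical classes assumed to be generated from a proto3 schema (EventEnvelope having a google.protobuf.Any payload field); Any.pack/is/unpack are the standard Protobuf Java API:

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;

public class EnvelopeSketch {

    // Wrap a concrete event into the generic envelope before serializing.
    // EventEnvelope is assumed to be generated from a proto3 message with a
    // "google.protobuf.Any payload" field.
    static EventEnvelope wrap(EmployeeCreated event) {
        return EventEnvelope.newBuilder()
                .setPayload(Any.pack(event))
                .build();
    }

    // Inspect the Any payload and reconstruct the concrete message type.
    static void handle(EventEnvelope envelope) throws InvalidProtocolBufferException {
        Any payload = envelope.getPayload();
        if (payload.is(EmployeeCreated.class)) {
            EmployeeCreated event = payload.unpack(EmployeeCreated.class);
            System.out.println("Got EmployeeCreated: " + event);
        }
        // ...check for further event types here...
    }
}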

1 Answer

5 votes

Here is an example of a consumer that reads from a topic where events of different types are published:

package com.kafka.schema;

import com.phonebook.Employee;
import com.phonebook.Milestone;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;

public class AvroConsumer {

    private static Consumer<Long, GenericRecord> createConsumer() {
        Properties props = new Properties();
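        // Const is a local constants class holding the bootstrap servers,
        // Schema Registry URL, and topic name used throughout the example.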
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        // Use Kafka Avro Deserializer.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Use Specific Record or else you get Avro GenericRecord.
        // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

        // Schema registry location.
        // Run Schema Registry on 8081
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
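        // The key setting: derive the subject name from topic + record name,
        // so several Avro record types can coexist in one topic. (Serializer
        // and deserializer configs share this property name, which is why the
        // KafkaAvroSerializerConfig constant works here too.)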
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(String... args) {
        final Consumer<Long, GenericRecord> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(Const.TOPIC));
        IntStream.range(1, 100).forEach(index -> {
            final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
            if (records.count() == 0) {
                System.out.println("None found");
            } else {
                records.forEach(record -> {
                    GenericRecord recValue = record.value();
                    System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), recValue);
                });
            }
        });
    }
}

The important part here is this:

props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
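
For completeness, here is a sketch of a matching producer, reusing the Const, Employee, and Milestone classes from the consumer example above. With TopicRecordNameStrategy, each record type gets its own subject (<topic>-<fully-qualified record name>) in the Schema Registry, so several Avro types can share one topic:

package com.kafka.schema;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;

import java.util.Properties;

public class AvroProducer {

    private static Producer<Long, Object> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
        // Same strategy as on the consumer side: subjects are registered per
        // record type, not per topic.
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        return new KafkaProducer<>(props);
    }

    public static void main(String... args) {
        try (Producer<Long, Object> producer = createProducer()) {
            // Employee and Milestone are two distinct Avro record types; both
            // can be sent to the same topic under their own subjects, e.g.:
            // producer.send(new ProducerRecord<>(Const.TOPIC, 1L, employee));
            // producer.send(new ProducerRecord<>(Const.TOPIC, 2L, milestone));
        }
    }
}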