
I've been trying to write a Beam pipeline that reads Avro records from a Kafka topic. The schema for these records can change rapidly, so I want to use the Confluent Schema Registry to fetch the schema and decode the events before extracting the relevant common fields. Either I'm doing something wrong or the documentation is outdated. I've followed the example here: https://github.com/apache/beam/blob/dfa1e475194ac6f65c42da7b8cb8d5055dd1952c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L176-L198

     * <p>If you want to deserialize the keys and/or values based on a schema available in Confluent
     * Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it
     * for deserialization. A {@link Coder} will be inferred automatically based on the respective
     * {@link Deserializer}.
     *
     * <p>For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s where key
     * and/or value will be typed as {@link org.apache.avro.generic.GenericRecord}. In this case, users
     * don't need to specify key or/and value deserializers and coders since they will be set to {@link
     * KafkaAvroDeserializer} and {@link AvroCoder} by default accordingly.
     *
     * <p>For example, below topic values are serialized with Avro schema stored in Schema Registry,
     * keys are typed as {@link Long}:
     *
     * <pre>{@code
     * PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
     *   .apply(KafkaIO.<Long, GenericRecord>read()
     *      .withBootstrapServers("broker_1:9092,broker_2:9092")
     *      .withTopic("my_topic")
     *      .withKeyDeserializer(LongDeserializer.class)
     *      // Use Confluent Schema Registry, specify schema registry URL and value subject
     *      .withValueDeserializer(
     *          ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
     *    ...

My code looks like this:

    p.apply("ReadFromKafka",
        KafkaIO.<Long, GenericRecord>read()
        .withBootstrapServers(options.getBootstrapServers())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(
                "http://public-kafka-registry.mydomain.com:8081",
                "my_topic-value"))
        .withNumSplits(1)
        .withoutMetadata()
    );

However, I'm getting the following error:

    incompatible types: no instance(s) of type variable(s) T exist so that org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider<T> conforms to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<org.apache.avro.generic.GenericRecord>>


Any help appreciated, as I'm not a Java whiz.


1 Answer


Try this, it works:

    KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(options.getBootstrapServers())
        .withTopic(options.getInputTopic())
        // consumer config passed through to the underlying KafkaAvroDeserializer
        .withConsumerConfigUpdates(ImmutableMap.of("specific.avro.reader", "true"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(
            options.getSchemaRegistryURL(), "my_topic-value"))
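
One thing to check: the `withValueDeserializer` overload that accepts a `DeserializerProvider` was added to `beam-sdks-java-io-kafka` in a later release than the original `Class`-based one. If your Beam version only has the `Class` overload, you'll get exactly the "incompatible types" error from the question, so upgrading Beam may be the real fix.

For completeness, here's a minimal sketch of how the rest of the pipeline might look once the values are decoded, pulling one common field out of each `GenericRecord`. It assumes the option getters used above (`getBootstrapServers()`, `getInputTopic()`, `getSchemaRegistryURL()`), and `"user_id"` is a placeholder field name, not something from your actual schema:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.kafka.common.serialization.StringDeserializer;

    PCollection<String> commonField = p
        .apply("ReadFromKafka",
            KafkaIO.<String, GenericRecord>read()
                .withBootstrapServers(options.getBootstrapServers())
                .withTopic(options.getInputTopic())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(
                    options.getSchemaRegistryURL(), "my_topic-value"))
                // Strip Kafka metadata, leaving KV<String, GenericRecord>
                .withoutMetadata())
        // Keep only the decoded Avro values
        .apply(Values.<GenericRecord>create())
        // "user_id" is hypothetical; use a field present in all versions of your schema
        .apply("ExtractCommonField", MapElements
            .into(TypeDescriptors.strings())
            .via((GenericRecord record) -> String.valueOf(record.get("user_id"))));

Since the value coder is inferred automatically (`AvroCoder`, per the KafkaIO javadoc quoted in the question), nothing else needs to be configured for the `GenericRecord`s to move between transforms.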