
I need to download the Avro schema at runtime, and to resolve the correct schema I need to pass the bootstrap servers and the Kafka topic. However, I cannot find a way to pass those parameters to the deserializer (except by hardcoding them). Do you have any idea how to do this?

    val ops: SerializationOptions = PipelineOptionsFactory.`as`(SerializationOptions::class.java)
    ops.setKafkaTopic(pars.kafkaTopic)
    ops.setKafkaBootstrapServers(pars.kafkaBootstrapServers)
    ops.setKafkaSchemaRegistry(pars.kafkaSchemaRegistry)
    val p = Pipeline.create(ops)

    p.apply( KafkaIO.read<String, Measurement>()
            .withTopic(pars.kafkaTopic)
            .withBootstrapServers(pars.kafkaBootstrapServers)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializer(RemoteAvroDeserializer::class.java)
            .withoutMetadata()
    )
            .apply(Values.create())
            .apply(ParDo.of(TransformToMeasurementFN()))
            .apply(
                    Window.into<Measurement>(FixedWindows.of(Duration.standardSeconds(10))))
            .apply("FilterOrderMeasurement ", ParDo.of<Measurement, String>(RemoveRendersFn()))
            .apply(Count.perElement())
            .apply("CalculateMeasurementValue", ParDo.of<KV<String, Long>, Long>(CountDuplicateFN()))

    p.run()

This is my deserializer:

    class RemoteAvroDeserializer : Deserializer<Measurement> {
        val decoder: BinaryMessageDecoder<Measurement>

        constructor() {
            val schemaStore = RemoteKafkaSchemaRegistry("tst_kafka_topic", "tst_bootstrap_servers")
            decoder = Measurement.createDecoder(schemaStore)
        }

        override fun deserialize(s: String, bytes: ByteArray): Measurement {
            return decoder.decode(bytes)
        }

        override fun configure(p0: MutableMap<String, *>?, p1: Boolean) {
        }

        override fun close() {
        }
    }
What is RemoteKafkaSchemaRegistry vs Confluent's existing Java Registry client? – OneCricketeer

Hi, it is just our own implementation of the Confluent Java Registry Client, which fetches the Avro schema based on topic and registry. – Brix

Isn't that what it already does? The default schema is always at /subjects/topic-value – OneCricketeer

1 Answer


According to the Beam documentation, you can set consumer configuration like so:

    KafkaIO...
        .withConsumerConfigUpdates(ImmutableMap.of("group.id", "my_beam_app_1"))

I assume you can just add schema.registry.url, or whatever custom properties you need, here.
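Since Kafka passes the full consumer config map to every `Deserializer` via `configure()`, you could read your own properties there instead of hardcoding them in the constructor. Here is a minimal self-contained sketch of that pattern; the property names `custom.schema.topic` and `custom.schema.servers` are invented for illustration, and a stand-in `Deserializer` interface is declared so the snippet compiles without kafka-clients on the classpath:

```kotlin
// Stand-in for org.apache.kafka.common.serialization.Deserializer, so this
// sketch is runnable without the kafka-clients dependency.
interface Deserializer<T> {
    fun configure(configs: Map<String, *>?, isKey: Boolean)
    fun deserialize(topic: String, data: ByteArray): T
    fun close()
}

class LazyRemoteAvroDeserializer : Deserializer<String> {
    lateinit var schemaTopic: String
    lateinit var schemaServers: String

    override fun configure(configs: Map<String, *>?, isKey: Boolean) {
        // Read our custom settings from the consumer config instead of
        // hardcoding them; fail fast if the pipeline forgot to set them.
        schemaTopic = configs?.get("custom.schema.topic") as? String
            ?: error("custom.schema.topic not set")
        schemaServers = configs?.get("custom.schema.servers") as? String
            ?: error("custom.schema.servers not set")
        // The real code would build RemoteKafkaSchemaRegistry and the
        // BinaryMessageDecoder here, from these values.
    }

    override fun deserialize(topic: String, data: ByteArray): String =
        String(data) // placeholder for decoder.decode(data)

    override fun close() {}
}

fun main() {
    val d = LazyRemoteAvroDeserializer()
    // Kafka would call configure() with the merged consumer config,
    // including anything added via withConsumerConfigUpdates().
    d.configure(
        mapOf(
            "custom.schema.topic" to "tst_kafka_topic",
            "custom.schema.servers" to "tst_bootstrap_servers"
        ),
        false
    )
    println(d.schemaTopic)
    println(d.schemaServers)
}
```

On the Beam side you would then pass the same keys through, e.g. `.withConsumerConfigUpdates(mapOf("custom.schema.topic" to pars.kafkaTopic, "custom.schema.servers" to pars.kafkaBootstrapServers))`, so the values from your pipeline options reach the deserializer without being hardcoded.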