I need to download the AVRO schema at runtime and I need to pass the bootstrap servers and the kafka topic to resolve the correct schema but I cannot find a way to pass those parameters on the deserializer(except hardcoding them). Do you have any idea on how to do this?
val ops: SerializationOptions = PipelineOptionsFactory.`as`(SerializationOptions::class.java)
ops.setKafkaTopic(pars.kafkaTopic)
ops.setKafkaBootstrapServers(pars.kafkaBootstrapServers)
ops.setKafkaSchemaRegistry(pars.kafkaSchemaRegistry)
val p = Pipeline.create(ops)
p.apply( KafkaIO.read<String, Measurement>()
.withTopic(pars.kafkaTopic)
.withBootstrapServers(pars.kafkaBootstrapServers)
.withKeyDeserializer(StringDeserializer::class.java)
.withValueDeserializer(RemoteAvroDeserializer::class.java)
.withoutMetadata()
)
.apply(Values.create())
(TransformToMeasurementFN()))
.apply(
Window.into<Measurement>(FixedWindows.of(Duration.standardSeconds(10))))
.apply("FilterOrderMeasurement ", ParDo.of<Measurement, String>(RemoveRendersFn()))
.apply(Count.perElement())
.apply("CalculateMeasurementValue", ParDo.of<KV<String, Long>, Long>(CountDuplicateFN()))
p.run()
This is my deserializer:
class RemoteAvroDeserializer : Deserializer<Measurement> {
val decoder: BinaryMessageDecoder<Measurement>
public constructor() {
val schemaStore = RemoteKafkaSchemaRegistry("tst_kafka_topic","tst_bootstrap_servers")
decoder = Measurement.createDecoder(schemaStore)
}
override fun deserialize(s: String, bytes: ByteArray): Measurement {
return decoder.decode(bytes)
}
override fun configure(p0: MutableMap<String, *>?, p1: Boolean) {
}
override fun close() {
}
}
RemoteKafkaSchemaRegistry
vs Confluent's existing Java Registry client? – OneCricketeer/subjects/topic-value
– OneCricketeer