We have a use case where we are trying to consume multiple Kafka topics (AVRO messages) while integrating with the Schema Registry. We are using Spark Structured Streaming (Spark version: 2.4.4) and Confluent Kafka (library version: 5.4.1) for this:
val kafkaDF: DataFrame = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<bootstrap.server>:9092")
  .option("subscribe", "topic_a,topic_b") // multi-topic subscribe: comma-separated list of topic names
  .load()
We select the value column from the above DataFrame and use the schema to deserialize the AVRO messages:
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

val tableDF = kafkaDF.select(from_avro(col("value"), jsonSchema).as("table")).select("table.*") // flatten the deserialized struct into top-level columns
The roadblock here is that, since we are consuming multiple Kafka topics, we have collected all of our JSON schemas into a Map, with the key being the topic name and the value being the corresponding schema. How can we do a lookup against that map in the select above? We tried a UDF, but a UDF returns a Column, whereas the jsonSchema argument of from_avro has to be a plain String known when the query is defined. Also, the schema is different for each topic. A sketch of the per-topic split we are experimenting with is shown below.
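For reference, this is roughly the workaround we are experimenting with: split the stream on the Kafka-provided topic column, do the map lookup on the driver while the query is being built (where the schema is still a plain String), and run one query per topic. This is only a minimal sketch; schemaMap, the topic names, the schemas, and the console sink are placeholders, not our real configuration.

import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

// Hypothetical map of topic name -> Avro schema (JSON string)
val schemaMap: Map[String, String] = Map(
  "topic_a" -> schemaA,
  "topic_b" -> schemaB
)

// The lookup happens on the driver at query-definition time,
// so from_avro receives an ordinary String, not a Column
val queries = schemaMap.map { case (topic, jsonSchema) =>
  kafkaDF
    .filter(col("topic") === topic)                          // keep only this topic's records
    .select(from_avro(col("value"), jsonSchema).as("table"))
    .select("table.*")                                       // flatten to top-level columns
    .writeStream
    .format("console")                                       // placeholder sink
    .start()
}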
A couple of additional questions:
- Is the above approach correct for consuming multiple topics at once?
- Should we use a single consumer per topic instead? (A sketch of what we mean follows the list.)
- If we have a larger number of topics, how can we achieve parallel topic processing? Sequential processing might take a substantial amount of time.
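To make the last two questions concrete, this is roughly what we mean by "a single consumer per topic": one readStream source and one streaming query per topic, which Spark should schedule concurrently rather than sequentially. Again a sketch only; the checkpoint and output paths and the parquet sink are made-up placeholders.

// One source + one query per topic: each query reads, deserializes,
// and writes independently, so topics are processed in parallel
schemaMap.foreach { case (topic, jsonSchema) =>
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<bootstrap.server>:9092")
    .option("subscribe", topic)                               // single-topic subscribe
    .load()
    .select(from_avro(col("value"), jsonSchema).as("table"))
    .select("table.*")
    .writeStream
    .option("checkpointLocation", s"/tmp/checkpoints/$topic") // hypothetical path, one per query
    .format("parquet")                                        // placeholder sink
    .option("path", s"/tmp/output/$topic")                    // hypothetical path
    .start()
}

// Independent queries run concurrently; block until any of them terminates
spark.streams.awaitAnyTermination()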