I have a Spark Structured Streaming job that reads from multiple Kafka topics selected by a subscribePattern, and I have a Spark schema for each topic. While streaming, I want to apply the matching schema to each Kafka message based on its topic name.
Suppose I have two topics: cust and customers.
I read from Kafka using subscribePattern (a Java regex string):
import spark.implicits._ // enables the $"column" syntax

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "cust*")
  .option("startingOffsets", "earliest")
  .load()
  // Kafka delivers the value as binary, so cast it to a string first
  .withColumn("value", $"value".cast("string"))
  .filter($"value".isNotNull)
The above streaming query reads data from both topics.
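Since subscribePattern is a Java regex, it can help to check what a given pattern matches before relying on it. The following is a minimal sketch in plain Scala, independent of Spark and Kafka. The object name TopicPatternCheck and its two helpers are hypothetical, and the snippet does not settle whether the connector matches the whole topic name or searches within it; it only shows how the two interpretations differ.

```scala
import java.util.regex.Pattern

object TopicPatternCheck {
  // True only if the regex matches the entire topic name.
  def fullMatch(regex: String, topic: String): Boolean =
    Pattern.compile(regex).matcher(topic).matches()

  // True if the regex matches anywhere inside the topic name.
  def partialMatch(regex: String, topic: String): Boolean =
    Pattern.compile(regex).matcher(topic).find()

  def main(args: Array[String]): Unit = {
    println(fullMatch("cust*", "customers"))    // false
    println(partialMatch("cust*", "customers")) // true
    println(fullMatch("cust.*", "customers"))   // true
    println(fullMatch("cust.*", "cust"))        // true
  }
}
```

A pattern such as cust.* covers both topic names under either interpretation, whereas cust* only covers customers when the regex is searched for inside the name.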
Let's say I have two Spark schemas, one for each topic:
import org.apache.spark.sql.types._

// Schema for messages on the "cust" topic
val cust: StructType = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)

// Schema for messages on the "customers" topic
val customers: StructType = new StructType()
  .add("id", IntegerType)
  .add("first_name", StringType)
  .add("last_name", StringType)
  .add("email", StringType)
  .add("address", StringType)
Now I want to apply the Spark schema based on the topic name. To do that, I wrote a UDF that takes the topic name and returns the schema in DDL format:
val schema = udf((table: String) => table match {
  case "cust" => cust.toDDL
  case "customers" => customers.toDDL
  case _ => new StructType().toDDL
})
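The DDL string that the UDF returns for a topic can be inspected outside of a streaming query. This is a minimal sketch, assuming spark-sql is on the classpath; the object name DdlRoundTrip is hypothetical. The exact text produced by toDDL can differ between Spark versions, so no output is shown here.

```scala
import org.apache.spark.sql.types._

object DdlRoundTrip {
  // Same fields as the cust schema in the question.
  val cust: StructType = new StructType()
    .add("name", StringType)
    .add("age", IntegerType)

  def main(args: Array[String]): Unit = {
    // Render the schema as a DDL string, the form the UDF returns.
    val ddl: String = cust.toDDL
    println(ddl)

    // Parse the DDL string back into a StructType and list its fields.
    val parsed: StructType = StructType.fromDDL(ddl)
    parsed.fields.foreach(f => println(s"${f.name}: ${f.dataType.simpleString}"))
  }
}
```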
Then I use the UDF inside from_json like this (I understand that a UDF is evaluated for every row):
val query = df
  .withColumn("topic", $"topic".cast("string"))
  .withColumn("data", from_json($"value", schema($"topic")))
  .select($"key", $"topic", $"data.*")
  .writeStream.outputMode("append")
  .format("console")
  .start()
  .awaitTermination()
This throws the following exception, which makes sense because from_json expects the schema as a DDL-formatted string literal or a StructType:
org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of UDF(topic);
How can I accomplish this?
Any help would be appreciated!
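One direction that might work, given that the exception says the schema must be a literal and so cannot come from a row value: keep the schemas in a plain Scala Map and build one from_json column per topic, guarded by the topic name. This is a minimal sketch, assuming the two schemas above and a DataFrame with string columns topic and value; PerTopicSchemas and parseByTopic are hypothetical names, not part of the Spark API.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, when}
import org.apache.spark.sql.types._

object PerTopicSchemas {
  // Topic name -> schema, resolved on the driver while the query is built.
  val schemas: Map[String, StructType] = Map(
    "cust" -> new StructType()
      .add("name", StringType)
      .add("age", IntegerType),
    "customers" -> new StructType()
      .add("id", IntegerType)
      .add("first_name", StringType)
      .add("last_name", StringType)
      .add("email", StringType)
      .add("address", StringType)
  )

  // Adds one struct column per topic, named after the topic.
  // Rows that belong to a different topic get null in that column,
  // because when(...) without otherwise(...) yields null.
  def parseByTopic(df: DataFrame): DataFrame =
    schemas.foldLeft(df) { case (acc, (topic, schema)) =>
      acc.withColumn(
        topic,
        when(col("topic") === topic, from_json(col("value"), schema))
      )
    }
}
```

With `val parsed = PerTopicSchemas.parseByTopic(df)`, each topic's fields could then be selected from its own struct column, for example `parsed.filter(col("topic") === "cust").select(col("key"), col("cust.*"))`. That leads to one output query per topic instead of a single query with mixed columns, which may or may not suit the use case.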