
I have a Spark Structured Streaming job that streams data from multiple Kafka topics based on a subscribePattern, and for every Kafka topic I have a Spark schema. While streaming the data from Kafka, I want to apply the corresponding Spark schema to each Kafka message based on its topic name.

Suppose I have two topics: cust and customers.

Streaming data from Kafka based on subscribePattern (Java regex string):

var df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "cust.*")
  .option("startingOffsets", "earliest") 
  .load()
  .withColumn("value", $"value".cast("string"))
  .filter($"value".isNotNull)

The above streaming query reads data from both topics.

Let's say I have two Spark schemas, one for each topic:

var cust: StructType = new StructType()
    .add("name", StringType)
    .add("age", IntegerType)

var customers: StructType = new StructType()
    .add("id", IntegerType)
    .add("first_name", StringType)
    .add("last_name", StringType)
    .add("email", StringType)
    .add("address", StringType)

Now, I want to apply the Spark schema based on the topic name, and to do that I have written a udf that takes the topic name and returns the schema in DDL format:

val schema = udf((topic: String) => topic match {
    case "cust"      => cust.toDDL
    case "customers" => customers.toDDL
    case _           => new StructType().toDDL
  })

Then I am using the udf (I understand that a udf is applied for every row) inside the from_json method like this:

val query = df
    .withColumn("topic", $"topic".cast("string"))
    .withColumn("data", from_json($"value", schema($"topic")))
    .select($"key", $"topic", $"data.*")
    .writeStream.outputMode("append")
    .format("console")
    .start()
    .awaitTermination()

This gives me the following exception, which makes sense because from_json expects the schema either as a string literal in DDL format or as a StructType, not as a Column:

org.apache.spark.sql.AnalysisException: Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of UDF(topic);

How can I accomplish this?

Any help would be appreciated!

cust & customers schemas are fixed, right? – Srinivas
Yeah, the schema is fixed. – TechnocratSid
Remove .toDDL in the UDF & try again. – Srinivas
The reason I kept .toDDL is that it returns a String. If I don't use .toDDL, it returns a StructType, which is not among the types supported by ScalaReflection, and I was getting this exception: "scala.MatchError: org.apache.spark.sql.types.StructType (of class scala.reflect.internal.Types$ClassNoArgsTypeRef)" – TechnocratSid

1 Answer


What you're trying to do is not possible: a single DataFrame can't have two different schemas.

I can think of two ways to do it:

  1. Split df by topic, then apply the matching schema to each of the two resulting DataFrames (cust and customers).
  2. Merge the two schemas into one schema and apply that to all topics.
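A minimal sketch of both options, reusing the df, cust, and customers definitions from the question (untested against a live Kafka source; the console sinks and the merged-schema construction are illustrative assumptions, not the only way to do it):

```scala
import org.apache.spark.sql.functions.{col, from_json}

// Option 1: split by topic and apply the matching schema to each branch.
val custDF = df
  .filter(col("topic") === "cust")
  .withColumn("data", from_json(col("value"), cust))
  .select(col("key"), col("topic"), col("data.*"))

val customersDF = df
  .filter(col("topic") === "customers")
  .withColumn("data", from_json(col("value"), customers))
  .select(col("key"), col("topic"), col("data.*"))

// Each branch becomes its own streaming query.
custDF.writeStream.format("console").outputMode("append").start()
customersDF.writeStream.format("console").outputMode("append").start()
spark.streams.awaitAnyTermination()

// Option 2: merge both schemas into one and parse every message with it.
// Fields absent from a given message come back as null after from_json.
val merged = customers.fields.foldLeft(cust)((acc, f) => acc.add(f))

val mergedDF = df
  .withColumn("data", from_json(col("value"), merged))
  .select(col("key"), col("topic"), col("data.*"))
```

With option 1 you pay for two independent queries but each output has a clean schema; with option 2 there is a single query, at the cost of null columns for fields that don't apply to a given topic.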