I am trying to process data from Kafka using Spark Structured Streaming. The code for ingesting the data is as follows:
import org.apache.spark.sql.functions.from_json

val enriched = df.select($"value".cast("string").as("json"))
  .select(from_json($"json", schema).as("data"))
  .select("data.*")
df is a DataFrame with the data consumed from Kafka.
The problem comes when I try to parse it as JSON in order to run faster queries: the from_json() function from org.apache.spark.sql.functions requires a schema as a mandatory argument. What if the messages can have different fields?
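The only workaround I can think of is declaring every field any message might carry and marking them all nullable, so that a message missing a field simply gets null for it. A minimal sketch of what I mean (the field names here are hypothetical, just for illustration):

import org.apache.spark.sql.types._

// Hypothetical superset schema: every field any message variant might contain.
// nullable = true means messages that omit a field produce null instead of failing.
val schema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("temperature", DoubleType, nullable = true),
  StructField("status", StringType, nullable = true)
))

But this only works if I know all possible fields up front, which is exactly what I'm unsure about.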