I need to consume a Kafka topic that produces a dynamic JSON string for each row. I can't parse the JSON string without a schema, and in my case the schema can change. spark.read.json can infer a JSON schema, but it requires either a Dataset or a JSON file.
Is there any way to convert the Kafka topic's value column to a Dataset, so that I can pass it to spark.read.json and have it infer the schema of the JSON?
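For example, in a plain batch job this works fine (a minimal illustration; the JSON record here is made up):

import spark.implicits._

// Batch case: spark.read.json accepts a Dataset[String] and infers the schema from it.
val ds = Seq("""{"symbol":"BTC","price":42000.5}""").toDS()
spark.read.json(ds).printSchema()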
But if I use the code below in a streaming job:
import org.apache.spark.sql.functions.from_json
import spark.implicits._

val klines = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "host1:port1,host2:port2").
  option("subscribe", "topic").
  load().
  select($"value".cast("string").alias("value"))

val query = klines.
  // spark.read.json is a batch read, but klines is a streaming Dataset,
  // which is what triggers the exception below.
  select(from_json($"value", schema = spark.read.json(klines.as[String]).schema)).
  writeStream.
  format("console").
  start()

query.awaitTermination()
I get the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; kafka
I'm also doing a few intermediate calculations, such as flattening the schema, and the same error occurs whenever I do. How can I handle basic intermediate calculations like this in Spark Structured Streaming (Scala)?
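One workaround I'm considering is to infer the schema once with a bounded batch read of the same topic and then reuse it in the stream. This is only a sketch, untested end to end; the broker addresses and topic name are the placeholders from above, and it assumes the messages already on the topic are representative of the schema:

import org.apache.spark.sql.functions.from_json
import spark.implicits._

// One-off batch read of the same topic, just to sample existing messages.
val sample = spark.read.
  format("kafka").
  option("kafka.bootstrap.servers", "host1:port1,host2:port2").
  option("subscribe", "topic").
  option("startingOffsets", "earliest").
  option("endingOffsets", "latest").
  load().
  select($"value".cast("string").as[String])

// Infer the schema from the sampled batch, outside the streaming query.
val inferredSchema = spark.read.json(sample).schema

// The streaming query then uses the fixed, pre-inferred schema.
val parsed = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "host1:port1,host2:port2").
  option("subscribe", "topic").
  load().
  select(from_json($"value".cast("string"), inferredSchema).alias("parsed"))

val query = parsed.writeStream.format("console").start()
query.awaitTermination()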
Set value.deserializer to StringDeserializer and parse it. – JavaTechnical