I need to consume a Kafka topic that produces a dynamic JSON string for each row. I can't parse the JSON string without a schema, and in my case the schema can be dynamic. spark.read.json can infer a JSON schema, but it requires either a Dataset or a JSON file.

Is there any way to convert the Kafka topic value to a Dataset, so that I can use spark.read.json, which accepts a Dataset as input and can infer the schema of the JSON?

But if I use the code below:

val klines = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "host1:port1,host2:port2").
  option("subscribe", "topic").
  load().
  select($"value".cast("string").alias("value"))

val query = klines.
  select(from_json($"value",schema=spark.read.json(klines.as[String]).schema)).
  writeStream.
  format("console").
  start()

query.awaitTermination()

I get the error below:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; kafka

I'm doing a few intermediate calculations, like flattening the schema, but if I do so the same error occurs. How can I handle basic intermediate calculations in Spark Structured Streaming (Scala)?

Are you using Spark Streaming or Spark Structured Streaming? If you're not using a schema, then where will you load that data? – Piyush Patel
@OneCricketeer: Spark Structured Streaming. It can be a complex nested schema. I will flatten the schema and find it at run time before loading. – Raja
@JavaTechnical: I can't change the Kafka output (select($"value".cast("string").alias("value"))) to a Seq. Any workaround, please? – Raja
@Raja You can always deserialize the message to a String (using StringDeserializer in the Kafka consumer), and then the value you get will be a string, which you can eventually parse. So you don't need to cast to String; just set value.deserializer to StringDeserializer and parse it. – JavaTechnical

1 Answer


JSON is a string. You can just use a schema of string type.

"So that I can use spark.read.json"

spark.read.json reads from the filesystem (or a batch Dataset); it has to scan the data to infer a schema, which it cannot do against a streaming source.

You probably want spark.readStream.format("kafka") if you want to read from Kafka, which is described in the Spark documentation in enough detail.

The first example in the Spark documentation does exactly that:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
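
As written, that snippet only defines the Kafka source and the string cast. To actually run it, the streaming Dataset still has to be sent to a sink started with writeStream.start(). A minimal continuation sketch, assuming df and import spark.implicits._ from the example above, with a console sink as in the question's own code:

// The tuple encoder for .as[(String, String)] comes from spark.implicits._
val strings = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// A streaming Dataset is only materialized through writeStream.start()
val query = strings
  .writeStream
  .format("console")
  .start()

query.awaitTermination()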

However, you will have issues doing any kind of useful analysis of the data, given that each record may not share the same fields, so doing something like get_json_object would be pointless.
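
For illustration, a small batch-mode sketch of why: get_json_object extracts a single fixed path per call and returns null wherever that path is absent, so across records with differing fields it mostly produces nulls. The sample records and the local session here are made up for the demo:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

// Local session purely for the demo
val spark = SparkSession.builder.master("local[*]").appName("dynamic-json-demo").getOrCreate()
import spark.implicits._

// Two made-up records that do not share the same fields
val sample = Seq("""{"a": 1, "b": 2}""", """{"c": 3}""").toDF("json")

// "$.a" exists only in the first record, so the second row comes back null
sample.select(get_json_object($"json", "$.a").alias("a")).show()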

You would arguably be better off using the raw Kafka consumer API or Kafka Streams, which do not require any schema. However, your issue is not schemas; it is deserialization into an object type with concrete fields that can be queried.
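
If you do go the plain consumer route, a rough sketch is below, assuming the kafka-clients and Jackson libraries are on the classpath; the broker list, topic name, and group id are placeholders. The value.deserializer is StringDeserializer (as suggested in the comments above), and each value is parsed into a generic JSON tree so every record is free to have its own set of fields:

import java.time.Duration
import java.util.{Collections, Properties}
import scala.collection.JavaConverters._

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put("bootstrap.servers", "host1:port1,host2:port2")
props.put("group.id", "dynamic-json-consumer")
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("topic"))

val mapper = new ObjectMapper()

while (true) {
  for (record <- consumer.poll(Duration.ofSeconds(1)).asScala) {
    // The value arrives as a plain JSON string; parse it into a generic tree
    val node = mapper.readTree(record.value())
    println(node.fieldNames().asScala.mkString(", "))
  }
}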