I am trying to read messages from a Kafka topic. The messages are in the format below (sample):
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
Also, please note that the topic has messages from different tables, not just one table.
What I am trying to achieve is to read the above messages from the Kafka topic using Spark Structured Streaming and create a DataFrame whose column names and values both come from the JSON message itself.
I don't want to explicitly define a schema using a case class or StructType.
I tried this:
import org.apache.spark.sql.functions._
import spark.implicits._

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()

// Kafka delivers value as binary, so cast it to a string before extracting the payload
val y = df.select(get_json_object($"value".cast("string"), "$.payload").alias("payload"))
When I view y (which is a DataFrame), it has a single column named payload, and the value in that column is the JSON under payload as a string.
How do I get the individual columns in a DataFrame? I am not able to achieve this.
(Again reiterating: I cannot use a generic case class or StructType for the schema, because the messages coming through Kafka are from different tables, so I want a more dynamic schema created from the JSON itself at runtime.)
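Something along these lines is roughly what I have in mind, though I am not sure it is the right approach. It is only a rough sketch: it assumes a recent Spark version (3.x) so that foreachBatch is available, reuses df and brokers from above, and relies on spark.read.json to infer the schema from each micro-batch's payload strings instead of a predefined case class or StructType.

// Rough sketch, not a working solution: schema is inferred from the JSON itself per batch
val payloads = df.select(get_json_object($"value".cast("string"), "$.payload").alias("payload"))

val query = payloads.writeStream
  .foreachBatch { (batch: org.apache.spark.sql.DataFrame, batchId: Long) =>
    // spark.read.json infers the schema from the payload strings in this micro-batch
    val parsed = spark.read.json(batch.select("payload").as[String])
    parsed.show(false) // or write out per table, etc.
  }
  .start()

Is this a reasonable direction, or is there a better way to get one column per JSON field dynamically?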