
I am trying to read messages from a Kafka topic. The messages are in the following format (sample):

{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}

Also, please note that the topic carries messages from several different tables, not just one.

What I am trying to achieve is to read the above message from the Kafka topic using Spark Structured Streaming and create a DataFrame whose column names and values both come from the JSON message itself.

I don't want to define the schema explicitly with a case class or a StructType.

I tried this:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()

val y = df.select(get_json_object($"value", "$.payload").alias("payload"))

When I view y (which is a DataFrame), it comes back as a single column, payload, containing the payload JSON as a string.

How do I get the individual fields as separate columns in the DataFrame? I have not been able to achieve this.

(To reiterate: I cannot use a fixed case class or StructType for the schema, because the messages on the Kafka topic come from different tables. I want the schema to be created dynamically from the JSON itself at run time.)
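Because every Connect envelope names its source table in schema.name (emp_table in the sample above), one way to cope with a mixed topic is to split the stream by that field before applying any schema. A minimal sketch, assuming Spark 2.x/3.x and the Kafka source's binary value column; the object RouteByTable, the helper payloadsForTable and the second table dept_table are hypothetical, and an in-memory DataFrame stands in for the Kafka source:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, get_json_object}

object RouteByTable {
  // Hypothetical helper: keep only the rows whose Connect envelope names `table`
  // in schema.name, and return their payload object as a JSON string column.
  // The same select/where works on a batch or a streaming DataFrame.
  def payloadsForTable(kafkaDf: DataFrame, table: String): DataFrame = {
    val json = col("value").cast("string") // Kafka delivers value as bytes
    kafkaDf
      .select(
        get_json_object(json, "$.schema.name").as("table"),
        get_json_object(json, "$.payload").as("payload"))
      .where(col("table") === table)
      .select("payload")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("route-by-table").getOrCreate()
    import spark.implicits._
    // In-memory stand-in for the Kafka source; dept_table is a made-up second table.
    val kafkaLike = Seq(
      """{"schema":{"name":"emp_table"},"payload":{"emp_id":"1","city":"NYK"}}""",
      """{"schema":{"name":"dept_table"},"payload":{"dept_id":"10"}}"""
    ).toDF("raw").select($"raw".cast("binary").as("value"))
    payloadsForTable(kafkaLike, "emp_table").show(false)
    spark.stop()
  }
}
```

Each resulting per-table DataFrame can then be parsed with its own schema, so no single StructType has to cover every table.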

Umh, you could probably access single values via dot notation; payload.schema.type should return "struct" as the value. - Tizianoreica
I think you need to cast the value to a string (from a byte array) before you can use get_json_object. - OneCricketeer

1 Answer


Option 1: Change the Kafka Connect source to set value.converter.schemas.enable=false. This gives you only the unwrapped payload to begin with, so you can skip straight to the from_json() step below.
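With value.converter.schemas.enable=false, the JsonConverter writes only the payload object, so each Kafka value looks like {"emp_id":"1",...,"manager_name":"xyz"} and can be parsed in one step. A minimal sketch, assuming a known StructType for the table; the object UnwrappedPayload and its members are hypothetical, and an in-memory DataFrame stands in for the Kafka source:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object UnwrappedPayload {
  // Schema for the emp_table payload from the question (all fields are strings).
  val empSchema: StructType = StructType(
    Seq("emp_id", "emp_name", "city", "emp_sal", "manager_name")
      .map(name => StructField(name, StringType)))

  // No envelope to strip: cast the bytes to a string and parse the whole value.
  def parse(kafkaDf: DataFrame, schema: StructType): DataFrame =
    kafkaDf
      .select(from_json(col("value").cast("string"), schema).as("data"))
      .select("data.*") // expand the struct into one column per field

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("unwrapped").getOrCreate()
    import spark.implicits._
    // In-memory stand-in for a Kafka value written with schemas.enable=false.
    val kafkaLike = Seq(
      """{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}"""
    ).toDF("raw").select($"raw".cast("binary").as("value"))
    parse(kafkaLike, empSchema).show(false)
    spark.stop()
  }
}
```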

Otherwise, after you strip off the Connect schema envelope, you need to use from_json() to apply a schema to the payload:

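import org.apache.spark.sql.functions.{from_json, get_json_object}
val y = df.select(get_json_object($"value".cast("string"), "$.payload").alias("payload")) // value arrives as bytes
val z = y.select(from_json($"payload", schema).alias("data")).select("data.*")            // one column per field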

All your fields are strings, so the schema would look like this:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema: StructType = StructType(Seq(
  StructField("emp_id", StringType),
  StructField("emp_name", StringType),
  StructField("city", StringType),
  StructField("emp_sal", StringType),
  StructField("manager_name", StringType)
))
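The schema above is still hard-coded. Since the question asks for a schema derived from the JSON itself, one possible workaround is to infer it once from a batch sample of the topic with spark.read.json and then reuse it in the streaming query. This is a sketch under several assumptions: Spark 2.2+ (for spark.read.json on a Dataset[String]), the spark-sql-kafka package on the classpath, a sample that contains every field you need, and types inferred from the values (all strings here, because the payload values are quoted). The object InferPayloadSchema and its methods are hypothetical. Also note that from_json needs the schema when the streaming query is defined, so fields that first appear later are not picked up until the query is restarted.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, get_json_object}
import org.apache.spark.sql.types.StructType

object InferPayloadSchema {
  // Batch (not streaming) read of what is currently on the topic,
  // reduced to the payload JSON strings.
  def samplePayloads(spark: SparkSession, brokers: String, topic: String): Dataset[String] = {
    import spark.implicits._
    spark.read.format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()
      .select(get_json_object(col("value").cast("string"), "$.payload").as("payload"))
      .as[String]
  }

  // Let Spark's JSON reader infer a StructType from the sampled payloads.
  def inferSchema(spark: SparkSession, payloads: Dataset[String]): StructType =
    spark.read.json(payloads).schema

  // Apply the inferred schema to a Kafka-shaped (batch or streaming) DataFrame.
  def parseWith(kafkaDf: DataFrame, schema: StructType): DataFrame =
    kafkaDf
      .select(get_json_object(col("value").cast("string"), "$.payload").as("payload"))
      .select(from_json(col("payload"), schema).as("data"))
      .select("data.*")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("infer-schema").getOrCreate()
    import spark.implicits._
    // In-memory stand-in for samplePayloads(spark, brokers, "topic1").
    val payloads = Seq(
      """{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}"""
    ).toDS()
    inferSchema(spark, payloads).printTreeString()
    spark.stop()
  }
}
```

For a topic that mixes tables, sampling and inferring per table (for example after filtering on schema.name) keeps one table's fields from leaking into another's schema.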
