What I'm trying to accomplish is exactly what this question is about (here); however, in my case I'm using Python/PySpark, not Scala.
I'm trying to extract the "payload" part of a Kafka Connect message that also includes the schema.
Sample message:
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
Step 1 - Define the schema for the "payload" part:
from pyspark.sql.types import StructType, StructField, StringType

payload_schema = StructType([
    StructField("emp_id", StringType(), False),
    StructField("emp_name", StringType(), True),
    StructField("city", StringType(), True),
    StructField("emp_sal", StringType(), True),
    StructField("manager_name", StringType(), True)])
Step 2 - Read from Kafka:
df = spark.readStream.format("kafka")
Step 3 - Get the message value from the Kafka message:
kafka_df = df.selectExpr("CAST(value AS STRING)")
Step 4 - Extract "payload" only (I'm stuck here):
import pyspark.sql.functions as psf

emp_df = kafka_df\
    .select(psf.from_json(psf.col('value'), payload_schema).alias("DF"))\
    .select("DF.*")
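I'm stuck at this step because I couldn't figure out how to extract the payload from the JSON string before passing it to the from_json() function. As written, from_json() is applied to the whole message with payload_schema, but the payload fields are nested under "payload" rather than at the top level.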
Note: I'm aware that I could define the full schema for the entire message and use that in from_json(); however, I'm trying to get only the "payload" JSON string part.
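To make the goal concrete, here is a minimal plain-Python sketch (standard json module only, no Spark) of the per-message result I'm after, using the sample message above. The helper name extract_payload is hypothetical and only for illustration; this is not a streaming solution, just a description of the desired output.

```python
import json

# Sample Kafka Connect message from the question (schema + payload envelope).
message = (
    '{"schema":{"type":"struct","name":"emp_table","fields":['
    '{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},'
    '{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},'
    '{"field":"manager_name","type":"string"}]},'
    '"payload":{"emp_id":"1","emp_name":"abc","city":"NYK",'
    '"emp_sal":"100000","manager_name":"xyz"}}'
)


def extract_payload(raw_value: str) -> str:
    """Hypothetical helper: return only the "payload" object as a JSON string."""
    envelope = json.loads(raw_value)        # parse the full envelope
    return json.dumps(envelope["payload"])  # re-serialize just the payload


payload_json = extract_payload(message)  # the string I want to hand to from_json()
payload = json.loads(payload_json)       # parsed again only to inspect the fields
print(payload_json)
```

In the streaming job, the equivalent of payload_json is the string I would like to pass to from_json() together with payload_schema.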