
What I'm trying to accomplish is exactly what this question asks (Here); however, in my case I'm using Python/PySpark, not Scala.

I'm trying to extract the "payload" part of a Kafka Connect message that includes the schema as well.

Sample message:

    {
      "schema": {
        "type": "struct",
        "name": "emp_table",
        "fields": [
          {"field": "emp_id", "type": "string"},
          {"field": "emp_name", "type": "String"},
          {"field": "city", "type": "string"},
          {"field": "emp_sal", "type": "string"},
          {"field": "manager_name", "type": "string"}
        ]
      },
      "payload": {
        "emp_id": "1",
        "emp_name": "abc",
        "city": "NYK",
        "emp_sal": "100000",
        "manager_name": "xyz"
      }
    }

Step 1 - Define the schema for the "payload" part:

    from pyspark.sql.types import StructType, StructField, StringType

    payload_schema = StructType([
        StructField("emp_id", StringType(), False),
        StructField("emp_name", StringType(), True),
        StructField("city", StringType(), True),
        StructField("emp_sal", StringType(), True),
        StructField("manager_name", StringType(), True)
    ])

Step 2 - Read from Kafka:

    # Broker and topic are placeholders; the .option(...) calls and .load() complete the read
    df = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "<broker:9092>").option("subscribe", "<topic>").load()

Step 3 - Get the message value as a string:

    kafka_df = df.selectExpr("CAST(value AS STRING)")

Step 4 - Extract "payload" only (I'm stuck here):

    import pyspark.sql.functions as psf

    emp_df = kafka_df \
        .select(psf.from_json(psf.col('value'), payload_schema).alias("DF")) \
        .select("DF.*")

This is where I'm stuck: I can't figure out how to extract the "payload" portion of the JSON string before passing it to the from_json() function.

Note: I'm aware that I would need to define the full schema for the entire message before I can make use of it in from_json(); however, I'm trying to get only the "payload" JSON string part.


2 Answers

1 vote

You could make use of the SQL function get_json_object:

    import pyspark.sql.functions as psf

    emp_df = kafka_df \
        .select(psf.get_json_object(kafka_df['value'], "$.payload").alias('payload')) \
        .select(psf.from_json(psf.col('payload'), payload_schema).alias("DF")) \
        .select("DF.*")

Alternatively, you can define the full schema for the entire message and use that in from_json.

That means your schema would look something like this:

    from pyspark.sql.types import StructType, StructField, StringType, ArrayType

    full_schema = StructType([
        StructField("schema", StructType([
            StructField("type", StringType(), False),
            StructField("name", StringType(), False),
            StructField("fields", ArrayType(StructType([
                StructField("field", StringType(), False),
                StructField("type", StringType(), False)
            ])), False)
        ]), False),
        StructField("payload", StructType([
            StructField("emp_id", StringType(), False),
            StructField("emp_name", StringType(), True),
            StructField("city", StringType(), True),
            StructField("emp_sal", StringType(), True),
            StructField("manager_name", StringType(), True)
        ]), False)
    ])

Note the ArrayType wrapper around the struct for "fields", since that element is a JSON array. Please double-check this schema definition against your actual messages, but I hope the idea is clear.
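
If writing the nested schema by hand gets unwieldy, Spark 2.4+ can infer one from a sample string via schema_of_json (a quick sketch on a trimmed sample):

    import pyspark.sql.functions as psf

    # Derive the schema (as a DDL string) from one sample message
    sample = '{"schema": {"type": "struct"}, "payload": {"emp_id": "1"}}'
    spark.range(1).select(psf.schema_of_json(psf.lit(sample)).alias("ddl")).show(truncate=False)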

As soon as this is done, you can select the payload fields with:

    import pyspark.sql.functions as psf

    emp_df = kafka_df \
        .select(psf.from_json(psf.col('value'), full_schema).alias("DF")) \
        .select("DF.payload.*")

0 votes

For some reason I missed that PySpark has a get_json_object() function. After Mike's comment I went back to the documentation and found what I was looking for.

Here's the answer:

    import pyspark.sql.functions as psf

    kafka_df = df.selectExpr("CAST(value AS STRING)")
    payload_df = kafka_df.select(psf.get_json_object(kafka_df.value, "$.payload").alias("payload"))
    emp_df = payload_df.select(psf.from_json(psf.col('payload'), payload_schema).alias("DF")).select("DF.*")
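
To inspect the parsed rows while the stream is running, a console sink is handy (a minimal sketch; the output mode is an assumption and awaitTermination() blocks until the query stops):

    # Write the parsed payload columns to the console for debugging
    query = emp_df.writeStream \
        .format("console") \
        .outputMode("append") \
        .start()

    query.awaitTermination()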