
I am creating a PySpark DataFrame by reading a Kafka topic message, which is a complex JSON message. One part of the JSON message is shown below:

{
  "paymentEntity": {
    "id": 3081458,
    "details": {
      "values": [
        { "CardType": "VisaDebit" },
        { "CardNumber": "********8759" },
        { "WorldPayMasterId": "c670b980c50eb50373f66a1fe2bf8e70d320a0f7" }
      ]
    }
  }
}
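
For reference, I read the message roughly like this (simplified; the broker address and topic name below are placeholders):

from pyspark.sql import functions as F
from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, ArrayType)

# Schema for the fragment shown above; each array element carries
# only one of the three fields, the others stay null
schema = StructType([
    StructField("paymentEntity", StructType([
        StructField("id", LongType()),
        StructField("details", StructType([
            StructField("values", ArrayType(StructType([
                StructField("CardType", StringType()),
                StructField("CardNumber", StringType()),
                StructField("WorldPayMasterId", StringType()),
            ])))
        ]))
    ]))
])

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "host:9092")  # placeholder
       .option("subscribe", "payments")                 # placeholder topic
       .load())

# Kafka delivers the payload as binary, so cast to string and parse
jsonDF = (raw
          .select(F.from_json(F.col("value").cast("string"), schema).alias("msg"))
          .select("msg.paymentEntity"))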

After reading it into a DataFrame, its schema and data look like this:

root
 |-- details: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- CardNumber: string (nullable = true)
 |    |    |    |-- CardType: string (nullable = true)
 |    |    |    |-- WorldPayMasterId: string (nullable = true)
 |-- id: long (nullable = true)

+-----------------------------------------------------------------------------------+-------+
|details                                                                            |id     |
+-----------------------------------------------------------------------------------+-------+
|[[[, VisaDebit,], [********8759,,], [,, c670b980c50eb50373f66a1fe2bf8e70d320a0f7]]]|3081458|
+-----------------------------------------------------------------------------------+-------+

If I transform it with the code below:

jsonDF = jsonDF.withColumn("paymentEntity-details- 
values",explode(col('paymentEntity.details.values'))) \
            .withColumn('id',col('paymentEntity.id')).drop('paymentEntity')

then the output looks like this:

root
 |-- paymentEntity-details-values: struct (nullable = true)
 |    |-- CardNumber: string (nullable = true)
 |    |-- CardType: string (nullable = true)
 |    |-- WorldPayMasterId: string (nullable = true)
 |-- id: long (nullable = true)

+---------------------------------------------+-------+
|paymentEntity-details-values                 |id     |
+---------------------------------------------+-------+
|[, VisaDebit,]                               |3081458|
|[********8759,,]                             |3081458|
|[,, c670b980c50eb50373f66a1fe2bf8e70d320a0f7]|3081458|
+---------------------------------------------+-------+

I would like to process it and transform the DataFrame output as below, without exploding the array field:

+------------+---------+---------------------------------------------------+-------+
|CardNumber  |CardType |WorldPayMasterId                                   |id     |
+------------+---------+---------------------------------------------------+-------+
|********8759|VisaDebit|c670b980c50eb50373f66a1fe2bf8e70d320a0f7           |3081458|
+------------+---------+---------------------------------------------------+-------+

Can anyone suggest how to achieve this? Any help is appreciated.


1 Answer


Since you have an ArrayType in your struct, exploding makes sense. You can select the individual fields after that and do a little aggregation to get the format you want:

from pyspark.sql import functions as F

(df
    # Pull the scalar id out of the struct
    .withColumn('id', F.col('paymentEntity.id'))
    # One row per array element; each element has exactly one non-null field
    .withColumn('tmp', F.explode(F.col('paymentEntity.details.values')))
    .withColumn('CardNumber', F.col('tmp.CardNumber'))
    .withColumn('CardType', F.col('tmp.CardType'))
    .withColumn('WorldPayMasterId', F.col('tmp.WorldPayMasterId'))
    .drop('paymentEntity', 'tmp')
    # Collapse the exploded rows back to one row per id, keeping the
    # non-null value of each field
    .groupBy('id')
    .agg(
        F.first('CardNumber', ignorenulls=True).alias('CardNumber'),
        F.first('CardType', ignorenulls=True).alias('CardType'),
        F.first('WorldPayMasterId', ignorenulls=True).alias('WorldPayMasterId'),
    )
    .show(10, False)
)

+-------+------------+---------+----------------------------------------+
|id     |CardNumber  |CardType |WorldPayMasterId                        |
+-------+------------+---------+----------------------------------------+
|3081458|********8759|VisaDebit|c670b980c50eb50373f66a1fe2bf8e70d320a0f7|
+-------+------------+---------+----------------------------------------+
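
If you want to avoid explode entirely, as the question asks, Spark 2.4+ higher-order functions can pick each field straight out of the array. A minimal sketch, assuming Spark 2.4+ (the pick helper is just for illustration):

from pyspark.sql import functions as F

def pick(field):
    # Keep only the array element where `field` is non-null,
    # take the first match, and extract that field
    return F.expr(
        f"filter(paymentEntity.details.values, x -> x.{field} is not null)[0].{field}"
    ).alias(field)

result = df.select(
    pick("CardNumber"),
    pick("CardType"),
    pick("WorldPayMasterId"),
    F.col("paymentEntity.id").alias("id"),
)
result.show(truncate=False)

This skips the explode/groupBy round trip, which also avoids a shuffle, at the cost of scanning the array once per field.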