I have a PySpark dataframe df like this:
+-----+----+------------+------------+-------------+------------+
| Name| Age| P_Attribute|S_Attributes|     P_Values|    S_values|
+-----+----+------------+------------+-------------+------------+
| Bob1| 16 |     [x1,x2]|     [x1,x3]|     ["ab",1]|       [1,2]|
| Bob2| 16 |  [x1,x2,x3]|          []|["a","b","c"]|          []|
+-----+----+------------+------------+-------------+------------+
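(For reference, a minimal snippet to reproduce this sample data; here I'm assuming the value arrays are plain strings:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data; the value arrays are assumed to be all strings here
df = spark.createDataFrame(
    [
        ("Bob1", 16, ["x1", "x2"], ["x1", "x3"], ["ab", "1"], ["1", "2"]),
        ("Bob2", 16, ["x1", "x2", "x3"], [], ["a", "b", "c"], []),
    ],
    ["Name", "Age", "P_Attribute", "S_Attributes", "P_Values", "S_values"],
)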
I would like the final df to look like this:
+-----+----+------------+------------+
| Name| Age|   Attribute|      Values|
+-----+----+------------+------------+
| Bob1| 16 |          x1|          ab|
| Bob1| 16 |          x2|           1|
| Bob1| 16 |          x1|           1|
| Bob1| 16 |          x3|           2|
| Bob2| 16 |          x1|           a|
| Bob2| 16 |          x2|           b|
| Bob2| 16 |          x3|           c|
+-----+----+------------+------------+
Basically I want to merge these two pairs of columns and explode them into rows. With the help of the PySpark array functions I was able to concat the arrays and explode them (a rough sketch of that step is shown after the next table), but since professional attributes and sport attributes can have the same names, I also need a type column to tell them apart later:
+-----+----+------------+------------+------------+
| Name| Age|   Attribute|        type|       Value|
+-----+----+------------+------------+------------+
| Bob1| 16 |          x1|           1|          ab|
| Bob1| 16 |          x2|           1|           1|
| Bob1| 16 |          x1|           2|           1|
| Bob1| 16 |          x3|           2|           2|
| Bob2| 16 |          x1|           1|           a|
| Bob2| 16 |          x2|           1|           b|
| Bob2| 16 |          x3|           1|           c|
+-----+----+------------+------------+------------+
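For reference, the concat + explode part that already works looks roughly like this (just a sketch; my actual code differs slightly, and the Attribute_all/Values_all names are only illustrative). It produces the first target table, without the type column:

from pyspark.sql import functions as F

# concat the two attribute arrays and the two value arrays separately,
# zip them element-wise, then explode each zipped pair into its own row
merged = (
    df.withColumn("Attribute_all", F.concat("P_Attribute", "S_Attributes"))
      .withColumn("Values_all", F.concat("P_Values", "S_values"))
      .withColumn("pair", F.explode(F.arrays_zip("Attribute_all", "Values_all")))
      .select(
          "Name",
          "Age",
          F.col("pair.Attribute_all").alias("Attribute"),
          F.col("pair.Values_all").alias("Values"),
      )
)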
So my idea was to first create separate type array columns, like this:
+-----+----+------------+------------+------------+------------+
| Name| Age| P_Attribute|S_Attributes|      P_type|      S_type|
+-----+----+------------+------------+------------+------------+
| Bob1| 16 |     [x1,x2]|     [x1,x3]|       [1,1]|       [2,2]|
| Bob2| 16 |  [x1,x2,x3]|          []|     [1,1,1]|          []|
+-----+----+------------+------------+------------+------------+
That way I could merge the columns and explode them together with the required type column, as shown in the df above. The problem is that I am not able to create the P_type and S_type columns dynamically (their length has to match the corresponding attribute array). I tried the code below:
from pyspark.sql import functions as F

new_df = df.withColumn("temp_P_type", F.lit(1)) \
           .withColumn("P_type", F.array_repeat("temp_P_type", F.size("P_Attribute")))
This throws a TypeError: Column is not iterable error.
It also doesn't work if the length of the array is already extracted into another column (a sketch of that attempt follows).
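For completeness, that second attempt looked roughly like this (the p_len column name is just illustrative):

from pyspark.sql import functions as F

# also fails: pre-computing the array length as its own column and
# passing that column as the count argument of array_repeat
new_df = (
    df.withColumn("p_len", F.size("P_Attribute"))
      .withColumn("P_type", F.array_repeat(F.lit(1), F.col("p_len")))
)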
Can anybody help me with this, or suggest a better way to do it? Is it possible to do this at the DataFrame level, without going to RDDs and Python functions (i.e. without a UDF)?
P.S. I am using Spark 2.4.