
I have the following data frame and I would like to explode the values column, so that each value ends up in a separate column:

id | values
-----------------------
1  | '[[532,969020406,89],[216,969100125,23],[169,39356140000,72],[399,14407358500,188],[377,13761937166.6667,24]]'
2 | '[[532,969020406,89]]'

Note that the lists under the values column can have different lengths and that they are of String data type.

The desired table should look like this:

id | v11 | v12 | v13 | v21 | v22... 
--------------------------------------
1  | 532 | 969020406 | 89 | 216 | 969100125...
2 | 532 | 969020406 | 89 | Null | Null...

I tried to specify the schema and use the from_json method to create the array and then explode it, but I ran into issues: none of the following schemas seems to fit my data.

from pyspark.sql import functions as F, types

json_schema = types.StructType([types.StructField('array', types.StructType([ \
    types.StructField("v1",types.StringType(),True), \
    types.StructField("v2",types.StringType(),True), \
    types.StructField("v3",types.StringType(),True)
  ]))])

json_schema = types.ArrayType(types.StructType([ \
    types.StructField("v1",types.StringType(),True), \
    types.StructField("v2",types.StringType(),True), \
    types.StructField("v3",types.StringType(),True)
  ]))

json_schema = types.ArrayType(types.ArrayType(types.IntegerType()))

df.select('id', F.from_json('values', schema=json_schema)).show()

The preceding returns only Null values or an empty array: [,,]

I also got the following error: StructType can not accept object '[' in type <class 'str'>

Schema of the input data as inferred by PySpark:

root
 |-- id: integer (nullable = true)
 |-- values: string (nullable = true)

Any help would be appreciated.

Comment from werner: Could you please add the schema of the input data, e.g. the output of df.printSchema() for your original data?

1 Answer


For Spark 2.4+ you can use a combination of split and transform to turn the string into a two-dimensional array. The single entries of this array can then be transformed into separate columns.
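
For reference, the example input can be recreated with something like this (a minimal sketch, not part of the original question; id is kept as a string to match the printed schema further down):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# values is a plain string, exactly as described in the question
df = spark.createDataFrame(
    [('1', '[[532,969020406,89],[216,969100125,23],[169,39356140000,72],'
           '[399,14407358500,188],[377,13761937166.6667,24]]'),
     ('2', '[[532,969020406,89]]')],
    ['id', 'values'])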

from pyspark.sql import functions as F

# split the string on '],[' into the outer entries, then split each entry on ','
# and strip the remaining '[' and ']' characters, giving an array of string arrays
df2 = df.withColumn("parsed_values", F.expr("transform(split(values, '\\\\],\\\\['), " +
           "c ->  transform(split(c, ','), d->regexp_replace(d,'[\\\\[\\\\]]','')))"))\
    .withColumn("length", F.size("parsed_values"))

max_length = df2.agg(F.max("length")).head()["max(length)"]

df2 now has the structure

root
 |-- id: string (nullable = true)
 |-- values: string (nullable = true)
 |-- parsed_values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- length: integer (nullable = false)

and max_length contains the maximum number of entries in one row (5 for the example data).

parsed_values[0][1] would return the second sub-entry of the first entry; this is 969020406 for the example data.
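
This can be checked directly on the parsed column, for example (a small sketch over the example data):

df2.select(F.col("parsed_values").getItem(0).getItem(1).alias("v12")).show()
# both example rows contain 969020406 at this position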

The second step is to transform the nested array into columns.

cols = [F.col('parsed_values').getItem(x).getItem(y).alias("v{}{}".format(x+1,y+1)) \
    for x in range(0, max_length) for y in range(0,3)]

df2.select([F.col('id')] + cols).show()

Output:

+---+---+---------+---+----+---------+----+----+-----------+----+----+-----------+----+----+----------------+----+
| id|v11|      v12|v13| v21|      v22| v23| v31|        v32| v33| v41|        v42| v43| v51|             v52| v53|
+---+---+---------+---+----+---------+----+----+-----------+----+----+-----------+----+----+----------------+----+
|  1|532|969020406| 89| 216|969100125|  23| 169|39356140000|  72| 399|14407358500| 188| 377|13761937166.6667|  24|
|  2|532|969020406| 89|null|     null|null|null|       null|null|null|       null|null|null|            null|null|
+---+---+---------+---+----+---------+----+----+-----------+----+----+-----------+----+----+----------------+----+

The solution could be improved if there were a way to determine max_length without computing the maximum over the complete data, for example if this value were known beforehand.
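
If, for instance, the maximum number of entries were known in advance (an assumption about the data, not something given in the question), the aggregation step could simply be skipped, as a sketch:

# hypothetical: the maximum number of entries per row is known in advance
known_max_length = 5

cols = [F.col('parsed_values').getItem(x).getItem(y).alias("v{}{}".format(x + 1, y + 1))
        for x in range(known_max_length) for y in range(3)]

df2.select([F.col('id')] + cols).show()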