2 votes

I have a PySpark dataframe (say df1) with the following columns:

1. category: a string
2. array1: an array of elements
3. array2: an array of elements

Following is an example of df1

+--------+--------------+--------------+
|category|        array1|        array2|
+--------+--------------+--------------+
|A       |  [x1, x2, x3]|  [y1, y2, y3]|
|B       |      [u1, u2]|      [v1, v2]|
+--------+--------------+--------------+

For each row, the length of array1 equals the length of array2, but different rows may have arrays of different lengths.

I want to form separate columns (say element1 and element2) such that, in each row, element1 and element2 contain the elements from the same position of array1 and array2 respectively.

Following is an example of the output dataframe (say df2) that I want:

+--------+--------------+--------------+----------+----------+
|category|        array1|        array2|  element1|  element2|
+--------+--------------+--------------+----------+----------+
|A       |  [x1, x2, x3]|  [y1, y2, y3]|        x1|        y1|
|A       |  [x1, x2, x3]|  [y1, y2, y3]|        x2|        y2|
|A       |  [x1, x2, x3]|  [y1, y2, y3]|        x3|        y3|
|B       |      [u1, u2]|      [v1, v2]|        u1|        v1|
|B       |      [u1, u2]|      [v1, v2]|        u2|        v2|
+--------+--------------+--------------+----------+----------+

Below is what I have tried so far (but it produces the full cross product of the two arrays, i.e. element1/element2 pairs from different positions in addition to the matched pairs I want):

df2 = df1.select("*", F.explode("array1").alias("element1")) \
    .select("*", F.explode("array2").alias("element2"))
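
As a sanity check (a sketch assuming the df2 from the attempt above): each explode multiplies the rows, so category A ends up with 3 * 3 = 9 combinations rather than the 3 aligned pairs.

# the two chained explodes form a cross product of the arrays
df2.filter(F.col("category") == "A").count()  # 9, not the desired 3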

2 Answers

3 votes

Initialization

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

columns = ['category', 'array1', 'array2']
vals = [
    ('A', ['x1', 'x2', 'x3'], ['y1', 'y2', 'y3']),
    ('B', ['u1', 'u2'], ['v1', 'v2'])
]
df = sqlContext.createDataFrame(vals, columns)

Based on arrays_zip in Spark >= 2.4

df.withColumn('new', F.arrays_zip('array1', 'array2')) \
    .withColumn('ex', F.explode('new')) \
    .select('category', 'array1', 'array2',
            F.col('ex.array1').alias('element1'),
            F.col('ex.array2').alias('element2')) \
    .show()

Output

+--------+------------+------------+--------+--------+
|category|      array1|      array2|element1|element2|
+--------+------------+------------+--------+--------+
|       A|[x1, x2, x3]|[y1, y2, y3]|      x1|      y1|
|       A|[x1, x2, x3]|[y1, y2, y3]|      x2|      y2|
|       A|[x1, x2, x3]|[y1, y2, y3]|      x3|      y3|
|       B|    [u1, u2]|    [v1, v2]|      u1|      v1|
|       B|    [u1, u2]|    [v1, v2]|      u2|      v2|
+--------+------------+------------+--------+--------+

Explanation: Looking at what arrays_zip produces basically explains everything. We zip the columns together with it and then explode the result. Each row of the exploded column is a struct, so we simply reference its fields to get the aligned elements.

>>> df.withColumn('new', F.arrays_zip('array1','array2')).show(truncate=False)
+--------+------------+------------+------------------------------+
|category|array1      |array2      |new                           |
+--------+------------+------------+------------------------------+
|A       |[x1, x2, x3]|[y1, y2, y3]|[[x1, y1], [x2, y2], [x3, y3]]|
|B       |[u1, u2]    |[v1, v2]    |[[u1, v1], [u2, v2]]          |
+--------+------------+------------+------------------------------+
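
For completeness, a quick schema check (a sketch reusing the df from above) shows why ex.array1 and ex.array2 resolve to the zipped struct's fields:

df.withColumn('new', F.arrays_zip('array1', 'array2')) \
    .withColumn('ex', F.explode('new')) \
    .printSchema()
# 'new' is an array<struct<array1, array2>> and 'ex' is a single
# struct<array1, array2>, so ex.array1 / ex.array2 pick the aligned elements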
1 vote

For Spark >=2.4 you could use Higher-Order Functions:

from pyspark.sql.functions import col, explode, expr

data = [('A', ['x1', 'x2', 'x3'], ['y1', 'y2', 'y3']),
        ('B', ['u1', 'u2'], ['v1', 'v2'])]
df = spark.createDataFrame(data, ["category", "array1", "array2"])

# transform array1, array2 => [struct(element1, element2)]
transform_expr = "transform(array1, (x, i) -> struct(x as element1, array2[i] as element2))"

# explode transformed arrays and extract values of element1 and element2
df.withColumn("merged_arrays", explode(expr(transform_expr))) \
    .withColumn("element1", col("merged_arrays.element1")) \
    .withColumn("element2", col("merged_arrays.element2")) \
    .drop("merged_arrays") \
    .show(truncate=False)

Output:

+--------+------------+------------+--------+--------+
|category|array1      |array2      |element1|element2|
+--------+------------+------------+--------+--------+
|A       |[x1, x2, x3]|[y1, y2, y3]|x1      |y1      |
|A       |[x1, x2, x3]|[y1, y2, y3]|x2      |y2      |
|A       |[x1, x2, x3]|[y1, y2, y3]|x3      |y3      |
|B       |[u1, u2]    |[v1, v2]    |u1      |v1      |
|B       |[u1, u2]    |[v1, v2]    |u2      |v2      |
+--------+------------+------------+--------+--------+

Explanation of the transform function:

The function takes the first array, array1, and applies a lambda function (x, i) -> struct(string, string), where x is the actual value and i is its index in the array. For each value, we return a struct containing that value as element1 and the corresponding value of array2 (looked up with the index i) as element2.

The rest is just exploding the result of the transformation and accessing the struct elements we created.
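
To see the intermediate array the lambda builds (a sketch reusing the df and transform_expr from above), show the transformed column before exploding:

df.withColumn("merged_arrays", expr(transform_expr)).show(truncate=False)
# category A -> [[x1, y1], [x2, y2], [x3, y3]]  (an array of structs;
# category B -> [[u1, v1], [u2, v2]]             rendering varies by version)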