
I have static lists group_1 and group_2:

group_1 = ["a", "b", "c", "d", "e", "f", "g"]
group_2 = ["h", "i", "j", "k"]

I have a PySpark dataframe df1 as shown below.

Example1:

df1:

+-----+----------------------------------------+-----------------------------------------+
|id   |array1                                  |array2                                   |
+-----+----------------------------------------+-----------------------------------------+
|id1  |[a,b,c,d,group_1,group_2]               |[a,b,c,d,e,f,g,h,i,j,k]                  |
+-----+----------------------------------------+-----------------------------------------+

output_df:

+-----+-------------------+-------------------+
|id   |col1               |col2               |
+-----+-------------------+-------------------+
|id1  |[a,b,c,d]          |[a,b,c,d]          |
|id1  |[e,f,g]            |group_1            |
|id1  |[h,i,j,k]          |group_2            |
+-----+-------------------+-------------------+

The array2 column always contains the expanded elements of the array1 column; that is how my source dataframe (source_df1) is built.

Looking at the array1 column, there are individual elements like (a,b,c,d) as well as the group_1 and group_2 entries, but taken all together the elements are distinct.

Now I want to create a PySpark dataframe by exploding in such a way that individual and group elements are categorized as shown in output_df.

Example1 Observation: In output_df, the group_1 record contains only [e,f,g] because the other group_1 members (a,b,c,d) already appear as individual elements.

Example2:

source_df1:

+-----+----------------------------------------+-----------------------------------------+
|id   |array1                                  |array2                                   |
+-----+----------------------------------------+-----------------------------------------+
|id1  |[a,b,group_1,group_2]                   |[a,b,c,d,e,f,g,h,i,j,k]                  |
+-----+----------------------------------------+-----------------------------------------+

output_df:

+-----+-------------------+-------------------+
|id   |col1               |col2               |
+-----+-------------------+-------------------+
|id1  |[a,b]              |[a,b]              |
|id1  |[c,d,e,f,g]        |group_1            |
|id1  |[h,i,j,k]          |group_2            |
+-----+-------------------+-------------------+

Example2 Observation: In output_df, the group_1 record contains only [c,d,e,f,g] because the other group_1 members (a,b) already appear as individual elements.
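In both examples the categorization boils down to order-preserving set arithmetic: the individual elements are array1 minus the group names, and each group keeps only the members not already listed individually. A plain-Python sketch of that logic (outside Spark, just to pin down the expected rows):

```python
# Plain-Python sketch of the expected categorization (not PySpark).
group_1 = ["a", "b", "c", "d", "e", "f", "g"]
group_2 = ["h", "i", "j", "k"]

def categorize(array1):
    # Individual elements: everything in array1 that is not a group name.
    individual = [x for x in array1 if x not in ("group_1", "group_2")]
    # One row per category; each group keeps only the members
    # not already listed individually.
    rows = [(individual, individual)]
    for name, members in [("group_1", group_1), ("group_2", group_2)]:
        rows.append(([m for m in members if m not in individual], name))
    return rows

# Example 1
print(categorize(["a", "b", "c", "d", "group_1", "group_2"]))
# Example 2
print(categorize(["a", "b", "group_1", "group_2"]))
```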

Can anyone please help me achieve this?

1 Answer


If you can use Spark 2.4+, you can achieve this with a few array functions:

from pyspark.sql import functions as F


df1 = df.withColumn(
    # individual: elements of array1 that are not group names
    "individual",
    F.array_except(F.col("array1"), F.array(F.lit("group_1"), F.lit("group_2")))
).withColumn(
    # group_1: members of the group_1 list not already listed individually
    "group_1",
    F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
).withColumn(
    # group_2: members of the group_2 list not already listed individually
    "group_2",
    F.array_except(F.array(*[F.lit(i) for i in group_2]), "individual")
).withColumn(
    # one struct per category, each intersected with array2, then exploded
    "array2",
    F.explode(F.array(
        F.struct(F.array_intersect("array2", "individual").alias("col1"),
                 F.col("individual").cast("string").alias("col2")),
        F.struct(F.array_intersect("array2", "group_1").alias("col1"),
                 F.lit("group_1").alias("col2")),
        F.struct(F.array_intersect("array2", "group_2").alias("col1"),
                 F.lit("group_2").alias("col2"))
    ))
).select("id", "array2.*")

df1.show(truncate=False)

#+---+------------+------------+
#|id |col1        |col2        |
#+---+------------+------------+
#|id1|[a, b, c, d]|[a, b, c, d]|
#|id1|[e, f, g]   |group_1     |
#|id1|[h, i, j, k]|group_2     |
#+---+------------+------------+

Explanations:

  • First, split array1 into three arrays: individual, group_1 and group_2, each containing the elements of the corresponding category. Elements of group_1 and group_2 that are already present in individual are removed from those groups.
  • Then, using the array_intersect function, keep only the elements of column array2 that are present in each of the three arrays created above.
  • Finally, explode the resulting array of structs.
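For intuition, array_except and array_intersect both preserve the order of their first argument and drop duplicates. The steps above can be mimicked in plain Python, with list-based stand-ins for the two Spark functions (purely illustrative, not the Spark implementation):

```python
# Plain-Python stand-ins for the two Spark array functions (illustrative only).
def array_except(a, b):
    # Elements of a not in b, keeping a's order, duplicates removed.
    seen, out = set(b), []
    for x in a:
        if x not in seen:
            seen.add(x)
            out.append(x)
    return out

def array_intersect(a, b):
    # Elements of a also in b, keeping a's order, duplicates removed.
    seen, out = set(), []
    for x in a:
        if x in b and x not in seen:
            seen.add(x)
            out.append(x)
    return out

array1 = ["a", "b", "c", "d", "group_1", "group_2"]
array2 = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"]
group_1 = ["a", "b", "c", "d", "e", "f", "g"]

# Step 1: split array1 into the per-category arrays.
individual = array_except(array1, ["group_1", "group_2"])   # ['a', 'b', 'c', 'd']
g1 = array_except(group_1, individual)                      # ['e', 'f', 'g']
# Step 2: intersect array2 with each category array.
col1_for_g1 = array_intersect(array2, g1)                   # ['e', 'f', 'g']
```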

Note that if you want to verify that group_1 or group_2 is actually present in the array1 column, you can use when with the array_contains function:

F.when(
    F.array_contains(F.col("array1"), F.lit("group_1")),
    F.array_except(F.array(*[F.lit(i) for i in group_1]), "individual")
)

In the example above, I assumed both group names are always present in array1.
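The same guard can be expressed outside Spark: only compute a group's array when its name actually appears in array1, and return nothing otherwise (plain Python, illustrative; None plays the role of Spark's null when the when condition is not met):

```python
# Illustrative guard mirroring F.when(array_contains(...), array_except(...)).
group_1 = ["a", "b", "c", "d", "e", "f", "g"]

def group_1_part(array1, individual):
    # Skip the group entirely when its name is absent from array1.
    if "group_1" not in array1:
        return None
    # Otherwise keep group_1 members not already listed individually.
    return [x for x in group_1 if x not in individual]

print(group_1_part(["a", "b", "group_1"], ["a", "b"]))  # ['c', 'd', 'e', 'f', 'g']
print(group_1_part(["a", "b"], ["a", "b"]))             # None
```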