
I am trying to implement a custom explode in PySpark. I have 4 columns that are arrays of structs with virtually the same schema (one column's structs contain one fewer field than the other three).

For each row in my DataFrame, I have 4 columns that are arrays of structs. The columns are students, teaching_assistants, teachers, administrators.

The students, teaching_assistants and teachers columns are arrays of structs with the fields id, student_level and name.

For example, here is a sample row in the DataFrame.

[Screenshot of a sample row: School_id 1999 with the four array columns populated as in the expected output below.]

The students, teaching_assistants and teachers structs all have the same schema ("id", "student_level", "name"), and the administrators structs have the "id" and "name" fields but are missing the student_level field.

I want to perform a custom explode such that for every row I get one entry for each student, teaching assistant, teacher and administrator, along with the original column name in case I need to search by "person type". So for the row in the screenshot above, the output would be 8 rows:

+-----------+---------------------+----+---------------+----------+
| School_id |        type         | id | student_level |   name   |
+-----------+---------------------+----+---------------+----------+
|      1999 | students            |  1 | 0             | Brian    |
|      1999 | students            |  9 | 2             | Max      |
|      1999 | teaching_assistants | 19 | 0             | Xander   |
|      1999 | teachers            | 21 | 0             | Charlene |
|      1999 | teachers            | 12 | 2             | Rob      |
|      1999 | administrators      | 23 | None          | Marsha   |
|      1999 | administrators      | 11 | None          | Ryan     |
|      1999 | administrators      | 14 | None          | Bob      |
+-----------+---------------------+----+---------------+----------+

For the administrators, the student_level column would just be null. The problem is that if I use the explode function, I end up with all of these items in different columns.

Is it possible to accomplish this in PySpark? One thought I had was to combine the 4 array columns into a single array and then explode that array. However, I am not sure whether combining arrays of structs and capturing the column names as a field is feasible (I've tried various things), and I also don't know whether it would work when the administrators are missing a field.

In the past, I have done this by converting to an RDD and using a flatMap with a custom UDF, but it was very inefficient for millions of rows.
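For reference, the sample row from the screenshot can be reconstructed roughly like this (a sketch based on the field names given above; the actual types in the real data may differ):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical reconstruction of the pictured row
df = spark.createDataFrame(
    [(1999,
      [(1, 0, "Brian"), (9, 2, "Max")],
      [(19, 0, "Xander")],
      [(21, 0, "Charlene"), (12, 2, "Rob")],
      [(23, "Marsha"), (11, "Ryan"), (14, "Bob")])],
    "School_id long, "
    "students array<struct<id:long,student_level:long,name:string>>, "
    "teaching_assistants array<struct<id:long,student_level:long,name:string>>, "
    "teachers array<struct<id:long,student_level:long,name:string>>, "
    "administrators array<struct<id:long,name:string>>")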


1 Answer


The idea is to use stack to transform the columns students, teaching_assistants, teachers and administrators into separate rows with the correct value for each type. After that, the column containing the arrays can be exploded, and the fields of the individual structs can be pulled out into separate columns.
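If stack is unfamiliar: it is a SQL generator function that distributes its arguments over multiple rows. A minimal, self-contained illustration with made-up literal values:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# stack(2, ...) distributes the four arguments over two rows of two columns
spark.range(1).select(
    F.expr("stack(2, 'a', 1, 'b', 2) as (key, value)")).show()
# +---+-----+
# |key|value|
# +---+-----+
# |  a|    1|
# |  b|    2|
# +---+-----+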

Using stack requires that all columns that are stacked have the same type. This means that all columns must contain arrays of the same struct type, and the nullability of all struct fields must match as well. Therefore the administrators column has to be converted into the correct struct type first.
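Whether the element types already line up can be checked directly on the schema (a quick sketch, assuming df is the DataFrame from the question):

# stack only accepts the columns if these element types are identical
print(df.schema["students"].dataType)
print(df.schema["administrators"].dataType)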

from pyspark.sql import functions as F

(df
    # give the administrators structs a null student_level field
    .withColumn("administrators", F.expr(
        "transform(administrators, "
        "a -> if(1<2, named_struct('id', a.id, 'student_level', "
        "cast(null as long), 'name', a.name), null))"))
    # one (type, array) row per original column
    .select("School_id", F.expr(
        "stack(4, 'students', students, "
        "'teaching_assistants', teaching_assistants, 'teachers', teachers, "
        "'administrators', administrators) as (type, temp1)"))
    .withColumn("temp2", F.explode("temp1"))  # one row per struct
    .select("School_id", "type", "temp2.id", "temp2.name", "temp2.student_level")
    .show())

prints

+---------+-------------------+---+--------+-------------+
|School_id|               type| id|    name|student_level|
+---------+-------------------+---+--------+-------------+
|     1999|           students|  1|   Brian|            0|
|     1999|           students|  9|     Max|            2|
|     1999|teaching_assistants| 19|  Xander|            0|
|     1999|           teachers| 21|Charlene|            0|
|     1999|           teachers| 12|     Rob|            2|
|     1999|     administrators| 23|  Marsha|         null|
|     1999|     administrators| 11|    Ryan|         null|
|     1999|     administrators| 14|     Bob|         null|
+---------+-------------------+---+--------+-------------+

The strange looking if(1<2, named_struct(...), null) inside the transform call is necessary to set the correct nullability for the elements of the administrators array: without the conditional, the structs produced by named_struct would not be nullable, and their type would not match the element type of the other three columns.

This solution works for Spark 2.4+, since the transform function is only available from Spark 2.4 on. If the administrators column were converted into the correct struct type in a previous step, the rest of the solution would also work for earlier versions.
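On Spark 3.1+, the transform step can also be written with the Python DataFrame API instead of a SQL expression. A rough sketch of the equivalent (F.when without an otherwise clause plays the role of if(1<2, ...): it returns null when the condition is false and thereby makes the struct nullable):

from pyspark.sql import functions as F

df2 = df.withColumn(
    "administrators",
    F.transform(
        "administrators",
        lambda a: F.when(F.lit(True), F.struct(
            a["id"].alias("id"),
            F.lit(None).cast("long").alias("student_level"),
            a["name"].alias("name"))),
    ),
)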