1 vote

I have a PySpark df whose schema looks like this:

 root
 |-- company: struct (nullable = true)
 |    |-- 0: string (nullable = true)
 |    |-- 1: string (nullable = true)
 |    |-- 10: string (nullable = true)
 |    |-- 100: string (nullable = true)
 |    |-- 101: string (nullable = true)
 |    |-- 102: string (nullable = true)
 |    |-- 103: string (nullable = true)
 |    |-- 104: string (nullable = true)
 |    |-- 105: string (nullable = true)
 |    |-- 106: string (nullable = true)
 |    |-- 107: string (nullable = true)
 |    |-- 108: string (nullable = true)
 |    |-- 109: string (nullable = true)

I want the final format of this dataframe to look like this

id    name
0     "foo"
1     "laa"
10    "bar"
100   "gee"
101   "yoo"
102   "koo"

instead of

  0        1         10       100      101        102 
"foo"    "laa"      "bar"    "gee"    "yoo"      "koo"

which is what I get using 'col.*' expansion.

I found an answer in this link: How to explode StructType to rows from json dataframe in Spark rather than to columns.

However, that is Scala Spark and not PySpark, and I am not familiar enough with the map/reduce concepts involved to translate the script to PySpark myself.

I am attaching a sample dataframe with a similar schema and structure below.

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# One positional Row of names and one of salaries, wrapped together
# in a single array column called `employees`
Employee = Row('employee1', 'employee2', 'employee3', 'employee4', 'employee5')
Salaries = Row('100000', '120000', '140000', '160000', '160000')

departmentWithEmployees1 = Row(employees=[Employee, Salaries])

departmentsWithEmployees_Seq = [departmentWithEmployees1]
dframe = spark.createDataFrame(departmentsWithEmployees_Seq)
dframe.show()

The structure of this dataframe looks like this:

root
 |-- employees: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)
 |    |    |-- _3: string (nullable = true)
 |    |    |-- _4: string (nullable = true)
 |    |    |-- _5: string (nullable = true)

I want my final dataframe to look like this:

Firstname         Salary
employee1         100000
employee2         120000
ags29: I think I could translate that answer into PySpark for you, but can you provide a reproducible example so I can test? (being lazy here!)

Dileep Unnikrishnan: Thank you for your help. I have provided code that creates a dataframe similar to what I have, and have described my requirements further.

Dileep Unnikrishnan: I should also mention that the sample df above is not exactly what I have: I have employee1, employee2, ... instead of _1, _2, ... above, but I am not able to make a dataframe quite like that. Thanks again. @ags29
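As a side note on that last comment (a minimal sketch of my own, not from the original thread; the values are placeholders): with PySpark's Row, keyword arguments set the struct field names, so a struct with fields employee1, employee2, ... rather than _1, _2, ... can be built like this:

from pyspark.sql import Row

# Keyword arguments give the Row, and hence the inferred struct,
# named fields instead of the positional _1, _2, ... names.
# The values here are placeholders.
employees = Row(employee1='foo', employee2='laa', employee3='bar',
                employee4='gee', employee5='yoo')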

2 Answers

1 vote

This can be done in two simple-ish select statements.

Note that the two examples you provided are slightly different: in the second one, the struct column is nested inside an array column.

I'll cover the more complex one; for the first (the original df) you can skip the first select statement.

from pyspark.sql import functions as F

# 1) pull the two structs out of the array column,
# 2) zip their fields into a map with map_from_arrays() and explode it
dframe\
.selectExpr('employees[0] AS `key`', 'employees[1] AS `value`')\
.select(
    F.explode(F.map_from_arrays(F.array('key.*'), F.array('value.*'))
             ).alias('Firstname', 'Salary')
)
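For the original single-struct company column, where the field names themselves are the ids, a minimal sketch of the same explode idea (my own, not from the original answer) could use create_map over the struct's field names instead of map_from_arrays; df here stands in for your original dataframe:

from pyspark.sql import functions as F

# df is assumed to be the original dataframe with the `company` struct.
fields = df.schema['company'].dataType.fieldNames()

# Build alternating (key, value) pairs of lit(field_name) and the
# corresponding struct field, wrap them in a map, and explode the map
# into one (id, name) row per field.
df.select(
    F.explode(
        F.create_map(*[p for f in fields for p in (F.lit(f), F.col('company.' + f))])
    ).alias('id', 'name')
)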

I'll try and explain the logic below.

root
 |-- employees: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)
 |    |    |-- _3: string (nullable = true)
 |    |    |-- _4: string (nullable = true)
 |    |    |-- _5: string (nullable = true)

Looking at the schema above, what you need to do is:

1) Flatten the array column to expose the two structs.

2) Turn both structs into array columns, combine those into a single map column with map_from_arrays(), and explode it.

map_from_arrays() pairs the element at each position of the first array with the element at the same position of the second (think Python's zip()), as sketched below.
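A minimal standalone illustration of that pairing (my own sketch, not part of the original answer):

from pyspark.sql import functions as F

# map_from_arrays pairs keys[i] with values[i], like Python's zip();
# exploding the resulting map then gives one (key, value) row per pair,
# here the rows (a, 1) and (b, 2).
spark.range(1).select(
    F.explode(
        F.map_from_arrays(
            F.array(F.lit('a'), F.lit('b')),  # keys
            F.array(F.lit('1'), F.lit('2'))   # values
        )
    ).alias('k', 'v')
).show()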

Let me know if this helps!

1 vote

First use element_at to get the firstname and salary structs, then convert each from a struct to an array with F.array, zip the two arrays with F.arrays_zip, explode the zipped column, and finally select all of the exploded struct's fields.

from pyspark.sql import functions as F

# Pull the two structs out of the array (element_at is 1-based),
# turn each struct into an array of its field values, zip the two
# arrays, explode the zipped pairs, and select the struct's fields.
dframe.withColumn("firstname", F.element_at("employees", 1))\
.withColumn("salary", F.element_at("employees", 2))\
.drop("employees")\
.withColumn("firstname", F.array("firstname.*"))\
.withColumn("salary", F.array("salary.*"))\
.withColumn("zip", F.explode(F.arrays_zip("firstname", "salary")))\
.select("zip.*").show(truncate=False)

+---------+------+
|firstname|salary|
+---------+------+
|employee1|100000|
|employee2|120000|
|employee3|140000|
|employee4|160000|
|employee5|160000|
+---------+------+
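A quick standalone sketch of the arrays_zip step (my own illustration, not from the original answer): arrays_zip pairs the arrays positionally into an array of structs, and explode then emits one row per pair.

from pyspark.sql import functions as F

# arrays_zip pairs elements positionally into an array of structs;
# explode turns that into one row per struct, and zip.* selects its
# fields, yielding the rows (a, 1) and (b, 2).
spark.range(1).select(
    F.explode(
        F.arrays_zip(
            F.array(F.lit('a'), F.lit('b')),
            F.array(F.lit(1), F.lit(2))
        )
    ).alias('zip')
).select('zip.*').show()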