
I'm quite new to PySpark and I'm dealing with a complex DataFrame. I'm stuck trying to aggregate the N elements of an array column after some filtering.

I have the following schema:

root
 |-- struct1: struct (nullable = true)
 |    |-- array1: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- struct2: struct (nullable = true)
 |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |    |    |    |-- struct3: struct (nullable = true)
 |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)
 |    |    |    |-- property: string (nullable = true)
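
For context, a small DataFrame matching this schema can be built like this (a minimal sketch; the SparkSession setup and the sample values are made up for illustration):

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample row shaped like the schema above (values are strings, as in the schema).
# Note: on Spark < 3.0, Row(**kwargs) sorts fields alphabetically, so field order may differ.
df = spark.createDataFrame([
    Row(struct1=Row(array1=[
        Row(struct2=Row(date="2020-01-01", value="10"),
            struct3=Row(date="2020-02-02", value="15"),
            property="Good"),
        Row(struct2=Row(date="2020-01-01", value="20"),
            struct3=Row(date="2020-02-02", value="25"),
            property="Good"),
    ]))
])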

What I want to achieve is to get the sum of all struct2.value entries whose property is 'Good'. Note that array1 can contain multiple (N) elements.

Right now I have a small expression that gets the property of the first element, but I can't pass it to a UDF in a way that iterates over all the elements: df.withColumn("Sum", (col('struct1.array1')[0])['property'])

The steps I have in mind are:

  • Filter the elements of the list where property = 'Good'

  • Return, from a UDF, the sum of struct2.value over those elements (a rough sketch follows below)
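
Roughly, what I have in mind is something like this (a hypothetical sketch, not working yet in my real case; it assumes struct2.value always parses as an integer):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# Hypothetical UDF: keep the 'Good' elements, then sum their struct2.value
@udf(IntegerType())
def sum_good(array1):
    if array1 is None:
        return None
    return sum(int(e['struct2']['value'])
               for e in array1
               if e['property'] == 'Good')

df.withColumn("Sum", sum_good(col("struct1.array1")))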

Desired output should be something like:

+------------------------------------------------------------------------------------------+---+
|Struct1                                                                                   |Sum|
+------------------------------------------------------------------------------------------+---+
|[[[[2020-01-01, 10], [2020-02-02, 15], Good], [[2020-01-01, 20], [2020-02-02, 25], Good]]]|20 |
+------------------------------------------------------------------------------------------+---+

Any help will be appreciated.


1 Answer


You don't necessarily need a UDF in this case. With Spark >= 2.4.0 you can achieve the same result using the built-in higher-order functions, as shown next:

from pyspark.sql.functions import expr  

# keep only the 'Good' elements, cast their struct2.value to int, then sum them up
df.withColumn("good_elements", expr("""
        transform(
            filter(struct1.array1, e -> e.property == 'Good'),
            e -> cast(e.struct2.value as int)
        )""")) \
  .withColumn("sum", expr("aggregate(good_elements, 0, (sum, e) -> sum + e)"))

  • filter(struct1.array1, e -> e.property == 'Good'): first we keep only the items that have property == 'Good'

  • transform(..., e -> cast(e.struct2.value as int)): next we cast the value of every remaining item to an integer and store the results in a new column called good_elements

  • aggregate(good_elements, 0, (sum, e) -> sum + e): finally we create the column sum by adding up the elements of good_elements
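
For completeness, the two steps can also be collapsed into a single expression if you don't need the intermediate good_elements column. This is just a rearrangement of the same built-ins shown above:

from pyspark.sql.functions import expr

# Same logic in one shot: filter, cast, then sum inside a single aggregate call
df.withColumn("sum", expr("""
        aggregate(
            transform(
                filter(struct1.array1, e -> e.property == 'Good'),
                e -> cast(e.struct2.value as int)
            ),
            0,
            (acc, e) -> acc + e
        )"""))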