
In PySpark I have a data frame composed of two columns.

Assume the fields in each inner array are: timestamp, email, phone number, first name, last name, address, city, country, randomId.

+-------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| str1                    | array_of_str                                                                                                                                                                                                                                                                                                                                                                                |
+-------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| random column data1     | [['2020-01-26 17:30:57.000 +0000', '', '728-802-5766', '', '', '7th street crossroads', 'seattle', '', 'randomId104'], ['2019-07-20 20:54:57.000 +0000', '[email protected]', '728-802-5766', 'Katuscha', '', '', '', 'us', 'randomId225'], ['2015-12-04 04:54:57.000 +0000', '[email protected]', '728-802-5766', '', 'Othen', '7th street crossroads', 'seattle', '', 'randomid306']] |
| random column data2     | [['2021-01-30 17:30:04.000 +0000', '[email protected]', '313-984-9692', '', '', 'th street crossroads', 'New york', 'us', 'randomId563'], ['2018-05-15 20:44:57.000 +0000', '[email protected]', '', 'Marianne', 'Allmann', '', '', 'us', 'randomId884']]                                                                                                                            |
+-------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

I am expecting an output data frame like the one below:

+-------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| str1                    | array_of_str                                                                                                                                         |
+-------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| random column data1     | ['2020-01-26 17:30:57.000 +0000', '[email protected]', '728-802-5766', 'Katuscha', 'Othen', '7th street crossroads', 'seattle', 'us', 'randomid306'] |
| random column data2     | ['2021-01-30 17:30:04.000 +0000', '[email protected]', '313-984-9692', 'Marianne', 'Allmann', '111th Ave NE', 'New york', 'us', 'randomId884']      |
+-------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+

Optional: the existing data in the array of arrays might not already be sorted in decreasing timestamp order. How do I sort the array of arrays in decreasing timestamp order?

Here I am planning to write a UDF to pull the latest non-null (timestamp, email, phone number, first name, last name, address, city, country) values from the array of arrays. In the case of randomId, I will always pull the randomId associated with the oldest record in the system.

Example: for "random column data1", the email [email protected] is populated from the second element of the array, since the first element has an empty email. The other columns work similarly. In the case of randomId, randomid306 belongs to the oldest entry, so it is the one populated in my output data frame.

In the udf

  1. How do I sort the array-of-array elements in descending timestamp order? (an optional step)

  2. How do I iterate over the array-of-array column in the data frame?

  3. How do I access individual items of the array in a UDF?

In Python we can iterate over list-of-list elements like this:

for item in items:
    print(item[0], item[1])

How can I achieve something similar for array-of-array columns in PySpark?

Can I do the above steps in PySpark without converting the data to a pandas DataFrame?

Spark version 2.4.3, Python 3.6.8


1 Answer


You can use the built-in sort_array for this: create a new column with it, sorting in descending order (the timestamp is the first element of each inner array, so lexicographic sorting matches timestamp order), and extract the first element using getItem(0).

Built-in sort_array


from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

sql = SparkSession.builder.getOrCreate()

input_list = [('10',[
                  ['2020-01-26 17:30:57.000 +0000', '', '728-802-5766', '', '', '7th street crossroads', 'seattle', '', 'randomId104']
                , ['2019-07-20 20:54:57.000 +0000', '[email protected]', '728-802-5766', 'Katuscha', '', '', '', 'us', 'randomId225']
                , ['2015-12-04 04:54:57.000 +0000', '[email protected]', '728-802-5766', '', 'Othen', '7th street crossroads', 'seattle', '', 'randomid306']
            ])
        ,('20',[
          ['2021-01-30 17:30:04.000 +0000', '[email protected]', '313-984-9692', '', '', 'th street crossroads', 'New york', 'us', 'randomId563']
        , ['2018-05-15 20:44:57.000 +0000', '[email protected]', '', 'Marianne', 'Allmann', '', '', 'us', 'randomId884']
        ])
    ]

sparkDF = sql.createDataFrame(input_list,['id','array_str'])


sparkDF = sparkDF.withColumn('sorted_array_str'
                                 ,F.sort_array(F.col('array_str'),False).getItem(0))


sparkDF.select(['id','sorted_array_str']).show(truncate=False)

UDF

# array_sort_udf sorts on the timestamp; it can be further customized
# to give precedence to multiple elements. Note the return type must be
# ArrayType(ArrayType(StringType())) since the UDF returns a list of lists.

array_sort_udf = F.udf(lambda x: sorted(x, key=lambda r: r[0], reverse=True),
                       ArrayType(ArrayType(StringType())))

sparkDF = sparkDF.withColumn('sorted_array_str'
                                 ,array_sort_udf(F.col('array_str')).getItem(0))

sparkDF.select(['id','sorted_array_str']).show(truncate=False)

Output

+---+----------------------------------------------------------------------------------------------------------------------+
|id |sorted_array_str                                                                                                      |
+---+----------------------------------------------------------------------------------------------------------------------+
|10 |[2020-01-26 17:30:57.000 +0000, , 728-802-5766, , , 7th street crossroads, seattle, , randomId104]                    |
|20 |[2021-01-30 17:30:04.000 +0000, [email protected], 313-984-9692, , , th street crossroads, New york, us, randomId563]|
+---+----------------------------------------------------------------------------------------------------------------------+
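Note that sorting and taking `getItem(0)` only returns the most recent record as-is; it does not build the merged row the question asks for (latest non-empty value per field, with randomId taken from the oldest record). A plain-Python helper along the following lines could be wrapped in a UDF for that. This is a sketch assuming the nine-field layout shown above, and `merge_records` is a name chosen here for illustration:

```python
def merge_records(records):
    """Collapse a list of [timestamp, email, phone, first, last,
    address, city, country, randomId] rows into a single row."""
    # Sort newest first, comparing the timestamp strings in position 0.
    recs = sorted(records, key=lambda r: r[0], reverse=True)
    merged = []
    for i in range(len(recs[0]) - 1):            # every field except randomId
        # Take the first non-empty value, scanning from newest to oldest.
        merged.append(next((r[i] for r in recs if r[i]), ''))
    merged.append(recs[-1][-1])                  # randomId from the oldest record
    return merged
```

Registered as `F.udf(merge_records, ArrayType(StringType()))` and applied with `withColumn`, this replaces the `.getItem(0)` step and keeps everything in Spark, without converting to a pandas DataFrame.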