As the subject describes, I have a PySpark Dataframe that I need to cast two columns into a new column that is a list of tuples based the value of a third column. This cast will reduce or flatten the dataframe by a key value, product id in this case, and the result os one row per key.
There are hundreds of millions of rows in this dataframe, with 37M unique product ids. Therefore I need a way to do the transformation on the spark cluster without bringing back any data to the driver (Jupyter in this case).
Here is an extract of my dataframe for just 1 product:
+-----------+-------------------+-------------+--------+----------+---------------+
| product_id| purchase_date|days_warranty|store_id|year_month| category|
+-----------+-------------------+-----------+----------+----------+---------------+
|02147465400|2017-05-16 00:00:00| 30| 205| 2017-05| CATEGORY A|
|02147465400|2017-04-15 00:00:00| 30| 205| 2017-04| CATEGORY A|
|02147465400|2018-07-11 00:00:00| 30| 205| 2018-07| CATEGORY A|
|02147465400|2017-06-14 00:00:00| 30| 205| 2017-06| CATEGORY A|
|02147465400|2017-03-16 00:00:00| 30| 205| 2017-03| CATEGORY A|
|02147465400|2017-08-14 00:00:00| 30| 205| 2017-08| CATEGORY A|
|02147465400|2017-09-12 00:00:00| 30| 205| 2017-09| CATEGORY A|
|02147465400|2017-01-21 00:00:00| 30| 205| 2017-01| CATEGORY A|
|02147465400|2018-08-14 00:00:00| 30| 205| 2018-08| CATEGORY A|
|02147465400|2018-08-23 00:00:00| 30| 205| 2018-08| CATEGORY A|
|02147465400|2017-10-11 00:00:00| 30| 205| 2017-10| CATEGORY A|
|02147465400|2017-12-12 00:00:00| 30| 205| 2017-12| CATEGORY A|
|02147465400|2017-02-15 00:00:00| 30| 205| 2017-02| CATEGORY A|
|02147465400|2018-04-12 00:00:00| 30| 205| 2018-04| CATEGORY A|
|02147465400|2018-03-12 00:00:00| 30| 205| 2018-03| CATEGORY A|
|02147465400|2018-05-15 00:00:00| 30| 205| 2018-05| CATEGORY A|
|02147465400|2018-02-12 00:00:00| 30| 205| 2018-02| CATEGORY A|
|02147465400|2018-06-14 00:00:00| 30| 205| 2018-06| CATEGORY A|
|02147465400|2018-01-11 00:00:00| 30| 205| 2018-01| CATEGORY A|
|02147465400|2017-07-20 00:00:00| 30| 205| 2017-07| CATEGORY A|
|02147465400|2017-11-11 00:00:00| 30| 205| 2017-11| CATEGORY A|
|02147465400|2017-01-05 00:00:00| 90| 205| 2017-01| CATEGORY B|
|02147465400|2017-01-21 00:00:00| 90| 205| 2017-01| CATEGORY B|
|02147465400|2017-10-09 00:00:00| 90| 205| 2017-10| CATEGORY B|
|02147465400|2018-07-11 00:00:00| 90| 205| 2018-07| CATEGORY B|
|02147465400|2017-04-16 00:00:00| 90| 205| 2017-04| CATEGORY B|
|02147465400|2018-09-16 00:00:00| 90| 205| 2018-09| CATEGORY B|
|02147465400|2018-04-14 00:00:00| 90| 205| 2018-04| CATEGORY B|
|02147465400|2018-01-12 00:00:00| 90| 205| 2018-01| CATEGORY B|
|02147465400|2017-07-15 00:00:00| 90| 205| 2017-07| CATEGORY B|
+-----------+-------------------+-----------+----------+----------+---------------+
Here is the desired resulting dataframe, one row for the one product, where the rows of the original dataframe have the purchase_date and days_warranty columns cast as an array of tuples into new columns based on the category column value:
+-----------+----------------------------+----------------------------+
| product_id| CATEGORY A| CATEGORY B|
+-----------+----------------------------+----------------------------+
|02147465400| [ (2017-05-16 00:00:00,30),| [ (2017-01-05 00:00:00,90),|
| | (2017-04-15 00:00:00,30),| (2017-01-21 00:00:00,90),|
| | (2018-07-11 00:00:00,30),| (2017-10-09 00:00:00,90),|
| | (2017-06-14 00:00:00,30),| (2018-07-11 00:00:00,90),|
| | (2017-03-16 00:00:00,30),| (2017-04-16 00:00:00,90),|
| | (2017-08-14 00:00:00,30),| (2018-09-16 00:00:00,90),|
| | (2017-09-12 00:00:00,30),| (2018-04-14 00:00:00,90),|
| | (2017-01-21 00:00:00,30),| (2018-01-12 00:00:00,90),|
| | (2018-08-14 00:00:00,30),| (2017-07-15 00:00:00,90) |
| | (2018-08-23 00:00:00,30),| ] |
| | (2017-10-11 00:00:00,30),| |
| | (2017-12-12 00:00:00,30),| |
| | (2017-02-15 00:00:00,30),| |
| | (2018-04-12 00:00:00,30),| |
| | (2018-03-12 00:00:00,30),| |
| | (2018-05-15 00:00:00,30),| |
| | (2018-02-12 00:00:00,30),| |
| | (2018-06-14 00:00:00,30),| |
| | (2018-01-11 00:00:00,30),| |
| | (2017-07-20 00:00:00,30) | |
| | ] |
+-----------+----------------------------+----------------------------+