0
votes

I have a problem with joining two Dataframes with columns containing Arrays in PySpark. I want to join on those columns if the elements in the arrays are the same (order does not matter).

So, I have one DataFrame containing itemsets and their frequencies in the following format:

+--------------------+----+
|               items|freq|
+--------------------+----+
|  [1828545, 1242385]|   4|
|  [1828545, 2032007]|   4|
|           [1137808]|  11|
|           [1209448]|   5|
|             [21002]|   5|
|           [2793224]| 209|
|     [2793224, 8590]|   7|
|[2793224, 8590, 8...|   4|
|[2793224, 8590, 8...|   4|
|[2793224, 8590, 8...|   5|
|[2793224, 8590, 1...|   4|
|  [2793224, 2593971]|  20|
+--------------------+----+

And another DataFrame which contains information about the user and items in the following format:

+------------+-------------+--------------------+
|     user_id|   session_id| itemset            |
+------------+-------------+--------------------+
|WLB2T1JWGTHH|0012c5936056e|[1828545, 1242385]  |
|BZTAWYQ70C7N|00783934ea027|[2793224, 8590]     | 
|42L1RJL436ST|00c6821ed171e|[8590, 2793224]     |
|HB348HWSJAOP|00fa9607ead50|[21002]             |
|I9FOENUQL1F1|013f69b45bb58|[21002]             |  
+------------+-------------+--------------------+

Now I want to join those two dataframes on itemset and items if the elements are the same in the array (it does not matter how they are ordered). My desired output would be:

+------------+-------------+--------------------+----+
|     user_id|   session_id| itemset            |freq|
+------------+-------------+--------------------+----+
|WLB2T1JWGTHH|0012c5936056e|[1828545, 1242385]  |   4|
|BZTAWYQ70C7N|00783934ea027|[2793224, 8590]     |   7|
|42L1RJL436ST|00c6821ed171e|[8590, 2793224]     |   7|
|HB348HWSJAOP|00fa9607ead50|[21002]             |   5|
|I9FOENUQL1F1|013f69b45bb58|[21002]            |   5|  
+------------+-------------+--------------------+----+

I could not find any solution online, only solutions where dataframes are joined where one item is contained in an array.

Thank you very much! :)

1

1 Answers

1
votes

The spark implementation of join can handle array columns without an issue. The only problem is, that it doesn't ignore the order of your column. Therefore it is required to sort the join column before to join correctly. You can use the sort_array function for that.

from pyspark.sql import functions as F

df1 = spark.createDataFrame(
[
(  [1828545, 1242385],   4),
(  [1828545, 2032007],   4),
(           [1137808],  11),
(           [1209448],   5),
(             [21002],   5),
(           [2793224], 209),
(     [2793224, 8590],   7),
([2793224, 8590, 81],   4),
([2793224, 8590, 82],   4),
([2793224, 8590, 83],   5),
([2793224, 8590, 11],   4),
(  [2793224, 2593971],  20)
], ['items','freq'])


df2 = spark.createDataFrame(
[
('WLB2T1JWGTHH','0012c5936056e',[1828545, 1242385]  ),
('BZTAWYQ70C7N','00783934ea027',[2793224, 8590]     ), 
('42L1RJL436ST','00c6821ed171e',[8590, 2793224]     ),
('HB348HWSJAOP','00fa9607ead50',[21002]             ),
('I9FOENUQL1F1','013f69b45bb58',[21002]             ) 
], ['user_id',   'session_id', 'itemset'])

df1 = df1.withColumn('items', F.sort_array('items'))
df2 = df2.withColumnRenamed('itemset', 'items').withColumn('items', F.sort_array('items'))

df1.join(df2, "items").show()

Output:

+------------------+----+------------+-------------+ 
|             items|freq|     user_id|   session_id| 
+------------------+----+------------+-------------+ 
|   [8590, 2793224]|   7|BZTAWYQ70C7N|00783934ea027| 
|   [8590, 2793224]|   7|42L1RJL436ST|00c6821ed171e| 
|[1242385, 1828545]|   4|WLB2T1JWGTHH|0012c5936056e| 
|           [21002]|   5|HB348HWSJAOP|00fa9607ead50| 
|           [21002]|   5|I9FOENUQL1F1|013f69b45bb58| 
+------------------+----+------------+-------------+