0
votes

Let's say I have two PySpark dataframes, users and shops. A few sample rows from each are shown below.

users dataframe:

+---------+-------------+---------+
| idvalue | day-of-week | geohash |
+---------+-------------+---------+
| id-1    |           2 | gcutjjn |
| id-1    |           3 | gcutjjn |
| id-1    |           5 | gcutjht |
+---------+-------------+---------+

shops dataframe:

+---------+-----------+---------+
| shop-id | shop-name | geohash |
+---------+-----------+---------+
| sid-1   | kfc       | gcutjjn |
| sid-2   | mcd       | gcutjhq |
| sid-3   | starbucks | gcutjht |
+---------+-----------+---------+

I need to join both of these dataframes on the geohash column. I can certainly do a naive equi-join, but the users dataframe is huge, containing billions of rows, and geohashes are likely to repeat, both within and across idvalues. So I was wondering whether there's a way to perform the join between the unique geohashes in the users dataframe and the geohashes in the shops dataframe. If we can do that, then it's easy to replicate the shops entries for matching geohashes in the resultant dataframe.

It can probably be achieved with a pandas UDF: I would group by users.idvalue and, within the UDF, join with shops using only the first row of the group (because all ids are the same within the group anyway), producing a one-row dataframe. Logically it feels like this should work, but I'm not sure about the performance, as UDFs are usually slower than Spark's native transformations. Any ideas are welcome.

2
The sample users df you have shown has duplicate rows. If that's the case with your actual data, then you can drop the duplicate rows in the users df and then perform a join. - Clock Slave
Actually, only the geohash column has duplicate entries; the rest are all different, so I can't drop the rows. I've updated the table to clear up the confusion. - Bitswazsky
So you have to keep all rows in the users df, and your shops data doesn't have duplicate geohashes. The way to perform the join is a join. The problem has already been reduced to its basic form. - Clock Slave

2 Answers

1
votes

You said that your users dataframe is huge and that "geohashes are likely to repeat, within and across idvalues". You didn't mention, however, whether there might be duplicated geohashes in your shops dataframe.

If there are no repeated hashes in the latter, I think that a simple join would solve your problem:

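// Imports needed when running outside spark-shell (assumes a SparkSession named spark)
import spark.implicits._
import org.apache.spark.sql.functions.collect_list

val userDf = Seq(("id-1",2,"gcutjjn"),("id-2",2,"gcutjjn"),("id-1",3,"gcutjjn"),("id-1",5,"gcutjht")).toDF("idvalue","day_of_week","geohash")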
val shopDf = Seq(("sid-1","kfc","gcutjjn"),("sid-2","mcd","gcutjhq"),("sid-3","starbucks","gcutjht")).toDF("shop_id","shop_name","geohash")

userDf.show
+-------+-----------+-------+
|idvalue|day_of_week|geohash|
+-------+-----------+-------+
|   id-1|          2|gcutjjn|
|   id-2|          2|gcutjjn|
|   id-1|          3|gcutjjn|
|   id-1|          5|gcutjht|
+-------+-----------+-------+

shopDf.show
+-------+---------+-------+
|shop_id|shop_name|geohash|
+-------+---------+-------+
|  sid-1|      kfc|gcutjjn|
|  sid-2|      mcd|gcutjhq|
|  sid-3|starbucks|gcutjht|
+-------+---------+-------+

shopDf
    .join(userDf,Seq("geohash"),"inner")
    .groupBy($"geohash",$"shop_id",$"idvalue")
    .agg(collect_list($"day_of_week").alias("days"))
    .show
+-------+-------+-------+------+
|geohash|shop_id|idvalue|  days|
+-------+-------+-------+------+
|gcutjjn|  sid-1|   id-1|[2, 3]|
|gcutjht|  sid-3|   id-1|   [5]|
|gcutjjn|  sid-1|   id-2|   [2]|
+-------+-------+-------+------+

If you have repeated hash values in your shops dataframe, one possible approach is to remove every shop whose geohash is repeated (if your requirements allow this), and then perform the same join operation.

val userDf = Seq(("id-1",2,"gcutjjn"),("id-2",2,"gcutjjn"),("id-1",3,"gcutjjn"),("id-1",5,"gcutjht")).toDF("idvalue","day_of_week","geohash")
val shopDf = Seq(("sid-1","kfc","gcutjjn"),("sid-2","mcd","gcutjhq"),("sid-3","starbucks","gcutjht"),("sid-4","burguer king","gcutjjn")).toDF("shop_id","shop_name","geohash")

userDf.show
+-------+-----------+-------+
|idvalue|day_of_week|geohash|
+-------+-----------+-------+
|   id-1|          2|gcutjjn|
|   id-2|          2|gcutjjn|
|   id-1|          3|gcutjjn|
|   id-1|          5|gcutjht|
+-------+-----------+-------+

shopDf.show
+-------+------------+-------+
|shop_id|   shop_name|geohash|
+-------+------------+-------+
|  sid-1|         kfc|gcutjjn|  <<  Duplicated geohash
|  sid-2|         mcd|gcutjhq|
|  sid-3|   starbucks|gcutjht|
|  sid-4|burguer king|gcutjjn|  <<  Duplicated geohash
+-------+------------+-------+

//Dataframe with hashes to exclude:
val excludedHashes = shopDf.groupBy("geohash").count.filter("count > 1")
excludedHashes.show
+-------+-----+
|geohash|count|
+-------+-----+
|gcutjjn|    2|
+-------+-----+

//Create a dataframe of shops without the ones with duplicated hashes
val cleanShopDf = shopDf.join(excludedHashes,Seq("geohash"),"left_anti")
cleanShopDf.show
+-------+-------+---------+
|geohash|shop_id|shop_name|
+-------+-------+---------+
|gcutjhq|  sid-2|      mcd|
|gcutjht|  sid-3|starbucks|
+-------+-------+---------+

//Perform the same join operation
cleanShopDf.join(userDf,Seq("geohash"),"inner")
    .groupBy($"geohash",$"shop_id",$"idvalue")
    .agg(collect_list($"day_of_week").alias("days"))
    .show
+-------+-------+-------+----+
|geohash|shop_id|idvalue|days|
+-------+-------+-------+----+
|gcutjht|  sid-3|   id-1| [5]|
+-------+-------+-------+----+

The code above is written in Scala, but it can easily be converted to Python.

Hope this helps!

0
votes

One idea: if possible, use PySpark SQL to select the distinct geohashes into a temporary table, and then join from that table instead of the dataframes.