In a previous question, I was trying to avoid memory issues with a Spark join by not using a join at all.
In this new question, I am using a join, but trying to fix the memory issues it causes.
These are my two RDDs:
productToCustomerRDD:
Size: very large, could have millions of distinct keys
Partitioned on keys with a HashPartitioner
Some keys will be highly duplicated and some won't.
(toast, John) (butter, John) (toast, Jane) (jelly, Jane)

productToCountRDD:
Size: very large, could have millions of distinct keys, too big to broadcast
Partitioned on keys with a HashPartitioner
Keys are unique; the value is the number of customers who've purchased the product.
(toast, 2) (butter, 1) (jelly, 1)
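For concreteness, here is a minimal Scala sketch of roughly how the two RDDs are set up. The tiny sample data, the existing SparkContext `sc`, and the partition count of 1000 are all placeholders, not the real values:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

val numPartitions = 1000  // placeholder; the real job has thousands of partitions

// Tiny stand-ins for the real, very large RDDs
val productToCustomerRDD: RDD[(String, String)] = sc
  .parallelize(Seq(("toast", "John"), ("butter", "John"), ("toast", "Jane"), ("jelly", "Jane")))
  .partitionBy(new HashPartitioner(numPartitions))

val productToCountRDD: RDD[(String, Int)] = sc
  .parallelize(Seq(("toast", 2), ("butter", 1), ("jelly", 1)))
  .partitionBy(new HashPartitioner(numPartitions))
```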
I would like to join these two RDDs; the result will be:
customerToProductAndCountRDD:
(toast, (John, 2)) (butter, (John, 1)) (toast, (Jane, 2)) (jelly, (Jane, 1))
If I join the two RDDs with productToCustomerRDD.join(productToCountRDD), I get an OutOfMemoryError on two partitions (out of thousands). In the Spark UI, I've noticed that during the stage containing the join, in the Input Size / Records column, all partitions have between 4K and 700K records, except the two that produced the OOM: one has 9M records and the other has 6M.
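For reference, the same per-partition counts can also be inspected programmatically rather than through the UI; a sketch, assuming the RDD names above:

```scala
// Records per partition of the large input, to confirm the skew seen in the UI
val partitionSizes = productToCustomerRDD
  .mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size)))
  .collect()
  .sortBy(-_._2)

// The most heavily loaded partitions show up first
partitionSizes.take(5).foreach { case (idx, n) => println(s"partition $idx: $n records") }
```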
As I understand it, in order to join, pairs with the same key need to be shuffled to the same partition (unless they were previously partitioned by key). However, since some keys are very frequent (for instance, a product purchased by nearly every customer in the dataset), a huge amount of data may be moved to a single partition, either during the join itself or during the repartition right before it.
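In code terms, my mental model is that the partitioner is a pure function of the key, so every record for a hot key lands in the same partition; roughly:

```scala
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(1000)  // placeholder partition count

// The partition index depends only on the key, so every ("toast", ...) pair,
// however many millions there are, is routed to this one partition.
val toastPartition = partitioner.getPartition("toast")
```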
Am I understanding this correctly?
Is there a way to avoid this?
Could there be a way to join without having all the data for one heavily duplicated key on the same partition?
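To illustrate what I mean by that last question, here is a rough, untested sketch of the kind of idea I have in mind: salting the keys so that a hot product is spread over several partitions. The salt count of 16 and the approach itself are just an illustration, not something I've validated:

```scala
import scala.util.Random

// Sketch of the idea only: replicate the unique-keyed RDD once per salt,
// and give each (product, customer) record a random salt, so records for a
// hot product are spread across numSalts partitions instead of one.
val numSalts = 16  // arbitrary

val saltedCounts = productToCountRDD.flatMap { case (product, count) =>
  (0 until numSalts).map(salt => ((product, salt), count))
}

val saltedCustomers = productToCustomerRDD.map { case (product, customer) =>
  ((product, Random.nextInt(numSalts)), customer)
}

val customerToProductAndCountRDD = saltedCustomers
  .join(saltedCounts)
  .map { case ((product, _), (customer, count)) => (product, (customer, count)) }
```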