In a previous question, I was trying to avoid memory issues with Spark's join by avoiding using join altogether.

In this new question, I'm using join, but trying to fix memory issues with it.

These are my two RDDs:

  1. productToCustomerRDD:
    Size: very large, could have millions of distinct keys
    Partitioned on keys with a HashPartitioner
    Some keys will be highly duplicated and some won't.

    (toast, John)
    (butter, John)
    (toast, Jane)
    (jelly, Jane)
    
  2. productToCountRDD:
    Size: very large, could have millions of distinct keys, too big to broadcast
    Partitioned on keys with a HashPartitioner
    Keys are unique, the value is the number of customers who've purchased the product.

    (toast, 2)
    (butter, 1)
    (jelly, 1)
    

I would like to join these two RDDs; the result would be:

  1. customerToProductAndCountRDD:

    (toast, (John, 2))
    (butter, (John, 1))
    (toast, (Jane, 2))
    (jelly, (Jane, 1))
    

If I join the two RDDs with productToCustomerRDD.join(productToCountRDD) I get an OutOfMemoryError on two partitions (out of thousands). In the Spark UI, I've noticed that during the stage that contains the join, in the Input Size / Records column, all partitions have a number of records from 4K to 700K. All except the two partitions that produced the OOM: one has 9M records and one has 6M records.

As I understand, in order to join, pairs with the same key need to be shuffled and moved to the same partition (unless they were previously partitioned by key). However, since some keys are very frequent (for instance: a product that was purchased by nearly every single customer in the dataset), a huge amount of data may be moved to one partition, either during the join or during the repartition right before the join.

Am I understanding this correctly?
Is there a way to avoid this?
Could there be a way to join without having all the data for one heavily duplicated key on the same partition?
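Your understanding is right, and the concentration can be illustrated without Spark at all. Below is a pure-Python sketch (not Spark code; the partition count, key names, and record counts are made up) of how hash partitioning routes every record for one hot key to the same partition, no matter how many partitions there are:

```python
from collections import Counter

def partition_for(key, num_partitions):
    # Mimics Spark's HashPartitioner: partition = hash(key) mod numPartitions
    return hash(key) % num_partitions

num_partitions = 8

# One very frequent key ("toast") plus a spread of rare keys.
records = [("toast", f"customer{i}") for i in range(1000)]
records += [(f"product{i}", f"customer{i}") for i in range(1000)]

sizes = Counter(partition_for(k, num_partitions) for k, _ in records)
hot = partition_for("toast", num_partitions)

# All 1000 "toast" records hash to the same partition, so that partition
# holds at least 1000 records regardless of how the rare keys spread out.
print(sizes[hot] >= 1000)
```

Adding more partitions does not help here: the hot key still maps to exactly one of them.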


2 Answers

3
votes

Actually, this is a standard problem in Spark called a "skewed join": one of the sides of the join is skewed, meaning some of its keys are much more frequent than others. Some answers that didn't work out for me can be found here.

The strategy I used is inspired by the GraphFrame.skewedJoin() method defined here and its use in ConnectedComponents.skewedJoin() here. The join will be performed by joining the most frequent keys using a broadcast join and the less frequent keys using a standard join.

In my example (OP) the productToCountRDD already contains the information about key frequency. So it goes like this:

  • Filter productToCountRDD to only keep counts that are above a fixed threshold, and collectAsMap() to the driver.
  • Broadcast this map to all executors.
  • Split productToCustomerRDD into two RDDs: keys that are found in the broadcast map (frequent keys), and keys that aren't (infrequent keys).
  • The join for frequent keys is performed with mapToPair, getting the count from the broadcast map.
  • The join for infrequent keys is performed with join.
  • Use union at the end to get your full RDD.
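The steps above can be sketched in plain Python (not actual Spark code: the RDDs are stand-in lists, the "broadcast" is an ordinary dict, and the threshold value is made up):

```python
THRESHOLD = 2  # made-up cutoff for what counts as a "frequent" key

product_to_customer = [("toast", "John"), ("butter", "John"),
                       ("toast", "Jane"), ("jelly", "Jane")]
product_to_count = [("toast", 2), ("butter", 1), ("jelly", 1)]

# Steps 1-2: keep only counts above the threshold and "broadcast" them
# as a plain map (in Spark: filter + collectAsMap + broadcast).
frequent_counts = {p: c for p, c in product_to_count if c >= THRESHOLD}

# Step 3: split the large RDD on membership in the broadcast map.
frequent = [(p, cust) for p, cust in product_to_customer if p in frequent_counts]
infrequent = [(p, cust) for p, cust in product_to_customer if p not in frequent_counts]

# Step 4: map-side "broadcast join" for frequent keys; no shuffle needed,
# each record just looks up its count locally (in Spark: mapToPair).
joined_frequent = [(p, (cust, frequent_counts[p])) for p, cust in frequent]

# Step 5: standard join for infrequent keys (modelled here with a dict lookup;
# in Spark this is the ordinary RDD join, now free of the hot keys).
count_map = dict(product_to_count)
joined_infrequent = [(p, (cust, count_map[p])) for p, cust in infrequent]

# Step 6: union of both halves gives the full result.
result = joined_frequent + joined_infrequent
```

Only the infrequent keys go through the shuffle-based join, so no single partition has to hold all the records of a hot key.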
0
votes

My first question is: do you really need data at this level of detail? Do you really need to know that John bought 2 toasts, and so on? We are in a big-data context and work with a lot of data, so aggregation is often a good way to reduce cardinality and get good results in terms of both analysis and performance.

If you want to know how many times a product was sold, you can use a pairRDD of (product, count): that way you have one element per product. If you want to know each user's preferences, you can use a pairRDD of (user, list of purchased products): one element per user. And if you really do need to know that a toast was bought by John, why would you want to split the toast key across different partitions? That way you cannot compute a global result, because each chunk holds only a piece of the information about your keys.
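The two aggregated shapes suggested above can be sketched in plain Python (not Spark code; in Spark they would be a reduceByKey and a groupByKey, and the input data here is just the OP's example):

```python
from collections import defaultdict

purchases = [("toast", "John"), ("butter", "John"),
             ("toast", "Jane"), ("jelly", "Jane")]

# Shape 1 - (product, count): one element per product.
product_count = defaultdict(int)
for product, _ in purchases:
    product_count[product] += 1

# Shape 2 - (user, list of purchased products): one element per user.
user_products = defaultdict(list)
for product, user in purchases:
    user_products[user].append(product)

print(dict(product_count))   # {'toast': 2, 'butter': 1, 'jelly': 1}
print(dict(user_products))   # {'John': ['toast', 'butter'], 'Jane': ['toast', 'jelly']}
```

Either aggregation shrinks the data to one record per key, which sidesteps the skew problem entirely.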