I have two RDDs that I want to join. They look like this:
```scala
val rdd1: RDD[(T, U)]
val rdd2: RDD[((T, W), V)]
```
It happens that the keys of `rdd1` are unique, and the `(T, W)` tuple keys of `rdd2` are unique as well. I'd like to join the two datasets to get the following RDD:
```scala
val rdd_joined: RDD[((T, W), (U, V))]
```
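To pin down what I mean, here is a small local model of the result, with plain Scala collections standing in for the two RDDs. The sample keys and values are made up, and I'm assuming inner-join behaviour: each `((t, w), v)` in `rdd2` is paired with the single `u` that `rdd1` holds for `t`, and rows whose `t` has no match in `rdd1` are dropped.

```scala
// Local model of the desired join: immutable Seqs stand in for RDDs.
// Sample data is hypothetical; inner-join behaviour is an assumption.
object DesiredJoin {
  val rdd1: Seq[(Int, String)] = Seq(1 -> "a", 2 -> "b") // (t, u), t unique
  val rdd2: Seq[((Int, Char), Double)] =                  // ((t, w), v), (t, w) unique
    Seq(((1, 'x'), 1.0), ((1, 'y'), 2.0), ((2, 'x'), 3.0), ((3, 'z'), 4.0))

  // Pair every ((t, w), v) with the u stored under t.
  // t = 3 has no match in rdd1 and is dropped.
  val joined: Seq[((Int, Char), (String, Double))] =
    for {
      ((t, w), v) <- rdd2
      (t1, u)     <- rdd1
      if t1 == t
    } yield ((t, w), (u, v))

  def main(args: Array[String]): Unit = joined.foreach(println)
}
```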
What's the most efficient way to achieve this? Here are a few ideas I've thought of.
Option 1:
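```scala
val m = rdd1.collectAsMap()
// m.get(t) is an Option[U]; flatMap drops rows of rdd2 whose t is missing from rdd1
val rdd_joined = rdd2.flatMap { case ((t, w), v) => m.get(t).map(u => ((t, w), (u, v))) }
```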
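Since `m.get(t)` returns an `Option`, the choice between `flatMap` and `map` decides what happens to keys of `rdd2` that are missing from `rdd1`. Here is a local sketch with a plain `Map` standing in for the result of `collectAsMap()` (the data is hypothetical): `flatMap` gives inner-join behaviour, while `map` keeps every row of `rdd2` and exposes the miss as `None`, which is what a left outer join from `rdd2`'s side would keep.

```scala
// Local model of Option 1 with plain Scala collections (no Spark).
// `m` plays the role of rdd1.collectAsMap(); the sample data is hypothetical.
object LookupJoin {
  val m: Map[Int, String] = Map(1 -> "a", 2 -> "b")
  val rdd2: Seq[((Int, Char), Double)] =
    Seq(((1, 'x'), 1.0), ((2, 'x'), 3.0), ((3, 'z'), 4.0))

  // flatMap over Option: rows whose t is missing from m disappear (inner join)
  val inner: Seq[((Int, Char), (String, Double))] =
    rdd2.flatMap { case ((t, w), v) => m.get(t).map(u => ((t, w), (u, v))) }

  // map keeps every row and reports the miss as None (left outer join)
  val leftOuter: Seq[((Int, Char), (Option[String], Double))] =
    rdd2.map { case ((t, w), v) => ((t, w), (m.get(t), v)) }

  def main(args: Array[String]): Unit = {
    inner.foreach(println)
    leftOuter.foreach(println)
  }
}
```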
Option 2:
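```scala
val distinct_w = rdd2.map { case ((t, w), v) => w }.distinct()
// cartesian yields ((t, u), w); re-key by (t, w) so the result can be joined with rdd2
val rdd_joined = rdd1.cartesian(distinct_w).map { case ((t, u), w) => ((t, w), u) }.join(rdd2)
```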
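To see where the cost of Option 2 comes from, here is a local model with plain Scala collections (hypothetical data). The cartesian step materializes |rdd1| × |distinct w| pairs before the join, whether or not `rdd2` actually contains each `(t, w)` combination: with 3 keys in `rdd1` and 2 distinct `w` values, 6 pairs are built to produce 2 results.

```scala
// Local model of Option 2 with plain Scala collections (no Spark); data is hypothetical.
object CartesianJoin {
  val rdd1: Seq[(Int, String)] = Seq(1 -> "a", 2 -> "b", 3 -> "c")
  val rdd2: Seq[((Int, Char), Double)] = Seq(((1, 'x'), 1.0), ((2, 'y'), 2.0))

  val distinctW: Seq[Char] = rdd2.map { case ((_, w), _) => w }.distinct

  // cartesian: every (t, u) paired with every distinct w, re-keyed by (t, w)
  val crossed: Seq[((Int, Char), String)] =
    for { (t, u) <- rdd1; w <- distinctW } yield ((t, w), u)

  // join on the (t, w) key; (t, w) is unique in rdd2, so a Map lookup suffices here
  val byKey: Map[(Int, Char), Double] = rdd2.toMap
  val joined: Seq[((Int, Char), (String, Double))] =
    crossed.flatMap { case (k, u) => byKey.get(k).map(v => (k, (u, v))) }

  def main(args: Array[String]): Unit =
    println(s"${crossed.size} intermediate pairs for ${joined.size} results")
}
```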
Option 1 collects all of `rdd1` to the driver, right? That doesn't seem like a good option if `rdd1` is large (it's relatively large in my case, although an order of magnitude smaller than `rdd2`). Option 2 needs an ugly `distinct` and a cartesian product, which also seems very inefficient. Another possibility that crossed my mind (but that I haven't tried yet) is to do Option 1 and broadcast the map. Ideally, though, the broadcast would be "smart", so that each key of the map is co-located with the matching keys of `rdd2`.
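Here is a rough local model of the co-location idea, again with plain Scala collections; the buckets stand in for partitions, and the data and `numPartitions` are hypothetical. After re-keying `rdd2` by `t` alone, both sides are bucketed by `t` with the same hash function, so each bucket of `rdd2` only needs the matching bucket of `rdd1` rather than the whole map. As far as I understand, the Spark counterpart would be re-keying `rdd2` with `map { case ((t, w), v) => (t, (w, v)) }`, calling `partitionBy(new HashPartitioner(n))` on both sides, and then `join`, which should not need a further shuffle when both RDDs share the same partitioner.

```scala
// Local model of co-partitioning: both sides are bucketed by t with the same
// hash function, so each bucket can be joined on its own. Buckets stand in
// for RDD partitions; the data and numPartitions are hypothetical.
object CoPartitionedJoin {
  val numPartitions = 4
  val rdd1: Seq[(Int, String)] = Seq(1 -> "a", 2 -> "b", 5 -> "e")
  val rdd2: Seq[((Int, Char), Double)] =
    Seq(((1, 'x'), 1.0), ((5, 'y'), 2.0), ((7, 'z'), 3.0))

  // Non-negative hashCode modulo numPartitions, the same rule HashPartitioner uses
  def bucket(t: Int): Int = {
    val raw = t.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Re-key rdd2 by t alone so it can be bucketed the same way as rdd1
  val rekeyed: Seq[(Int, (Char, Double))] =
    rdd2.map { case ((t, w), v) => (t, (w, v)) }

  val left: Map[Int, Seq[(Int, String)]] = rdd1.groupBy { case (t, _) => bucket(t) }
  val right: Map[Int, Seq[(Int, (Char, Double))]] = rekeyed.groupBy { case (t, _) => bucket(t) }

  // Join bucket p of rdd2 only against bucket p of rdd1, then restore the (t, w) key
  val joined: Seq[((Int, Char), (String, Double))] =
    (0 until numPartitions).flatMap { p =>
      val lookup = left.getOrElse(p, Seq.empty[(Int, String)]).toMap
      right.getOrElse(p, Seq.empty[(Int, (Char, Double))]).flatMap { case (t, (w, v)) =>
        lookup.get(t).map(u => ((t, w), (u, v)))
      }
    }

  def main(args: Array[String]): Unit = joined.foreach(println)
}
```

The per-bucket lookup is the "smart broadcast" part: each bucket only ever builds a map from its own slice of `rdd1`.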
Has anyone come across this sort of situation before? I'd be happy to hear your thoughts.
Thanks!