4 votes

I have 4 RDDs of type RDD[((Int, Int, Int), Value)], and my RDDs are:

rdd1: ((a,b,c), value)
rdd2: ((a,d,e), valueA)
rdd3: ((f,b,g), valueB)
rdd4: ((h,i,c), valueC)

How can I join the RDDs, i.e. rdd1 join rdd2 on "a", rdd1 join rdd3 on "b", and rdd1 join rdd4 on "c",

so that the output is finalRdd: ((a,b,c), value, valueA, valueB, valueC), in Scala?

I tried doing this with collectAsMap, but it didn't work well and throws an exception.

Code just for rdd1 join rdd2:

val newrdd2 = rdd2.map { case ((a, b, c), d) => (a, d) }.collectAsMap
val joined  = rdd1.map { case ((a, b, c), d) => (newrdd2.get(a).get, b, c, d) } // .get on a None throws NoSuchElementException when "a" has no match

Example:

rdd1: ((1,2,3), animals)
rdd2: ((1,anyInt,anyInt), cat)
rdd3: ((anyInt,2,anyInt), cow)
rdd4: ((anyInt,anyInt,3), parrot)

The output should be ((1,2,3), animals, cat, cow, parrot).

Could you please write a better example of the data inside the RDDs? – Alberto Bonsanto
I added an example; it doesn't matter what number is in the anyInt fields. – luis
Can there be duplicate rows in rdd1? Duplicate keys? (e.g. two elements with key (1,2,3) and values "animal" and "another-animal") – The Archetypal Paul

1 Answer

3 votes

There is a handy join method on RDDs, but it requires your RDD to be keyed by your particular join key, which is what Spark uses for partitioning and shuffling.

From the docs:

join(otherDataset, [numTasks]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
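
As a tiny illustration of that signature (made-up data; collecting is only safe here because the demo RDDs are tiny):

val left  = sc.parallelize(Seq((1, "V1"), (2, "V2")))  // RDD of (K, V)
val right = sc.parallelize(Seq((1, "W1"), (3, "W3")))  // RDD of (K, W)
left.join(right).collect()  // Array((1, (V1, W1))) -- only keys present on both sides survive an inner join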

I can't compile where I am, but by hand it goes something like this:

val rdd1KeyA = rdd1.map(x => (x._1._1, (x._1._2, x._1._3, x._2))) // RDD(a, (b, c, value))
val rdd2KeyA = rdd2.map(x => (x._1._1, x._2))                     // RDD(a, valueA)
val joined1  = rdd1KeyA.join(rdd2KeyA)                            // RDD(a, ((b, c, value), valueA))

val rdd3KeyB = rdd3.map(x => (x._1._2, x._2))                     // RDD(b, valueB)
val joined1KeyB = joined1.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._1._3, x._2._2))) // RDD(b, (a, c, value, valueA))
val joined2 = joined1KeyB.join(rdd3KeyB)                          // RDD(b, ((a, c, value, valueA), valueB))

...and so on
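
For completeness, a sketch of the remaining two steps in the same style (untested, so treat the variable names as illustrative):

val rdd4KeyC = rdd4.map(x => (x._1._3, x._2)) // RDD(c, valueC)
val joined2KeyC = joined2.map(x => (x._2._1._2, (x._2._1._1, x._1, x._2._1._3, x._2._1._4, x._2._2))) // RDD(c, (a, b, value, valueA, valueB))
val joined3 = joined2KeyC.join(rdd4KeyC)      // RDD(c, ((a, b, value, valueA, valueB), valueC))

val finalRdd = joined3.map { case (c, ((a, b, value, valueA, valueB), valueC)) =>
  ((a, b, c), value, valueA, valueB, valueC)  // the shape asked for in the question
}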

Avoid the collect* functions: they do not use the distributed nature of your data and are prone to fail on big loads, because they pull all of the data in an RDD into an in-memory collection on the driver, probably blowing everything up.
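
If one side really is small enough to collect, the more robust version of your collectAsMap attempt is a broadcast variable, with a flatMap so missing keys are dropped instead of throwing. A sketch, assuming rdd2 comfortably fits in driver memory:

val smallMap = sc.broadcast(rdd2.map { case ((a, _, _), v) => (a, v) }.collectAsMap())
val joined = rdd1.flatMap { case ((a, b, c), value) =>
  smallMap.value.get(a).map(valueA => ((a, b, c), value, valueA)) // None => the row is dropped, no exception
}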