2 votes

I have two RDDs:

RDD1[String, Double]

sample data:

("a" , 1.0)
("b" , 2.0)
("c" , 3.0)
("d" , 4.0)

Each entry is a key-value pair.


RDD2[String, (String, String)]

sample data:

("a" , ("b" , "c"))
("b" , ("a" , "b"))
("c" , ("a" , "d"))
("d" , ("a" , "b"))

RDD1 contains values that are required by RDD2.

So I want to be able to look up the values in RDD1 for the entries of RDD2, such that:

("a" , ("b" , "c")) will map to ("a" , (2.0 , 3.0))

where 2.0 and 3.0 are the corresponding values from RDD1.

How can I achieve this with Spark in Scala? One possible solution is to convert RDD1 to a HashMap and then just "get" the values within a map operation on RDD2:

RDD2.map(m => (m._1, (RDD1HashMap(m._2._1), RDD1HashMap(m._2._2))))
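(Here RDD1HashMap would be the result of collecting RDD1 to the driver, for example:)

val RDD1HashMap = RDD1.collectAsMap() // builds a local Map[String, Double] on the driver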

Is there an alternative method to achieve this?

Comment (aaronman, 1 vote): it depends on how large RDD1 is; if it's small it should be a hash map, if not you can probably achieve this with a join.

1 Answer

3 votes

If RDD1 is small you should definitely have it in a hash map that you use as a broadcast variable (wild guess: anything in the low tens of millions of entries should be fine); see the sketch just below. If not, you have two options.
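A minimal sketch of the broadcast approach (assuming sc is your SparkContext; variable names are illustrative):

    // Collect RDD1 to the driver once and ship it to every executor.
    val lookupTable = sc.broadcast(RDD1.collectAsMap())

    // Replace each pair of keys in RDD2 with their values from RDD1.
    val result = RDD2.map { case (k, (x, y)) =>
      (k, (lookupTable.value(x), lookupTable.value(y)))
    }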

  1. Use the PairRDDFunctions lookup. This may be extremely inefficient, and arguably illegal, since Spark does not support calling RDD operations from inside another RDD's transformation (although it worked fine locally):

    RDD1.cache() // keep RDD1 in memory so repeated lookups are not recomputed
    RDD2.map(m => RDD1.lookup(m._2._1))

  2. The second option is somewhat more complex: you have to do two joins (Spark still doesn't have support for joining more than two datasets at a time):

    val joinedDataSet = RDD2
      .map { case (k, (x, y)) => (x, (k, y)) }                  // re-key by the first lookup key
      .join(RDD1)                                               // (x, ((k, y), xVal))
      .map { case (_, ((k, y), xVal)) => (y, (xVal, k)) }       // re-key by the second lookup key
      .join(RDD1)                                               // (y, ((xVal, k), yVal))
      .map { case (_, ((xVal, k), yVal)) => (k, (xVal, yVal)) } // back to the original key
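    Tracing the sample row ("a", ("b", "c")) through the chain:

        ("a", ("b", "c")) -> ("b", ("a", "c"))          // re-keyed by "b"
                          -> ("b", (("a", "c"), 2.0))   // after the first join
                          -> ("c", (2.0, "a"))          // re-keyed by "c"
                          -> ("c", ((2.0, "a"), 3.0))   // after the second join
                          -> ("a", (2.0, 3.0))          // final result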

That should be the dataset you wanted. I realize the RDD is extremely messy; you may want to use case classes, and/or do the two joins separately and then join those RDDs together to make it clearer (if slightly less efficient); a sketch of that variant follows. Also note that Scala can't perform type inference on plain (k, v) => ... lambdas over pairs, which is why the maps above use case pattern matches. I think I would try one of the other two options before resorting to this.
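A minimal sketch of the clearer "two separate joins" variant (assuming the keys of RDD2 are unique; the value names are illustrative):

    // Join on the first lookup key, keeping (originalKey, firstValue).
    val firstVals = RDD2.map { case (k, (x, _)) => (x, k) }
      .join(RDD1)
      .map { case (_, (k, xVal)) => (k, xVal) }

    // Join on the second lookup key, keeping (originalKey, secondValue).
    val secondVals = RDD2.map { case (k, (_, y)) => (y, k) }
      .join(RDD1)
      .map { case (_, (k, yVal)) => (k, yVal) }

    // Re-join on the original key to get (key, (firstValue, secondValue)).
    val result = firstVals.join(secondVals)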