
I am stuck on a particular piece of Scala/Spark syntax, and I am hoping you can point me in the right direction.

If RDD1 is of type RDD[((Float, Float, Float), Long)],

RDD1.collect = Array(((x1,y1,z1),1), ((x2,y2,z2),2), ((x3,y3,z3),3), ...)

and RDD2 holds indices, of type RDD[Long],

RDD2.collect = Array(1, 3, 5...)

what is the best way to extract the values from RDD1 whose indices occur in RDD2? I.e., the output would be Array(((x1,y1,z1),1), ((x3,y3,z3),3), ((x5,y5,z5),5), ...)

Both RDD1 and RDD2 are large enough that I would like to avoid using .collect. Otherwise, the problem would simply be finding the intersecting elements of two Scala arrays/lists.

Thank you so much for your help!


1 Answer


There is a join function available on pair RDDs (via PairRDDFunctions), which is what you want to use here.

// coming in, we have:
// rdd1: RDD[((Float, Float, Float), Long)]
// rdd2: RDD[Long]

// re-key rdd1 by its Long index
val joinReadyRDD1 = rdd1.map { case (values, key) => (key, values) }
// pair each index in rdd2 with a dummy value so it can be joined
val joinReadyRDD2 = rdd2.map { key => (key, ()) }
// join on the Long key, then drop the dummy value
val joined = joinReadyRDD1.join(joinReadyRDD2).mapValues { case (values, _) => values }

This returns an RDD[(Long, (Float, Float, Float))] containing only the entries whose Long key appears in rdd2.
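
If it helps, here is a minimal, self-contained sketch of the same approach that you could run locally. The SparkContext setup, object name, and sample data are illustrative assumptions, not taken from your question:

import org.apache.spark.{SparkConf, SparkContext}

object JoinByIndex {
  def main(args: Array[String]): Unit = {
    // assumption: a local SparkContext, just for demonstration
    val sc = new SparkContext(new SparkConf().setAppName("join-by-index").setMaster("local[*]"))

    // made-up sample data matching the shapes in the question
    val rdd1 = sc.parallelize(Seq(
      ((1.0f, 2.0f, 3.0f), 1L),
      ((4.0f, 5.0f, 6.0f), 2L),
      ((7.0f, 8.0f, 9.0f), 3L)
    ))
    val rdd2 = sc.parallelize(Seq(1L, 3L))

    val joined = rdd1
      .map { case (values, key) => (key, values) }   // re-key by the Long index
      .join(rdd2.map(key => (key, ())))              // keep only keys present in rdd2
      .mapValues { case (values, _) => values }      // drop the dummy value

    // prints (1,(1.0,2.0,3.0)) and (3,(7.0,8.0,9.0)), in some order
    joined.collect().foreach(println)
    sc.stop()
  }
}

Note that the join itself never pulls either RDD to the driver; the .collect here is only on the final, already-filtered result, for display.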

A side note: if you have a conceptual "key" and "value", put the key first. Take a look at PairRDDFunctions -- it's quite a rich API, and it all works with RDD[(Key, Value)].
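
For instance, once the key comes first, the rest of that API applies directly. A quick sketch, where the aggregations are made up purely to illustrate the convention:

val keyed = rdd1.map { case (values, key) => (key, values) }  // RDD[(Long, (Float, Float, Float))]

// hypothetical examples: count occurrences per key, and take the smallest x per key
val countsPerKey = keyed.mapValues(_ => 1).reduceByKey(_ + _)
val minXPerKey   = keyed.mapValues { case (x, _, _) => x }.reduceByKey((a, b) => math.min(a, b))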