I have two RDDs in the format below:

RDD1     178,1
         156,1
          23,2
RDD2     34
         178
         156

Now I want to filter RDD1 by the values in RDD2: if a key such as 178 is present in both RDD1 and RDD2, the matching tuples from RDD1 should be returned.

I have tried:

val out = reversedl1.filter({ case(x,y) => x.contains(lines)}) 

where lines is my second RDD and reversedl1 is the first, but it's not working.

I also tried:

val abce = reversedl1.subtractByKey(lines)
val defg = reversedl1.subtractByKey(abce)

This is also not working.

Any suggestions?

1
If RDD2 is small, you can also collect it to an array, arr = RDD2.collect, and then run RDD1.filter(x => arr.contains(x._1)). - philantrovert
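
A minimal sketch of the collect-and-filter idea from the comment above, assuming RDD2 is small enough to fit in driver memory. The local SparkSession setup, the conversion to a Set, and the use of a broadcast variable are additions for illustration and are not part of the original comment; in spark-shell, `sc` already exists and the session lines can be skipped.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("filter-by-keys")
  .getOrCreate()
val sc = spark.sparkContext

val rdd1 = sc.parallelize(Seq((178, 1), (156, 1), (23, 2)))
val rdd2 = sc.parallelize(Seq(34, 178, 156))

// Collect the (small) key RDD to the driver, turn it into a Set for
// constant-time lookups, and broadcast it so each executor gets one copy.
val keys = sc.broadcast(rdd2.collect().toSet)

// Keep only the tuples of rdd1 whose key also appears in rdd2.
// This is a plain filter, so no shuffle is involved.
val filtered = rdd1.filter { case (k, _) => keys.value.contains(k) }

filtered.collect().foreach(println)
```

This probably only makes sense while the key set stays small; for a large RDD2, a join-based approach avoids pulling all keys onto the driver.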

1 Answer

You can convert rdd2 to key-value pairs and then join it with rdd1 on the keys:

val rdd1 = sc.parallelize(Seq((178, 1), (156, 1), (23, 2)))
val rdd2 = sc.parallelize(Seq(34, 178, 156))

(rdd1
  .join(rdd2.distinct().map(k => (k, null)))  // pair each key with a dummy value so rdd2 becomes a pair RDD that can be joined
  .map { case (k, (v1, _)) => (k, v1) }       // drop the dummy value
).collect
// res11: Array[(Int, Int)] = Array((156,1), (178,1))