1
votes

I have two case classes and one RDD of each.

case class Thing1(Id: String, a: String, b: String, c: java.util.Date, d: Double)
case class Thing2(Id: String, e:  java.util.Date, f: Double)

val rdd1 = // Loads an rdd of type RDD[Thing1]
val rdd2 = // Loads an rdd of type RDD[Thing2]

I want to create two new RDD[Thing1]s: one containing the elements of rdd1 whose Id is present in rdd2, and another containing the elements of rdd1 whose Id is not present in rdd2.

Here's what I have tried (I looked at this, "Scala Spark contains vs. does not contain", and other Stack Overflow posts, but none of them have worked):

val rdd2_ids = rdd2.map(r => r.Id)
val rdd1_present = rdd1.filter{case r => rdd2_ids contains r.Id}

val rdd1_absent = rdd1.filter{case r => !(rdd2_ids contains r.Id)}

But this gets me the error:

error: value contains is not a member of org.apache.spark.rdd.RDD[String]

I have seen many questions on SO asking how to do things similar to what I am trying to do, but none of the answers have worked for me. I get the "value _____ is not a member of org.apache.spark.rdd.RDD[String]" error a lot.

Why are these other answers not working for me, and how can I achieve what I am trying to do?

2
Why can't you use a DataFrame instead of an RDD? DataFrame provides an except function, which finds the difference between two DataFrames. - Avishek Bhattacharya
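
The DataFrame route suggested in this comment could look roughly like the sketch below. Note that except compares whole rows, so for matching on Id alone this sketch swaps in "left_semi" and "left_anti" joins instead. The case classes are simplified stand-ins (the java.util.Date fields are dropped as an assumption to keep the example short), and the names SemiAntiJoinSketch and split are hypothetical.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Simplified stand-ins for the question's case classes (assumption:
// the java.util.Date fields are omitted to keep the sketch short).
case class Thing1(Id: String, a: String, d: Double)
case class Thing2(Id: String, f: Double)

object SemiAntiJoinSketch {
  // Returns (present, absent): rows of ds1 whose Id is / is not in ds2.
  def split(spark: SparkSession,
            ds1: Dataset[Thing1],
            ds2: Dataset[Thing2]): (Dataset[Thing1], Dataset[Thing1]) = {
    import spark.implicits._
    // left_semi keeps rows of ds1 that have a matching Id in ds2;
    // only the columns of ds1 survive, and rows are not duplicated.
    val present = ds1.join(ds2, Seq("Id"), "left_semi").as[Thing1]
    // left_anti keeps rows of ds1 that have no matching Id in ds2.
    val absent = ds1.join(ds2, Seq("Id"), "left_anti").as[Thing1]
    (present, absent)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("semi-anti-join-sketch")
      .getOrCreate()
    import spark.implicits._

    val ds1 = Seq(Thing1("1", "x", 1.0), Thing1("2", "y", 2.0), Thing1("3", "z", 3.0)).toDS()
    val ds2 = Seq(Thing2("1", 10.0), Thing2("2", 20.0)).toDS()

    val (present, absent) = split(spark, ds1, ds2)
    present.show()
    absent.show()
    spark.stop()
  }
}
```

If the data is already in RDDs, rdd1.toDS() (with spark.implicits._ in scope) would be one way to get Datasets, provided every field type has an encoder.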

2 Answers

0
votes

I created two simple RDDs, using cut-down versions of the case classes that are keyed on a field a instead of Id:

val rdd1 = sc.parallelize(Array(
  Thing1(1,2),
  Thing1(2,3),
  Thing1(3,4) ))
// rdd1: org.apache.spark.rdd.RDD[Thing1] = ParallelCollectionRDD[174] at parallelize

val rdd2 = sc.parallelize(Array(
  Thing2(1, "Two"),
  Thing2(2, "Three" ) ))
// rdd2: org.apache.spark.rdd.RDD[Thing2] = ParallelCollectionRDD[175] at parallelize

Now you can join them on the field whose values you want to match in both RDDs:

val rdd1_present = rdd1.keyBy(_.a).join(rdd2.keyBy(_.a) ).map{ case(a, (t1, t2) ) => t1 }

//rdd1_present.collect
//Array[Thing1] = Array(Thing1(2,3), Thing1(1,2))

val rdd1_absent = rdd1.keyBy(_.a).subtractByKey(rdd1_present.keyBy(_.a) ).map{ case(a,t1) => t1 }

//rdd1_absent.collect
//Array[Thing1] = Array(Thing1(3,4))
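
As for why the original attempt fails: an RDD is a distributed collection and has no contains method, and one RDD cannot be used inside another RDD's filter closure. The answers that use contains are most likely operating on ordinary Scala collections. A minimal sketch of that variant follows, assuming the set of Ids in rdd2 is small enough to collect to the driver; the case classes are simplified stand-ins and the names BroadcastFilterSketch and split are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Simplified stand-ins for the question's case classes (assumption:
// the java.util.Date fields are omitted to keep the sketch short).
case class Thing1(Id: String, a: String, d: Double)
case class Thing2(Id: String, f: Double)

object BroadcastFilterSketch {
  // Returns (present, absent): elements of rdd1 whose Id is / is not in rdd2.
  def split(sc: SparkContext,
            rdd1: RDD[Thing1],
            rdd2: RDD[Thing2]): (RDD[Thing1], RDD[Thing1]) = {
    // collect() brings the distinct Ids to the driver as a local Set,
    // which, unlike an RDD, does have a contains method.
    // Assumption: this Set fits comfortably in memory.
    val ids: Set[String] = rdd2.map(_.Id).distinct().collect().toSet
    // Broadcasting ships the Set to each executor once.
    val idsBc = sc.broadcast(ids)
    val present = rdd1.filter(t => idsBc.value.contains(t.Id))
    val absent = rdd1.filter(t => !idsBc.value.contains(t.Id))
    (present, absent)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("broadcast-filter-sketch")
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(Seq(Thing1("1", "x", 1.0), Thing1("2", "y", 2.0), Thing1("3", "z", 3.0)))
    val rdd2 = sc.parallelize(Seq(Thing2("1", 10.0), Thing2("2", 20.0)))

    val (present, absent) = split(sc, rdd1, rdd2)
    println(present.collect().toList)
    println(absent.collect().toList)
    sc.stop()
  }
}
```

When the Id set is too large to collect, the join and subtractByKey approach above avoids pulling anything to the driver.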
0
votes

Try a full outer join:

val joined = rdd1.map(s=>(s.Id,s)).fullOuterJoin(rdd2.map(s=>(s.Id,s))).cache()

//only in left 
joined.filter(s=> s._2._2.isEmpty).foreach(println)

//only in right
joined.filter(s=>s._2._1.isEmpty).foreach(println)

//in both
joined.filter(s=> !s._2._1.isEmpty && !s._2._2.isEmpty).foreach(println)
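
The filters above leave each element as a (key, (Option, Option)) pair. To get back the two RDD[Thing1]s the question asks for, one option is to pattern match on the Options, as in the sketch below. The case classes are simplified stand-ins and the names FullOuterJoinSketch and split are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Simplified stand-ins for the question's case classes (assumption:
// the java.util.Date fields are omitted to keep the sketch short).
case class Thing1(Id: String, a: String, d: Double)
case class Thing2(Id: String, f: Double)

object FullOuterJoinSketch {
  // Returns (present, absent): elements of rdd1 whose Id is / is not in rdd2.
  def split(rdd1: RDD[Thing1], rdd2: RDD[Thing2]): (RDD[Thing1], RDD[Thing1]) = {
    // Each joined element is (Id, (Option[Thing1], Option[Thing2])).
    val joined = rdd1.keyBy(_.Id).fullOuterJoin(rdd2.keyBy(_.Id)).cache()
    // RDD.collect with a partial function filters and maps in one step.
    val present = joined.collect { case (_, (Some(t1), Some(_))) => t1 }
    val absent = joined.collect { case (_, (Some(t1), None)) => t1 }
    (present, absent)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("full-outer-join-sketch")
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(Seq(Thing1("1", "x", 1.0), Thing1("2", "y", 2.0), Thing1("3", "z", 3.0)))
    val rdd2 = sc.parallelize(Seq(Thing2("1", 10.0), Thing2("2", 20.0)))

    val (present, absent) = split(rdd1, rdd2)
    println(present.collect().toList)
    println(absent.collect().toList)
    sc.stop()
  }
}
```

One caveat: if an Id occurs more than once in rdd2, the join emits one copy of the matching Thing1 per occurrence, so deduplicating the keys of rdd2 first may be needed.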