
I have two RDD[Array[String]]s; let's call them rdd1 and rdd2. I would like to create a new RDD containing just the entries of rdd2 whose key does not appear in rdd1. I use Spark with Scala in IntelliJ.

I grouped rdd1 and rdd2 by key (I only need to compare the keys of the two RDDs):

val rdd1Grouped = rdd1.groupBy(line => line(0))
val rdd2Grouped = rdd2.groupBy(line => line(0))

Then, I used a leftOuterJoin:

val output = rdd1Grouped.leftOuterJoin(rdd2Grouped).collect {
  case (k, (v, None)) => (k, v)
}

but this doesn't seem to give the correct result.

What's wrong with it? Any suggestions?

Example RDDs (every line is an Array[String], of course):

rdd1           rdd2           output (in some form)

1,18/6/2016    2,9/6/2016     2,9/6/2016
1,18/6/2016    2,9/6/2016
1,18/6/2016    2,9/6/2016
1,18/6/2016    2,9/6/2016
1,18/6/2016    1,20/6/2016
3,18/6/2016    1,20/6/2016
3,18/6/2016    1,20/6/2016
3,18/6/2016
3,18/6/2016
3,18/6/2016

In this case I want to keep only the entry "2,9/6/2016", because key "2" does not appear in rdd1.


2 Answers


new RDD containing just the entries of rdd2 not in rdd1

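A left outer join keeps every key of rdd1 and attaches the matching values from rdd2 (None where rdd2 has no such key). Filtering for None therefore gives the keys of rdd1 that are missing from rdd2, which is the opposite of what you want. So a left outer join in this direction is not the solution.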
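To see why the join direction matters, here is a Spark-free model of the question's sample data using plain Scala collections. The `leftOuterJoin` helper below is a hypothetical stand-in that only mimics the shape of Spark's result, `(key, (leftValue, Option[rightValue]))`; it is not Spark's API, and `JoinDirectionDemo` is an illustrative name.

```scala
// Spark-free model of the question's data using plain Scala collections.
object JoinDirectionDemo {
  val rdd1: Seq[Array[String]] =
    Seq.fill(5)(Array("1", "18/6/2016")) ++ Seq.fill(5)(Array("3", "18/6/2016"))
  val rdd2: Seq[Array[String]] =
    Seq.fill(4)(Array("2", "9/6/2016")) ++ Seq.fill(3)(Array("1", "20/6/2016"))

  // Like rdd.groupBy(line => line(0)): key -> all lines with that key.
  val grouped1: Map[String, Seq[Array[String]]] = rdd1.groupBy(line => line(0))
  val grouped2: Map[String, Seq[Array[String]]] = rdd2.groupBy(line => line(0))

  // Hypothetical model of a left outer join: every key of `left`,
  // paired with Some(right values) or None when `right` lacks the key.
  def leftOuterJoin[K, V, W](left: Map[K, V], right: Map[K, W]): Map[K, (V, Option[W])] =
    left.map { case (k, v) => (k, (v, right.get(k))) }

  // Question's direction (rdd1 on the left): keys of rdd1 missing from rdd2.
  val asWritten: Set[String] =
    leftOuterJoin(grouped1, grouped2).collect { case (k, (_, None)) => k }.toSet

  // Swapped direction (rdd2 on the left): keys of rdd2 missing from rdd1.
  val swapped: Set[String] =
    leftOuterJoin(grouped2, grouped1).collect { case (k, (_, None)) => k }.toSet
}
```

With the sample data, `asWritten` is `Set("3")` while `swapped` is `Set("2")`, the key the question is after.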

rdd2Grouped.subtractByKey(rdd1Grouped) would be apt in your case: it keeps the pairs of rdd2Grouped whose key does not occur in rdd1Grouped.
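A minimal sketch of the subtractByKey approach follows. The RDD you want to keep rows from goes on the left. As an assumption beyond the answer, it keys the raw lines with `keyBy` instead of grouping them first, since `subtractByKey` only compares keys; this also returns the original `Array[String]` lines. `SubtractByKeyExample`, `rdd2Only` and `demo` are hypothetical names, and the demo runs Spark in local mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SubtractByKeyExample {
  // Lines of rdd2 whose key (first field) never occurs as a key in rdd1.
  def rdd2Only(rdd1: RDD[Array[String]], rdd2: RDD[Array[String]]): RDD[Array[String]] = {
    val keyed1 = rdd1.keyBy(line => line(0)) // RDD[(String, Array[String])]
    val keyed2 = rdd2.keyBy(line => line(0))
    keyed2.subtractByKey(keyed1).values      // drop the key again
  }

  // Small local run on a subset of the question's sample data.
  def demo(): Seq[String] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("subtractByKey-demo"))
    try {
      val rdd1 = sc.parallelize(Seq(Array("1", "18/6/2016"), Array("3", "18/6/2016")))
      val rdd2 = sc.parallelize(Seq(Array("2", "9/6/2016"), Array("2", "9/6/2016"), Array("1", "20/6/2016")))
      rdd2Only(rdd1, rdd2).map(_.mkString(",")).collect().toSeq.sorted
    } finally sc.stop()
  }
}
```

Every unmatched rdd2 line is returned, so duplicates in rdd2 stay duplicated.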

P.S.: if rdd1 is the smaller RDD, it is better to broadcast it. That way only rdd2 has to be streamed when you do the subtraction.
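One way to read the broadcast suggestion is sketched below, assuming the distinct keys of rdd1 are few enough to fit in driver and executor memory. Instead of `subtractByKey`, it collects rdd1's key set, broadcasts it, and filters rdd2 locally on each partition. `BroadcastKeyFilter`, `rdd2Only` and `demo` are hypothetical names; the demo runs Spark in local mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastKeyFilter {
  // Assumption: rdd1 has few enough distinct keys to collect to the driver.
  def rdd2Only(sc: SparkContext, rdd1: RDD[Array[String]], rdd2: RDD[Array[String]]): RDD[Array[String]] = {
    val keys1: Set[String] = rdd1.map(line => line(0)).distinct().collect().toSet
    val keys1Bc = sc.broadcast(keys1) // sent to each executor once, not with every task
    // rdd2 is filtered partition by partition; it is never shuffled.
    rdd2.filter(line => !keys1Bc.value.contains(line(0)))
  }

  // Small local run on a subset of the question's sample data.
  def demo(): Seq[String] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("broadcast-demo"))
    try {
      val rdd1 = sc.parallelize(Seq(Array("1", "18/6/2016"), Array("3", "18/6/2016")))
      val rdd2 = sc.parallelize(Seq(Array("2", "9/6/2016"), Array("1", "20/6/2016")))
      rdd2Only(sc, rdd1, rdd2).map(_.mkString(",")).collect().toSeq
    } finally sc.stop()
  }
}
```

The trade-off: computing `distinct()` on rdd1 still shuffles rdd1's keys once, but the larger rdd2 is only scanned.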


Swap rdd1Grouped and rdd2Grouped in the join, and then use filter:

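// Keep only the keys of rdd2Grouped that do not appear in rdd1Grouped
val output = rdd2Grouped.leftOuterJoin(rdd1Grouped).filter(line => {
  line._2._2.isEmpty // None: this key has no match in rdd1Grouped
}).collect // returns an Array on the driver; drop .collect to keep an RDD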
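A variant of this answer that keeps the result distributed (no `.collect`) and flattens the groups back into the original lines might look like the sketch below. It uses pattern matching instead of `._2._2`; `SwappedJoinExample`, `rdd2Only` and `demo` are hypothetical names, and the demo runs Spark in local mode.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SwappedJoinExample {
  def rdd2Only(rdd1: RDD[Array[String]], rdd2: RDD[Array[String]]): RDD[Array[String]] = {
    val rdd1Grouped = rdd1.groupBy(line => line(0)) // RDD[(String, Iterable[Array[String]])]
    val rdd2Grouped = rdd2.groupBy(line => line(0))
    rdd2Grouped
      .leftOuterJoin(rdd1Grouped)                          // (key, (rdd2 lines, Option[rdd1 lines]))
      .filter { case (_, (_, inRdd1)) => inRdd1.isEmpty }  // key absent from rdd1
      .flatMap { case (_, (rdd2Lines, _)) => rdd2Lines }   // back to individual lines
  }

  // Small local run on a subset of the question's sample data.
  def demo(): Seq[String] = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("swapped-join-demo"))
    try {
      val rdd1 = sc.parallelize(Seq(Array("1", "18/6/2016"), Array("3", "18/6/2016")))
      val rdd2 = sc.parallelize(Seq(Array("2", "9/6/2016"), Array("2", "9/6/2016"), Array("1", "20/6/2016")))
      rdd2Only(rdd1, rdd2).map(_.mkString(",")).collect().toSeq.sorted
    } finally sc.stop()
  }
}
```

All rdd2 lines with an unmatched key come back, so the full sample data would yield four copies of 2,9/6/2016. If you want a single entry, map each line to a string (for example with `mkString(",")`) and call `distinct()`, because arrays compare by reference.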