
I have two RDDs of (key, value) pairs, and the second RDD is much smaller than the first. I would like to associate each value in the first RDD with the corresponding value in the second RDD, matched by key.

val rdd1: RDD[(K, A)]
val rdd2: RDD[(K, B)]
val rdd3: RDD[R]

where rdd1.count() >> rdd2.count(), and several elements of rdd1 share the same key.

When a key from rdd1 has no match in rdd2, I want to use a constant value c in place of b. I thought leftOuterJoin would be the natural method here:

val rdd3 = rdd1.leftOuterJoin(rdd2).map {
  case (key, (a, None))    => R(a, c)  // key missing from rdd2: fall back to constant c
  case (key, (a, Some(b))) => R(a, b)
}
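The same join can be written with a single `case` by using `Option.getOrElse` for the fallback. A minimal sketch, assuming `R` is a simple case class of two `Int`s and that Spark is on the classpath with a local master; `R`, `JoinWithDefault`, `joinWithDefault`, and the sample data are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical result type standing in for the question's R
case class R(a: Int, b: Int)

object JoinWithDefault {
  // Left outer join: every (key, a) in rdd1 survives; b falls back to c
  // when the key has no match in rdd2
  def joinWithDefault(rdd1: RDD[(Int, Int)], rdd2: RDD[(Int, Int)], c: Int): RDD[R] =
    rdd1.leftOuterJoin(rdd2).map { case (_, (a, bOpt)) => R(a, bOpt.getOrElse(c)) }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("join-default"))
    try {
      val rdd1 = sc.parallelize(Seq((1, 10), (1, 11), (2, 20), (3, 30)))
      val rdd2 = sc.parallelize(Seq((1, 100)))
      // The join shuffles, so the printed order need not match rdd1
      joinWithDefault(rdd1, rdd2, -999).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Both elements with key 1 pick up the value 100; keys 2 and 3 get the default.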

Does anything strike you as wrong here? I am getting unexpected results when joining elements like this.

What's unexpected? - Dima
I thought my output was incorrect, but only the ordering differed from what I expected. I assumed the order of rdd1 would be preserved, but that is not the case. Should I delete the question? - NotAbelianGroup
I don't see why not :) - Dima
What is the question? - thebluephantom
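Because rdd2 is much smaller than rdd1 and, per the comments, the order of rdd1 matters, a common alternative to `leftOuterJoin` is a map-side (broadcast) join: collect rdd2 to the driver, broadcast it, and look up each key inside a `map` over rdd1. A `map` does not shuffle, so the output keeps rdd1's element order. This is a sketch assuming rdd2 fits in driver memory and has at most one value per key (`collectAsMap` keeps only one value per key, whereas `leftOuterJoin` would emit one row per match); `BroadcastLeftJoin` and `broadcastLeftJoin` are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object BroadcastLeftJoin {
  // Map-side left join: no shuffle, so results follow rdd1's order.
  // Assumes rdd2 is small and its keys are unique.
  def broadcastLeftJoin(rdd1: RDD[(Int, Int)], rdd2: RDD[(Int, Int)], c: Int): RDD[(Int, (Int, Int))] = {
    val lookup = rdd2.sparkContext.broadcast(rdd2.collectAsMap())
    // Each task reads the broadcast map; missing keys fall back to c
    rdd1.map { case (k, a) => (k, (a, lookup.value.getOrElse(k, c))) }
  }
}
```

In spark-shell, `BroadcastLeftJoin.broadcastLeftJoin(rdd1, rdd2, -999).collect()` returns the pairs in the same order as `rdd1.collect()`.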

Answer

Not entirely sure what your question is, but here goes:

Approach 1

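val rdd1 = sc.parallelize(Array((1, 100), (2, 200), (3, 300)))
val rdd2 = sc.parallelize(Array((1, 100)))

object Example {

  val c = -999

  def myFunc = {
    // Copy the field into a local val so the closure captures only this Int,
    // not the enclosing (non-serializable) Example object
    val enclosedC = c
    val rdd3 = rdd1.leftOuterJoin(rdd2)
    val rdd4 = rdd3.map {
      case (k, (a, None)) => (k, (Some(a), Some(enclosedC)))  // key missing from rdd2
      case (k, (a, b))    => (k, (Some(a), b))                // b is already Some(...)
    }.sortByKey()
    rdd4  // return the RDD; nothing runs until an action such as collect()
  }
}
Example.myFunc.collect().foreach(println)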

Approach 2

val rdd1 = sc.parallelize(Array((1, 100), (2, 200), (3, 300)))
val rdd2 = sc.parallelize(Array((1, 100)))

object Example {

  val c = -999

  def myFunc = {
    // Local copy, as in Approach 1, keeps Example out of the closure
    val enclosedC = c
    val rdd3 = rdd1.leftOuterJoin(rdd2)
    val rdd4 = rdd3.map { x =>
      if (x._2._2.isEmpty) (x._1, (Some(x._2._1), Some(enclosedC)))
      else (x._1, (Some(x._2._1), x._2._2))
    }.sortByKey()
    rdd4
  }
}
Example.myFunc.collect().foreach(println)

Approach 3

val rdd1 = sc.parallelize(Array((1, 100), (2, 200), (3, 300)))
val rdd2 = sc.parallelize(Array((1, 100)))

// No local copy of c here: the closure refers to the field directly,
// so Example extends Serializable in case it is captured by the task
object Example extends Serializable {

  val c = -999

  val rdd3 = rdd1.leftOuterJoin(rdd2)
  val rdd4 = rdd3.map { x =>
    if (x._2._2.isEmpty) (x._1, (Some(x._2._1), Some(c)))
    else (x._1, (Some(x._2._1), x._2._2))
  }.sortByKey()
}
Example.rdd4.collect().foreach(println)
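All three approaches call `sortByKey()`, which gives a deterministic order by key but not the original order of rdd1. If rdd1's original order is what is wanted (as in the comment thread under the question), one sketch is to tag each element of rdd1 with its position via `zipWithIndex` before the join and sort on that index afterwards. `OrderedLeftJoin` and `orderedLeftJoin` are hypothetical names; the keys and values are assumed to be `Int`s as in the examples above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object OrderedLeftJoin {
  // Left outer join that restores rdd1's original element order
  def orderedLeftJoin(rdd1: RDD[(Int, Int)], rdd2: RDD[(Int, Int)], c: Int): RDD[(Int, (Int, Int))] =
    rdd1.zipWithIndex()                                  // ((k, a), idx)
      .map { case ((k, a), idx) => (k, (a, idx)) }       // carry idx through the join
      .leftOuterJoin(rdd2)                                // (k, ((a, idx), Option[b]))
      .map { case (k, ((a, idx), bOpt)) => (idx, (k, (a, bOpt.getOrElse(c)))) }
      .sortByKey()                                        // sort on rdd1's position
      .values                                             // drop the index
}
```

This still shuffles for the join and again for the sort, so for a small rdd2 a broadcast lookup is usually cheaper; the index approach also works when rdd2 is large.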