
I have two RDDs:

rdd1 [String, String, String]: Name, Address, Zipcode
rdd2 [String, String, String]: Name, Address, Landmark

I am trying to join these two RDDs using rdd1.join(rdd2), but I am getting an error:
error: value fullOuterJoin is not a member of org.apache.spark.rdd.RDD[String]

The join should combine the two RDDs, and the output RDD should look something like:

rddOutput: Name, Address, Zipcode, Landmark

I would also like to save the result as a JSON file in the end.

Can someone help me with this?

join is defined on pair RDDs, and your rdd1 is not of type RDD[(String, T)]. You should map it, e.g. rdd1.map(v => (v, 1)) (or to another tuple, depending on your task). If you explain your goal in more detail (what you expect to get from the join), you may get more help. - Vitalii Kotliarenko
@VitaliyKotlyarenko: Sorry for not clarifying it earlier. I just edited the question. Can you please help me with that? - user2122466
Your edit doesn't help much. You don't have an RDD[String], but two RDD[String, String, String]s. Which field(s) do you want to join on: Name and Address, or just one of those? You need to change the RDDs so their entries are tuples where the first element of the pair is the key and the rest is the value; then join will work. - The Archetypal Paul
I want to join on Name and Address both. - user2122466

1 Answer


As said in the comments, you have to convert your RDDs to PairRDDs before joining, which means each RDD must be of type RDD[(key, value)]. Only then can you perform the join by key. In your case, the key is composed of (Name, Address), so you would have to do something like:

// First, we create the first PairRDD, with (name, address) as key and zipcode as value:
val pairRDD1 = rdd1.map { case (name, address, zipcode) => ((name, address), zipcode) }
// Then, we create the second PairRDD, with (name, address) as key and landmark as value:
val pairRDD2 = rdd2.map { case (name, address, landmark) => ((name, address), landmark) }

// Now we can join them. fullOuterJoin keeps keys that appear in either RDD,
// so the values come back as Options: the result is an RDD of
// ((name, address), (Option[zipcode], Option[landmark])).
// We map it to the desired flat format, substituting empty strings for missing values:
val joined = pairRDD1.fullOuterJoin(pairRDD2).map {
  case ((name, address), (zipcode, landmark)) =>
    (name, address, zipcode.getOrElse(""), landmark.getOrElse(""))
}
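If you only need records whose (name, address) key is present in both RDDs, a plain join (which is what you originally tried) works once the RDDs are keyed, and it avoids Options altogether. A minimal sketch, assuming the same pairRDD1 and pairRDD2 as above:

```scala
// Inner join: keeps only keys present in BOTH pair RDDs,
// so the joined values are plain Strings rather than Options:
val innerJoined = pairRDD1.join(pairRDD2).map {
  case ((name, address), (zipcode, landmark)) => (name, address, zipcode, landmark)
}
```

Which of the two you want depends on whether a record missing from one RDD should still appear in the output.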

More info about PairRDD functions is available in Spark's Scala API documentation.
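Since you also wanted to save the result as JSON: RDDs have no built-in JSON writer, so one simple approach (a sketch, assuming the joined RDD from above, that line-delimited JSON is acceptable, and a placeholder output path "output/joined-json") is to format each record as a JSON string and use saveAsTextFile:

```scala
// Format each record as one JSON object per line and write the lines out.
// Note: for real data you would also need to escape quotes inside the values.
val asJson = joined.map { case (name, address, zipcode, landmark) =>
  s"""{"name":"$name","address":"$address","zipcode":"$zipcode","landmark":"$landmark"}"""
}
asJson.saveAsTextFile("output/joined-json")
```

If you are using Spark SQL, converting the RDD to a DataFrame and using its JSON writer is a more robust alternative, since it handles escaping for you.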