1
votes

I have two pair RDDs, shown below.

RDD1 contains name as key and zipcode as value:

RDD1 -> RDD( (ashley, 20171), (yash, 33613), (evan, 40217) )

RDD2 contains zip code as key and some random number as value:

RDD2 -> RDD( (20171, 235523), (33613, 345345345), (40189, 44355217), (40122, 2345235), (40127, 232323424) )

I need to replace the zipcodes in RDD1 with the corresponding values from RDD2. So the output would be

RDD3 -> RDD( (ashley, 235523), (yash, 345345345), (evan, 232323424) )

I tried doing it with the RDD lookup method, as shown below, but I got an exception saying that RDD transformations cannot be performed inside another RDD transformation:

val rdd3 = rdd1.map( x => (x._1, rdd2.lookup(x._2)(0)) )

You can't do that. If rdd2 is small, you could collect it on the driver and broadcast it, then what you're trying would be possible. Otherwise, you'll probably have to play with joins to achieve what you want. - vanza

Do you mean something like this?

val zipmap = Map("40217" -> "Alabama", "40222" -> "Alaska", "20127" -> "miami", "33613" -> "herndon", "40111" -> "tampa")
val broadcastVar = sc.broadcast(zipmap)
val user_zip_lookup = user_zip_pair.map( x => (x._1, broadcastVar.value(x._2)) )

- yAsH

Yes, that's the gist of it if you can broadcast the data. - vanza
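
The broadcast approach from the comments can be sketched against the RDDs in the question as follows. This is a minimal sketch, not a definitive implementation: it assumes rdd2 is small enough to collect on the driver, and the names BroadcastLookup, replaceZips and zipToNumber are made up for illustration. It uses .get on the broadcast map rather than apply, so a zip code that is missing from rdd2 yields None instead of throwing.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object BroadcastLookup {
  // Hypothetical helper: replaces each zip code in rdd1 with the value
  // stored for it in rdd2, using a broadcast copy of rdd2.
  // Assumption: rdd2 fits comfortably in driver and executor memory.
  def replaceZips(sc: SparkContext,
                  rdd1: RDD[(String, Int)],
                  rdd2: RDD[(Int, Int)]): RDD[(String, Option[Int])] = {
    // collectAsMap() pulls rdd2 to the driver; broadcast ships one
    // read-only copy to each executor.
    val zipToNumber: Broadcast[scala.collection.Map[Int, Int]] =
      sc.broadcast(rdd2.collectAsMap())

    // The closure only touches the broadcast value, never another RDD,
    // so no transformation is nested inside another one.
    rdd1.map { case (name, zip) => (name, zipToNumber.value.get(zip)) }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-lookup").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val rdd1 = sc.parallelize(Seq(("ashley", 20171), ("yash", 33613), ("evan", 40217)))
      val rdd2 = sc.parallelize(Seq((20171, 235523), (33613, 345345345), (40189, 44355217),
                                    (40122, 2345235), (40127, 232323424)))
      replaceZips(sc, rdd1, rdd2).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

With the sample data exactly as listed in the question, 40217 does not appear as a key in rdd2, so the entry for evan comes back as None.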

1 Answer

6
votes

You can simply join the two RDDs by zip code:

rdd1.map({case (name, zipcode) => (zipcode, name)})
    .join(rdd2)
    .map({case (zipcode, (name, number)) => (name, number)})
    .collect()

Note that this returns only the records whose zip codes match in both rdd1 and rdd2. If you want to assign a default number to the records in rdd1 that have no corresponding zip code in rdd2, use leftOuterJoin instead of join:

rdd1.map({case (name, zipcode) => (zipcode, name)})
    .leftOuterJoin(rdd2)
    .map({case (zipcode, (name, number)) => (name, number.getOrElse(0))})
    .collect()
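
To see the difference between the two joins on the data from the question, here is a self-contained sketch that can be run locally. The object and method names (JoinByZip, innerLookup, outerLookup) and the default parameter are assumptions for illustration, and the local[*] master is only there so that it runs without a cluster.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object JoinByZip {
  // Inner join: names whose zip code is absent from rdd2 are dropped.
  def innerLookup(rdd1: RDD[(String, Int)], rdd2: RDD[(Int, Int)]): RDD[(String, Int)] =
    rdd1.map { case (name, zip) => (zip, name) }   // re-key rdd1 by zip code
      .join(rdd2)
      .map { case (_, (name, number)) => (name, number) }

  // Left outer join: every name from rdd1 is kept; `default` is used
  // when rdd2 has no entry for the zip code.
  def outerLookup(rdd1: RDD[(String, Int)], rdd2: RDD[(Int, Int)], default: Int): RDD[(String, Int)] =
    rdd1.map { case (name, zip) => (zip, name) }
      .leftOuterJoin(rdd2)
      .map { case (_, (name, number)) => (name, number.getOrElse(default)) }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("join-by-zip").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val rdd1 = sc.parallelize(Seq(("ashley", 20171), ("yash", 33613), ("evan", 40217)))
      val rdd2 = sc.parallelize(Seq((20171, 235523), (33613, 345345345), (40189, 44355217),
                                    (40122, 2345235), (40127, 232323424)))
      println(innerLookup(rdd1, rdd2).collect().mkString(", "))
      println(outerLookup(rdd1, rdd2, default = 0).collect().mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

Unlike the broadcast approach, neither variant needs rdd2 to fit in memory on the driver, at the cost of a shuffle for the join.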