3
votes

I have two RDDs: one has a single column and the other has two columns. To join the two RDDs on their keys I had to add a dummy value of 0 to the single-column RDD. Is there a more efficient way of doing this with join?

val lines = sc.textFile("ml-100k/u.data")
val movienamesfile = sc.textFile("Cml-100k/u.item")

val moviesid = lines.map(x => x.split("\t")).map(x => (x(1),0))
val test = moviesid.map(x => x._1)
val movienames = movienamesfile.map(x => x.split("\\|")).map(x => (x(0),x(1)))
val joined = movienames.join(moviesid).distinct()

Edit:

Let me restate this question in SQL terms. Say, for example, I have table1 (movieid) and table2 (movieid, moviename). In SQL we would write something like:

select moviename, movieid, count(1)
from table2 inner join table1 on table1.movieid = table2.movieid
group by ....

Here in SQL, table1 has only one column whereas table2 has two columns, and the join still works. In the same way, can Spark join on just the keys of both RDDs?

Your question is not very clear. Can you reformulate? - eliasah
Are you trying to achieve filtering, like an inner join? - mehmetminanc
Yes, the same as an inner join. dataset1 = 123,starwars; dataset2 = 123; dataset1.join(dataset2) fails because dataset2 is missing one element, so I need to add a default value (dataset2 = 123,0) and then the join works. Is there a way to make the join work when dataset2 contains fewer elements? - sri hari kali charan Tummala
Can you give the types of your declared values? - eliasah
I think this post has the answer that you're looking for. - Rohan Aletty

1 Answer

8
votes

The join operation is defined only on pair RDDs (RDD[(K, V)]), which are quite different from a relation / table in SQL. Each element of a pair RDD is a Tuple2 where the first element is the key and the second is the value. Both can contain complex objects as long as the key provides meaningful hashCode and equals implementations.

If you want to think about this in SQL-ish terms, you can consider the key to be everything that goes into the ON clause, while the value contains the selected columns.

SELECT table1.value, table2.value
FROM table1 JOIN table2 ON table1.key = table2.key

While these approaches look similar at first glance, and you can express one using the other, there is one fundamental difference. When you look at a SQL table and ignore constraints, all columns belong to the same class of objects, while the key and the value in a pair RDD each have a clear meaning.
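To make the key / value split concrete, here is a minimal sketch, assuming Spark is on the classpath and a local master is acceptable. The object name, the helper joinMovies and the sample rows are hypothetical and only serve as illustration. The id plays the role of the ON clause, and everything else is packed into the value.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PairJoinSketch {
  // Key = what would go into ON; value = the columns you would SELECT.
  def joinMovies(
      movies: RDD[(String, String, Int)],   // (id, name, year)
      ratings: RDD[(String, Int)]           // (id, rating), already a pair RDD
  ): RDD[(String, ((String, Int), Int))] = {
    // Reshape the three-column rows into (key, value) pairs before joining.
    val moviesByKey = movies.map { case (id, name, year) => (id, (name, year)) }
    moviesByKey.join(ratings)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("pair-join-sketch"))
    try {
      // Hypothetical sample data, not taken from the MovieLens files.
      val movies = sc.parallelize(Seq(("1", "Toy Story", 1995), ("2", "GoldenEye", 1995)))
      val ratings = sc.parallelize(Seq(("1", 5), ("1", 3), ("3", 4)))
      joinMovies(movies, ratings).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Note that the join only ever compares keys; the value side is carried along untouched, which is why a one-column RDD has to be given some value before it can take part.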

Going back to your problem: to use join you need both a key and a value. Arguably, a much cleaner choice than 0 as a placeholder would be the null singleton, but there is really no way around having some value.
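As a rough sketch of the placeholder idea, the snippet below pairs each id with an empty value, joins, and then drops the placeholder again. It uses Scala's Unit value () as the placeholder, which is my substitution rather than something stated above. The object name, the helper namesForIds and the sample data are hypothetical, and a local Spark installation is assumed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PlaceholderJoinSketch {
  // Keep only the (id, name) pairs whose id occurs in `ids`.
  def namesForIds(names: RDD[(String, String)], ids: RDD[String]): RDD[(String, String)] = {
    // distinct() first, so each id matches at most once; () is the placeholder value.
    val keyedIds = ids.distinct().map(id => (id, ()))
    // After the join the value is (name, ()); keep only the name.
    names.join(keyedIds).mapValues { case (name, _) => name }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[1]").setAppName("placeholder-join-sketch"))
    try {
      // Hypothetical sample data.
      val names = sc.parallelize(Seq(("1", "Toy Story"), ("2", "GoldenEye")))
      val ids = sc.parallelize(Seq("1", "1", "3"))
      namesForIds(names, ids).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Deduplicating the ids before the join also removes the need for the distinct() call on the joined result.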

For small data you can use filter in a similar way to broadcast join:

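// Collect the movie ids to the driver and broadcast them as a Set.
// x(1) is the movie id column used in the question (x.head would be the user id).
val moviesidBD = sc.broadcast(
  lines.map(x => x.split("\t")).map(x => x(1)).collect.toSet)

movienames.filter{case (id, _) => moviesidBD.value contains id}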

but if you really want SQL-ish joins then you should simply use Spark SQL.

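// toDF needs the SQL implicits in scope; with a SparkSession use spark.implicits._ instead
import sqlContext.implicits._

val movieIdsDf = lines
   .map(x => x.split("\t"))
   .map(a => Tuple1(a(1)))  // movie id column, as in the question
   .toDF("id")

val movienamesDf = movienames.toDF("id", "name")

// Add optional join type qualifier
movienamesDf.join(movieIdsDf, movieIdsDf("id") <=> movienamesDf("id"))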
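The join type qualifier can be illustrated with a short sketch. It assumes Spark 2.0 or later (SparkSession) running locally; the object name, helper names and sample rows are hypothetical. A "leftsemi" join keeps the rows of the left side that have a match on the right and returns only the left side's columns, which is close to the filtering asked about in the question. An inner join followed by groupBy / count mirrors the GROUP BY query.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SemiJoinSketch {
  // Rows of `names` whose id appears in `ids`; only the columns of `names` are returned.
  def namesWithRatings(names: DataFrame, ids: DataFrame): DataFrame =
    names.join(ids, Seq("id"), "leftsemi")

  // Number of matching id rows per movie, similar to the GROUP BY in the question.
  def ratingCounts(names: DataFrame, ids: DataFrame): DataFrame =
    names.join(ids, Seq("id"), "inner").groupBy("id", "name").count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("semi-join-sketch")
      .getOrCreate()
    import spark.implicits._
    try {
      // Hypothetical sample data.
      val names = Seq(("1", "Toy Story"), ("2", "GoldenEye")).toDF("id", "name")
      val ids = Seq("1", "1", "3").toDF("id")
      namesWithRatings(names, ids).show()
      ratingCounts(names, ids).show()
    } finally spark.stop()
  }
}
```

With the semi join no placeholder value and no distinct() are needed, because duplicate ids on the right side do not multiply the rows on the left.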