6
votes

I have two rdds that I need to join them together. They look like the followings:

RDD1

[(u'2', u'100', 2),
 (u'1', u'300', 1),
 (u'1', u'200', 1)]

RDD2

[(u'1', u'2'), (u'1', u'3')]

My desired output is:

[(u'1', u'2', u'100', 2)]

So I would like to select those from RDD2 that have the same second value of RDD1. I have tried join and also cartesian and none is working and not getting even close to what I am looking for. I am new to Spark and would appreciate any help from you guys.

Thanks

2
Do you allow to use Spark Dataframe for this solution?titipata

2 Answers

7
votes

Dataframe If you allow using Spark Dataframe in the solution. You can turn given RDD to dataframes and join the corresponding column together.

df1 = spark.createDataFrame(rdd1, schema=['a', 'b', 'c'])
df2 = spark.createDataFrame(rdd2, schema=['d', 'a'])
rdd_join = df1.join(df2, on='a')
out = rdd_join.rdd.collect()

RDD just zip the key that you want to join to the first element and simply use join to do the joining

rdd1_zip = rdd1.map(lambda x: (x[0], (x[1], x[2])))
rdd2_zip = rdd2.map(lambda x: (x[1], x[0]))
rdd_join = rdd1_zip.join(rdd2_zip)
rdd_out = rdd_join.map(lambda x: (x[0], x[1][0][0], x[1][0][1], x[1][1])).collect() # flatten the rdd
print(rdd_out)
6
votes

For me your process looks like manual. Here is sample code:-

rdd = sc.parallelize([(u'2', u'100', 2),(u'1', u'300', 1),(u'1', u'200', 1)])
rdd1 = sc.parallelize([(u'1', u'2'), (u'1', u'3')])
newRdd = rdd1.map(lambda x:(x[1],x[0])).join(rdd.map(lambda x:(x[0],(x[1],x[2]))))
newRdd.map(lambda x:(x[1][0], x[0], x[1][1][0], x[1][1][1])).coalesce(1).collect()

OUTPUT:-

[(u'1', u'2', u'100', 2)]