0
votes

I have created 2 RDD's like below:

rdd1 = sc.parallelize([(u'176', u'244', -0.03925566875021147), (u'28', u'244', 0.9175106515709205), (u'165', u'244', -0.3837580218245722), (u'181', u'244', 0.29145693160561503), (u'161', u'244', -0.503468718448459), (u'28', u'275', 1.1636548589189926), (u'165', u'275', -1.026158464467282), (u'181', u'275', 0.6685791983070568)])

rdd2 = sc.parallelize([(u'176', u'244'), (u'28', u'244'), (u'165', u'244'), (u'165', u'275'), (u'181', u'275'), (u'141', u'388'), (u'154', u'238')])

my expected output should be like below:

[(u'176', u'244', -0.03925566875021147,1), (u'28', u'244', 0.9175106515709205,1), (u'165', u'244', -0.3837580218245722,1), (u'181', u'244', 0.29145693160561503,0), (u'161', u'244', -0.503468718448459,0), (u'28', u'275', 1.1636548589189926,0), (u'165', u'275', -1.026158464467282,1), (u'181', u'275', 0.6685791983070568,1)]

i want to join two rdds add join status like 1 or 0.

in rdd1 1st tuple is (u'176', u'244', -0.03925566875021147) and rdd2 contain (u'176', u'244') ,first two elements of rdd1,rdd2 same then my expected output is (u'176', u'244', -0.03925566875021147,1).

same in the case of Rdd1: (u'181', u'275', 0.6685791983070568) and Rdd2 :(u'181', u'275') output will be (u'181', u'275', 0.6685791983070568,1).

else case: rdd1 contain (u'181', u'244', 0.29145693160561503) but rdd2 did not contain any tuple like (u'181', u'244') so expected output will be (u'181', u'244', 0.29145693160561503,0)

I achieved this using creating dataframes ,but I don't want to use data frame join. please help on this how to achieve using rdds.

2

2 Answers

0
votes

To do that in rdd approach you have to paired rdd with columns which you want to join.and then Perform a left outer join of this and other. For each element (k, v) in this, the resulting RDD will either contain all pairs (k, (v, Some(w))) for w in other, or the pair (k, (v, None)) if no elements in other have key k.

 userRDD.leftOuterJoin(empRDD).collect {
        case (String, (firstrddvalue, None)) => (k,v,0)
        case (String, (firstrddvalue,secondrddvalue))=>(k,v,1)
      }
0
votes

I want to join two rdds add join status like 1 or 0

For joining rdd, you would need pairedRdd

pairedRdd1 = rdd1.map(lambda x: ((x[0], x[1]), x[2:]))
pairedRdd2 = rdd2.map(lambda x: ((x[0], x[1]), 1))

Here I have populated 1 in pairedRdd2 as your output requirement is to have 1 for matching rdd2 from rdd1.

Then finally, use leftOuterJoin and some manipulation for the expected output

finalRdd = pairedRdd1.leftOuterJoin(pairedRdd2).map(lambda x: tuple(list(x[0]) + list(x[1][0]) + [0 if(x[1][1] == None) else 1]))
#[('161', '244', -0.503468718448459, 0),('165', '244', -0.3837580218245722, 1),('181', '244', 0.29145693160561503, 0),('165', '275', -1.026158464467282, 1),('181', '275', 0.6685791983070568, 1),('176', '244', -0.03925566875021147, 1),('28', '275', 1.1636548589189926, 0),('28', '244', 0.9175106515709205, 1)]

I hope the answer is helpful