I'm trying to set up a cohort study to track in-app user behavior, and I would like to know how I can specify conditions in PySpark when I use .join(). Given:
rdd1 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8',
((u'service1',
u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
u'2016-02-08',
u'2016-39',
u'2016-6',
u'2016-2',
'2016-10-19'),
(u'service2',
u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
u'1',
u'67.0',
u'2016-293',
u'2016-42',
u'2016-10',
'2016-10-19')))])
rdd2 = sc.parallelize ([(u'6df99638e4584a618f92a9cfdf318cf8',
((u'serice1',
u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
u'2016-02-08',
u'2016-39',
u'2016-6',
u'2016-2',
'2016-10-20'),
(u'service2',
u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
u'10',
u'3346.0',
u'2016-294',
u'2016-42',
u'2016-10',
'2016-10-20')))])
These two RDDs hold information about one user, with ID '6df99638e4584a618f92a9cfdf318cf8', who logged in to service1 and service2 on 2016-10-19 and on 2016-10-20. My objective is to join the two RDDs, each of which contains at least 20,000 rows, so it must be an inner join. The real goal is to get all users who logged in on 2016-10-19 and also logged in on 2016-10-20. More specifically, the result of the inner join should, in this example, be just the contents of rdd2.
Expected output:
[(u'6df99638e4584a618f92a9cfdf318cf8',
((u'serice1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'),
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20'))
) ]
A simple rdd1.join(rdd2) gives me, logically, an RDD containing all pairs of elements with matching keys in the two RDDs. A leftOuterJoin or a rightOuterJoin does not suit my needs either, because I want an inner join (only the IDs that exist in both rdd1 and rdd2).
As an analogy, suppose we have two dicts: dict1 = {'a': 'man', 'b': 'woman', 'c': 'baby'} and dict2 = {'a': 'Zara', 'x': 'Mango', 'y': 'Celio'}. The expected output must be output_dict = {'a': 'Zara'}. The key 'a' already exists in dict1, and what I want is the key and value from dict2!
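To make the dict analogy concrete, here is a minimal plain-Python sketch of the semantics I am after. It only illustrates the expected result on dicts and is not an RDD solution; the names dict1, dict2 and output_dict are the ones from the analogy.

```python
dict1 = {'a': 'man', 'b': 'woman', 'c': 'baby'}
dict2 = {'a': 'Zara', 'x': 'Mango', 'y': 'Celio'}

# Keep only the keys that exist in both dicts,
# and take the value from dict2 (not from dict1).
output_dict = {k: v for k, v in dict2.items() if k in dict1}

print(output_dict)
```

What I am looking for is the equivalent of this on two key/value RDDs.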
I tried to do this:
rdd1.map(lambda (k, v) : k).join(rdd2)
This code gives me an empty RDD.
What should I do? PS: I must work with RDDs, not DataFrames, so I don't want to convert my RDDs to DataFrames :D Any help is appreciated. Thanks!