1
votes

I'm trying to set up a cohort study to track in-app user behavior, and I want to ask how I can specify conditions in PySpark when I use .join(). Given:

rdd1 = sc.parallelize([(u'6df99638e4584a618f92a9cfdf318cf8',
    ((u'service1',
      u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
      u'2016-02-08',
      u'2016-39',
      u'2016-6',
      u'2016-2',
      '2016-10-19'),
     (u'service2',
      u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
      u'1',
      u'67.0',
      u'2016-293',
      u'2016-42',
      u'2016-10',
      '2016-10-19')))])


rdd2 = sc.parallelize([(u'6df99638e4584a618f92a9cfdf318cf8',
    ((u'service1',
      u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
      u'2016-02-08',
      u'2016-39',
      u'2016-6',
      u'2016-2',
      '2016-10-20'),
     (u'service2',
      u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A',
      u'10',
      u'3346.0',
      u'2016-294',
      u'2016-42',
      u'2016-10',
      '2016-10-20')))])

These two RDDs hold information about a user, with '6df99638e4584a618f92a9cfdf318cf8' as ID, who logged in to service1 and service2 on 2016-10-19 and 2016-10-20. My objective is to join my two RDDs, each containing a minimum of 20,000 rows, so it must be an inner join. The real objective is to get all users who logged in on 2016-10-19 and also logged in on 2016-10-20. More specifically, my final objective is to end up, for this example, with just the contents of rdd2 after the inner join.

Expected output:

    [(u'6df99638e4584a618f92a9cfdf318cf8',
      ((u'service1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'),
       (u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20')))]

A simple rdd1.join(rdd2) gives me, logically, an RDD containing all pairs of elements with matching keys across the two RDDs. A leftOuterJoin or a rightOuterJoin does not suit my needs either, because I want an inner join (just the IDs that already exist in both rdd1 and rdd2).

Expected output: suppose we have two dicts, dict1 = {'a': 'man', 'b': 'woman', 'c': 'baby'} and dict2 = {'a': 'Zara', 'x': 'Mango', 'y': 'Celio'}. The expected output must be output_dict = {'a': 'Zara'}: 'a' (the key) already exists in dict1, and what I want is the key and value from dict2!
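In plain Python, that is just a dict comprehension (a minimal sketch of the semantics I'm after):

dict1 = {'a': 'man', 'b': 'woman', 'c': 'baby'}
dict2 = {'a': 'Zara', 'x': 'Mango', 'y': 'Celio'}

# keep only the entries of dict2 whose key also appears in dict1
output_dict = {k: v for k, v in dict2.items() if k in dict1}
print(output_dict)  # {'a': 'Zara'}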

I tried to do this:

rdd1.map(lambda (k, v): k).join(rdd2)

This code gives me an empty RDD.
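I suspect that is because the map drops the values and leaves an RDD of bare key strings rather than (key, value) pairs, so the join has nothing to pair up:

rdd1.map(lambda (k, v): k).collect()
# [u'6df99638e4584a618f92a9cfdf318cf8']  <- plain strings, not (key, value) pairs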


What should I do? PS: I must deal with RDDs, not DataFrames! So I don't want to convert my RDDs to DataFrames :D Any help appreciated. Thanks!

What is the expected output? – Yaron
@Yaron: [(u'6df99638e4584a618f92a9cfdf318cf8', ((u'service1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), (u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20')))] – Arij SEDIRI
@Yaron: The contents of rdd2. I'm looking for users which exist in both rdd1 (2016-10-19) and rdd2 (2016-10-20). – Arij SEDIRI
I'm sorry, can you please explain, in a simple way, what algorithm should be executed on the input, and what is the expected output? – Yaron
Expected output: all rows of rdd2 whose key also exists in rdd1. Suppose we have two dicts, dict1 = {'a': 'man', 'b': 'woman', 'c': 'baby'} and dict2 = {'a': 'Zara', 'x': 'Mango', 'y': 'Celio'}. The expected output must be output_dict = {'a': 'Zara'}: 'a' (the key) already exists in dict1, and what I want is the key and value from dict2! Thanks! – Arij SEDIRI

1 Answer

2
votes

So, you are looking for a join of rdd1 and rdd2 that keeps the key but takes the value from rdd2 only:

rdd_output = rdd1.join(rdd2).map(lambda (k, (v1, v2)): (k, v2))  # inner join on the ID, then keep only rdd2's value

The result is:

print rdd_output.take(1)

[(u'6df99638e4584a618f92a9cfdf318cf8', (
(u'service1', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'2016-02-08', u'2016-39', u'2016-6', u'2016-2', '2016-10-20'), 
(u'service2', u'D8B75AA2-7408-49A7-A70D-6442C12E2B6A', u'10', u'3346.0', u'2016-294', u'2016-42', u'2016-10', '2016-10-20')
))]
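
Note that tuple unpacking in lambda parameters (lambda (k, (v1, v2)): ...) only works on Python 2; it was removed in Python 3 by PEP 3113. A version that runs on both, using mapValues to keep only rdd2's side of the joined value, would be:

rdd_output = rdd1.join(rdd2).mapValues(lambda v: v[1])  # v is (value_from_rdd1, value_from_rdd2)

As a bonus, mapValues preserves the partitioning of the joined RDD, which can save a shuffle in later stages.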