11 votes

I am learning Python and Hadoop. I completed the setup and the basic examples from the official site using Python + Hadoop streaming. Then I moved on to implementing a join of 2 files. I completed an equi-join, which checks whether the same key appears in both input files; when it does, it outputs the key along with the values from file 1 and file 2, in that order. The equality join is working as it is supposed to.

Now, I wish to do an inequality join, which involves finding the cross product before applying the inequality condition. I am using the same mapper (do I need to change it?) and I changed the reducer so that it contains a nested loop (since every key-value pair in file1 must be matched with all key-value pairs in file2). This doesn't work, since you can only go through the stream once. Then I thought of storing 'some' values in the reducer and comparing them, but I have no idea 'how' many. The naive method is to store the whole contents of file2 in an array (or similar structure), but that's wasteful and goes against the idea of distributed processing. Finally, my questions are:

  1. How can I store values in the reducer so that I can compute the cross product between the two files?

  2. In the equi-join, Hadoop seems to send all key-value pairs with the same key to the same reducer, which is perfectly fine and works well for that case. However, how do I change this behaviour (if needed) so that the required grouping of key-value pairs goes to the correct reducer?

Sample Files: http://pastebin.com/ufYydiPu

Python Map/Reduce Scripts: http://pastebin.com/kEJwd2u1

The Hadoop command I am using:

bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /home/hduser/mapper.py -mapper mapper.py -file /home/hduser/ireducer.py -reducer reducer.py -input /user/hduser/inputfiles/* -output /user/hduser/join-output

Any help/hint is much appreciated.


1 Answer

3 votes

One way to deal with the multiple combinations, which can be very helpful for avoiding nested loops, is to use the itertools module, specifically the itertools.product function, which takes care of the Cartesian product using generators. This is good for memory usage and efficiency, and it can simplify your code significantly if you have to join multiple data sets in one MapReduce job.
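To get a feel for what itertools.product does, here is a minimal standalone sketch (the value lists are made-up placeholders, not your actual data):

    from itertools import product

    file1_values = ['a1', 'a2']        # hypothetical values for one key from file 1
    file2_values = ['b1', 'b2', 'b3']  # hypothetical values for the same key from file 2

    # product() yields the pairs lazily, so no nested loop is written by hand
    for v1, v2 in product(file1_values, file2_values):
        print('%s %s' % (v1, v2))      # a1 b1, a1 b2, a1 b3, a2 b1, ...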

Regarding the correspondence between the data yielded by the mapper and the data sets to be combined in the reducer, if the data sets for each key are not too big, you can simply yield from the mapper a combination like:

{key, [origin_1, values]}
{key, [origin_2, values]}
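As a rough illustration, here is a mapper sketch that tags each record with its origin. The map_input_file / mapreduce_map_input_file environment variables are standard Hadoop streaming features (the job configuration is exported with dots replaced by underscores), but the tab-separated record layout and the 'file1' substring test are assumptions you would adapt to your actual files:

    #!/usr/bin/env python
    import os
    import sys

    # Hadoop streaming exposes the path of the current input split as an
    # environment variable: map_input_file in older releases,
    # mapreduce_map_input_file in newer ones.
    input_file = os.environ.get('mapreduce_map_input_file',
                                os.environ.get('map_input_file', ''))
    # Assumption: the file names contain 'file1'/'file2'; adjust to your paths.
    origin = 'origin_1' if 'file1' in input_file else 'origin_2'

    for line in sys.stdin:
        key, value = line.strip().split('\t', 1)  # assumed tab-separated records
        print('%s\t%s\t%s' % (key, origin, value))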

Thus, you will be able to group the values with the same origin in the reducer into dictionaries, which will be the data sets over which the Cartesian product is applied using itertools.product.
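And a matching reducer sketch, assuming the mapper output above: it buffers the current key's values per origin, then emits their Cartesian product via itertools.product. Your inequality condition would go inside flush() as a filter; this sketch just emits every pair. Only one key's values are held in memory at a time, which is where the "not too big per key" caveat applies:

    #!/usr/bin/env python
    import sys
    from itertools import product

    current_key = None
    buckets = {'origin_1': [], 'origin_2': []}

    def flush(key, buckets):
        # Emit the cross product for one key; apply your inequality check here.
        for v1, v2 in product(buckets['origin_1'], buckets['origin_2']):
            print('%s\t%s\t%s' % (key, v1, v2))

    for line in sys.stdin:
        key, origin, value = line.strip().split('\t', 2)
        if key != current_key:
            if current_key is not None:
                flush(current_key, buckets)
            current_key = key
            buckets = {'origin_1': [], 'origin_2': []}
        buckets[origin].append(value)

    if current_key is not None:
        flush(current_key, buckets)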