0
votes

I am trying the understand the Hadoop word count example in Python http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

The author starts with naive versions of the mapper and the reducer. Here is the reducer (I removed some comments for brevity)

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()

    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

The author tests the program with:

echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py

So the reducer is written as if a reducer job's input data was like:

aa 1
aa 1
bb 1
cc 1
cc 1
cc 1

My initial understand of a reducer was that the input data for a given reducer would contain one unique key. So in the previous examples, 3 reducers jobs would be needed. Is my understand incorrect?

Then the author presents improved versions of the mapper and the reducer. Here is the reducer:

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)

    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()

The author adds the following warning:

Note: The following Map and Reduce scripts will only work “correctly” when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test command “cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py” will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.

I don't understand why the naive test command doesn't work with the new version. I thought the use of sort -k1,1 would produce the same input for both versions of the reducer. What am I missing?

1

1 Answers

0
votes

Regarding your first question: "My initial understand of a reducer was that the input data for a given reducer would contain one unique key. So in the previous examples, 3 reducers jobs would be needed. Is my understand incorrect?"

There is a difference between the MapReduce abstraction and the Hadoop's implementation of that abstraction. In the abstraction, a reducer is associated with a unique key. On the other hand, the Hadoop implementation assigns several keys to the same reducer (to avoid the cost of closing a process and starting out a new process). In particular, in Hadoop streaming a reducer receives the key-value pairs corresponding to a certain number of keys (it could be zero, one, or more keys) but you have the guarantee that the key-value pairs associated with a certain key will be coming all consecutively to each other.

For instance, let's take your word count example with input "foo foo quux labs foo bar quux". Then it could be that a reducer receives the input "bar 1\nfoo 1\nfoo 1\nfoo1" and another reducer receives "labs 1\nquux 1\nquux 1". The number of actual reducer processes that run is decided by you using the option mapred.reduce.tasks. For example to use 2 reducers you can do

 $ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=2 -mapper ....

Regarding your second question, I agree with you that sort -k1,1 will do the trick, so I don't see the problem either.