I am trying to understand the Hadoop word count example in Python: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
The author starts with naive versions of the mapper and the reducer. Here is the reducer (I removed some comments for brevity):
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
The author tests the program with:
echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
So the reducer is written as if the input data for a reducer job looked like this:
aa 1
aa 1
bb 1
cc 1
cc 1
cc 1
My initial understanding of a reducer was that the input data for a given reducer would contain one unique key, so in the previous example three reducer jobs would be needed. Is my understanding incorrect?
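For context, here is roughly how I imagine keys being assigned to reducers, based on what I've read about Hadoop's default HashPartitioner (this sketch is mine, not from the article, and num_reducers is a made-up parameter):

from collections import defaultdict

def partition(key, num_reducers):
    # pick a reduce task for this key; every occurrence of the same key
    # goes to the same task, but one task can receive many distinct keys
    return hash(key) % num_reducers

pairs = [('aa', 1), ('aa', 1), ('bb', 1), ('cc', 1), ('cc', 1), ('cc', 1)]
buckets = defaultdict(list)
for key, value in pairs:
    buckets[partition(key, 2)].append((key, value))
print(dict(buckets))  # e.g. two buckets, each possibly holding more than one distinct key

If that sketch is right, a single reducer can see several keys, which would explain why the reducer code has to detect key boundaries itself.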
Then the author presents improved versions of the mapper and the reducer. Here is the reducer:
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
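To check my reading of the groupby call (my own experiment, not part of the tutorial), I confirmed that itertools.groupby only merges consecutive items with equal keys, which is why the input has to arrive already grouped by word:

from itertools import groupby
from operator import itemgetter

# groupby only merges *adjacent* items with equal keys, so a key that
# reappears later in the stream starts a brand-new group
data = [('foo', 1), ('bar', 1), ('foo', 1)]
print([(word, [c for _, c in group]) for word, group in groupby(data, itemgetter(0))])
# -> [('foo', [1]), ('bar', [1]), ('foo', [1])]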
The author adds the following warning:
Note: The following Map and Reduce scripts will only work “correctly” when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test command “cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py” will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.
I don't understand why the naive test command doesn't work with the new version. I thought the use of sort -k1,1 would produce the same input for both versions of the reducer. What am I missing? (See my own trace below.)
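For what it's worth, when I simulate the test pipeline in plain Python (my own trace, not from the tutorial), the groupby reducer seems to give the same totals as the naive one on the sorted mapper output:

from itertools import groupby
from operator import itemgetter

# sorted mapper output for "foo foo quux labs foo bar quux",
# i.e. what `mapper.py | sort -k1,1` should feed the reducer
pairs = sorted([('foo', 1), ('foo', 1), ('quux', 1), ('labs', 1),
                ('foo', 1), ('bar', 1), ('quux', 1)])
for word, group in groupby(pairs, itemgetter(0)):
    print('%s\t%d' % (word, sum(count for _, count in group)))
# bar 1, foo 3, labs 1, quux 2 -- exactly what the naive reducer prints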