
I'm trying to run my first Hadoop MapReduce job, and I have to use Hadoop and MapReduce, as I am doing this for a class project. I want to use Python for the mapper and reducer, since it is the language I am most comfortable with and the one most familiar to my peers. The easiest way for me to set this up seemed to be a Google Cloud Dataproc instance, so I have one running as well. I'll describe what I have done and what resources I have used, but I am relatively new to this and I might be missing something.

Dataproc Configuration

(three screenshots of the Dataproc cluster configuration)

I'm then able to SSH into my primary node. The mapper.py and reducer.py files are stored in a Google Cloud Storage bucket.

The mapper and reducer code is from this Michael Noll blog post, modified to work with Python 3.

mapper.py:

#!/usr/bin/env python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        #print ('%s\t%s' % (word, 1))
        print(f"{word}\t{1}")

reducer.py:

#!/usr/bin/env python
"""reducer.py"""

from operator import itemgetter
import sys

print_out = lambda x, y: print(f'{x}\t{y}')

current_word = None
current_count = 0
word = None

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    #print("still working")

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            #print '%s\t%s' % (current_word, current_count)
            print_out(current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    print_out(current_word, current_count)
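To convince myself the mapper/reducer logic is right before involving Hadoop at all, the same contract can be checked in plain Python. This is my own sketch, not part of the original scripts: the functions mirror mapper.py and reducer.py, and sorted() stands in for Hadoop's shuffle/sort phase, which is what guarantees the reducer sees identical keys consecutively.

```python
# Hypothetical local sanity check for the streaming word count.
# sorted() plays the role of Hadoop's shuffle/sort between map and reduce.

def map_words(lines):
    """Emit (word, 1) pairs, mirroring mapper.py."""
    for line in lines:
        for word in line.strip().split():
            yield (word, 1)

def reduce_counts(pairs):
    """Accumulate counts over consecutive identical keys, mirroring reducer.py.

    Relies on `pairs` being sorted by key, just as the real reducer
    relies on Hadoop having sorted the map output.
    """
    current_word, current_count = None, 0
    for word, count in pairs:
        if current_word == word:
            current_count += count
        else:
            if current_word is not None:
                yield (current_word, current_count)
            current_word, current_count = word, count
    if current_word is not None:
        yield (current_word, current_count)

lines = ["foo foo quux", "labs foo bar quux"]
result = dict(reduce_counts(sorted(map_words(lines))))
print(result)  # → {'bar': 1, 'foo': 3, 'labs': 1, 'quux': 2}
```

This checks the logic only; it says nothing about the runtime environment Hadoop launches the scripts in, which turns out to be the actual problem here.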

Finally, on the master node, I check my Python version:

hduser@data-604-m:~$ python
Python 3.7.3 (default, Mar 27 2019, 22:11:17) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>

And I run the following (adapted from here):

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input gs://data-604-hadoop/books/pg20417.txt \
    -output gs://data-604-hadoop/output

Which results in the following:

hduser@data-604-m:~$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py -mapper mapper.py -reducer reducer.py -input gs://data-604-hadoop/books/pg20417.txt -output gs://data-604-hadoop/output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.9.2.jar] /tmp/streamjob4601880105330541890.jar tmpDir=null
19/11/12 02:10:46 INFO client.RMProxy: Connecting to ResourceManager at data-604-m/10.162.0.13:8032
19/11/12 02:10:47 INFO client.AHSProxy: Connecting to Application History server at data-604-m/10.162.0.13:10200
19/11/12 02:10:47 INFO client.RMProxy: Connecting to ResourceManager at data-604-m/10.162.0.13:8032
19/11/12 02:10:47 INFO client.AHSProxy: Connecting to Application History server at data-604-m/10.162.0.13:10200
19/11/12 02:10:49 INFO mapred.FileInputFormat: Total input files to process : 1
19/11/12 02:10:49 INFO mapreduce.JobSubmitter: number of splits:15
19/11/12 02:10:49 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/11/12 02:10:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573523684358_0002
19/11/12 02:10:50 INFO impl.YarnClientImpl: Submitted application application_1573523684358_0002
19/11/12 02:10:50 INFO mapreduce.Job: The url to track the job: http://data-604-m:8088/proxy/application_1573523684358_0002/
19/11/12 02:10:50 INFO mapreduce.Job: Running job: job_1573523684358_0002
19/11/12 02:10:58 INFO mapreduce.Job: Job job_1573523684358_0002 running in uber mode : false
19/11/12 02:10:58 INFO mapreduce.Job:  map 0% reduce 0%
19/11/12 02:11:10 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
[... the same "PipeMapRed.waitOutputThreads(): subprocess failed with code 1" stack trace repeats for each of the remaining failed task attempts ...]
19/11/12 02:11:49 INFO mapreduce.Job:  map 80% reduce 0%
19/11/12 02:11:50 INFO mapreduce.Job:  map 100% reduce 100%
19/11/12 02:11:50 INFO mapreduce.Job: Job job_1573523684358_0002 failed with state FAILED due to: Task failed task_1573523684358_0002_m_000001
Job failed as tasks failed. failedMaps:1 failedReduces:0
19/11/12 02:11:50 INFO mapreduce.Job: Counters: 14
        Job Counters 
                Failed map tasks=19
                Killed map tasks=14
                Killed reduce tasks=5
                Launched map tasks=22
                Other local map tasks=14
                Rack-local map tasks=8
                Total time spent by all maps in occupied slots (ms)=885928
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=221482
                Total vcore-milliseconds taken by all map tasks=221482
                Total megabyte-milliseconds taken by all map tasks=453595136
        Map-Reduce Framework
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
19/11/12 02:11:50 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!

And I honestly have no idea what to do at this point. I have put a lot of time into this and I feel like I've hit a brick wall, as I'm not sure what is wrong.

I have also tried:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py \
    -mapper ./mapper.py \
    -reducer ./reducer.py \
    -input gs://data-604-hadoop/books/pg20417.txt \
    -output gs://data-604-hadoop/output

With similar results.

I appreciate any help.

Update: I have tried a few more things, without success. I moved my Python scripts onto the Hadoop cluster and then tested them with head -n100 mobydick.txt | ./mapper.py | sort | ./reducer.py, and they work. In the comments below I mention that I looked into my shebang and made changes, but those were also unsuccessful.

Maybe this question can help you in some way: stackoverflow.com/questions/26208454/… – King11
@King11 I will look into that further. I was hoping to do this with a Python implementation rather than Java, as we haven't learnt Java (although I do have a passing familiarity with it). – sanadan
Someone on Reddit suggested it might be my shebang. I ran sys.executable and got /opt/conda/bin/python, and I changed my shebang, but I get the same results. I also moved the files onto the Dataproc cluster, and with both shebangs they run within the context of head -n100 mobydick.txt | ./mapper.py | sort | ./reducer.py, but the Hadoop streaming job fails. – sanadan
Are you sure you updated your shebang to be #!/opt/conda/bin/python and updated your Python files in your GCS location before re-running? I actually repro'd your setup end-to-end and the fixed path seems to work for me; I'll post an answer explaining why this behaves the way it does. – Dennis Huo
I checked my file and I was trying #!/opt/conda/bin python instead of #!/opt/conda/bin/python. A lot of this is very new to me, so I was fumbling around a lot. I was able to replicate your answer and my MapReduce now works! Thank you very much. – sanadan

1 Answer


There are a few different things going on here, but the main one is that you cannot assume the system environment of each mapper/reducer task (running as a YARN container) matches your logged-in shell. Many elements are intentionally different in most circumstances (Java classpaths, for example). With Java-based MapReduce programs this normally works as intended, since the driver code run under the hadoop jar command and the executor code run on worker nodes in YARN containers end up with similar environment variables and classpaths. Hadoop Streaming is a bit of an oddball here, since it isn't as much of a first-class citizen in normal Hadoop usage.

Anyway, the main thing you're hitting in this case is that your default Python while logged in to the cluster is the Conda distribution's Python 3.7, but the default Python in the YARN environment that spawns the mapper/reducer tasks is actually Python 2.7. This is an unfortunate consequence of some legacy compatibility considerations in Dataproc. You can see this in action by using a trivial mapper as a dump of the environment info you need; for example, try running the following commands while SSH'd into your Dataproc cluster:

echo foo > foo.txt
hdfs dfs -mkdir hdfs:///foo
hdfs dfs -put foo.txt hdfs:///foo/foo.txt
echo '#!/bin/bash' > info.sh
echo 'which python' >> info.sh
echo 'python --version 2>&1' >> info.sh
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
     -Dmapred.reduce.tasks=0 \
     -Dmapred.map.tasks=1 \
     -files info.sh \
     -input hdfs:///foo \
     -output hdfs:///info-output \
     -mapper ./info.sh

Your local environment will show a different version of Python from what's printed at hdfs:///info-output:

$ bash info.sh
/opt/conda/bin/python
Python 3.7.3
$ hdfs dfs -cat hdfs:///info-output/*
/usr/bin/python 
Python 2.7.15+  
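If you'd rather probe this from Python than bash, a sketch of an equivalent diagnostic mapper (my own, with a hypothetical file name; same idea as info.sh above) would report which interpreter each YARN container actually launches:

```python
#!/usr/bin/env python
"""whichpython.py -- hypothetical diagnostic streaming mapper.

Run it as the -mapper of a hadoop-streaming job, the same way as
info.sh above, then read the part files it writes: each task reports
the interpreter path and version it was actually started with.
"""
import sys

def interpreter_info():
    """Return (executable path, version string) for the running Python."""
    return sys.executable, sys.version.split()[0]

if __name__ == "__main__":
    if not sys.stdin.isatty():
        sys.stdin.read()  # drain the input split; its contents are irrelevant
    path, version = interpreter_info()
    print("%s\t%s" % (path, version))
```

The %-formatting (rather than an f-string) is deliberate, so the script itself still runs and reports correctly even when the container launches it under Python 2.7.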

What this means is you can either make your mapper/reducer Python 2.7 compatible, or explicitly specify /opt/conda/bin/python in your shebang. In my setup, which replicated yours (Dataproc 1.4-ubuntu18 plus the jupyter.sh init action), the following worked for me:

#!/opt/conda/bin/python
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        #print ('%s\t%s' % (word, 1))
        print(f"{word}\t{1}")

And reducer:

#!/opt/conda/bin/python
"""reducer.py"""

from operator import itemgetter
import sys

print_out = lambda x, y: print(f'{x}\t{y}')

current_word = None
current_count = 0
word = None

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    #print("still working")

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            #print '%s\t%s' % (current_word, current_count)
            print_out(current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    print_out(current_word, current_count)

However, another thing to keep in mind is that the jupyter.sh initialization action is somewhat obsolete; you should use the actual Dataproc Jupyter Component instead. In any case, you may want to run the info.sh steps outlined above first to determine the relevant Python environment to use in your mapper.py and reducer.py.

For example, a vanilla Dataproc 1.4-debian9 image without the jupyter.sh init action will actually have /opt/conda/default/bin/python as the logged-in default Python instead of /opt/conda/bin/python, and a Dataproc 1.3-debian9 image will have /usr/bin/python (Python 2.7) as the default.