
I have a mapper.py and a reducer.py to process an input file, which is just a regular Linux text file with the following tab-separated format:

ID \t time \t duration \t Description \t status

Basically, I want to group records by ID in the reducer, so I constructed the mapper as follows:

#!/usr/bin/env python

import sys
import re

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into fields on one or more tabs
    portions = re.split(r'\t+', line)
    # take the first column (the block ID) to emit as the key
    block = portions[0]
    print '%s\t%s\t%s\t%s\t%s' % (block, portions[1], portions[2], portions[3], portions[4])
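For illustration, here is what the mapper does to one record (the field values below are made up, not from my real data):

```python
import re

# a hypothetical input record: ID, time, duration, description, status
line = "blk_42\t1392000000\t1320\tstarted transfer\tok\n"

portions = re.split(r'\t+', line.strip())
block = portions[0]

# same string the mapper's print statement would emit
out = '%s\t%s\t%s\t%s\t%s' % (block, portions[1], portions[2], portions[3], portions[4])
```

The block ID comes through unchanged as the key, so `sort -k1,1` (locally) or Hadoop's shuffle groups all records for one block together before the reducer sees them.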

Then in the reducer, I do some data processing as below:

#!/usr/bin/env python

from operator import itemgetter
import sys

bitmapStr = ""
current_block = None
block = start = duration = precision = status = ""
round = 0  # interval is every 11 mins, i.e. 660 seconds

for line in sys.stdin:
    line = line.strip()
    block, start, duration, precision, status = line.split('\t')

    if current_block == block:
        duration = int(duration)
        while round < duration:
            if status.islower():
                bitmapStr = bitmapStr + "1"
            else:
                bitmapStr = bitmapStr + "0"
            round = round + 660
        # amount of time exceeding this block record
        round = round - duration
    else:
        if current_block:
            print '%s\t%s' % (current_block, bitmapStr)
        round = 0
        bitmapStr = ""
        current_block = block
        duration = int(duration)
        while round < duration:
            if status.islower():
                bitmapStr = bitmapStr + "1"
            else:
                bitmapStr = bitmapStr + "0"
            round = round + 660
        # amount of time exceeding this block record
        round = round - duration

if current_block == block:
    print '%s\t%s' % (current_block, bitmapStr)
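The interval logic is the same in both branches; as a sketch (function and variable names here are mine, not from the script), it can be pulled out and checked on its own:

```python
def bitmap_chunk(duration, status, carry=0):
    """Emit one bit per started 660-second interval of `duration`.

    '1' when `status` is all lowercase, '0' otherwise. `carry` is the
    time by which the previous record of the same block overshot its
    duration; the returned carry mirrors the reducer's
    `round = round - duration` step.
    """
    bits = ""
    elapsed = carry
    while elapsed < duration:
        bits += "1" if status.islower() else "0"
        elapsed += 660
    return bits, elapsed - duration
```

For example, `bitmap_chunk(1320, "ok")` gives `("11", 0)`, while a 1000-second record with an uppercase status gives `("00", 320)`; the 320-second overshoot is carried into the next record of the same block.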

I tested the mapper and reducer locally by doing:

cat small_data_sample | ./mapper.py | sort -k1,1 | ./reducer.py
#output is working as I expect

However, when I tried to run it through Hadoop, it produces the following error:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

The exact command I use to run Hadoop is as follows:

bin/hadoop jar hadoop-streaming.jar \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-D mapred.text.key.partitioner.options='-k1,1'  \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D mapred.text.key.comparator.options='-k1,1 -k2,2n' \
-D stream.num.map.output.key.fields=2 \
-input $hadoop_dir/data/sample \
-output $hadoop_dir/data/data_test1-output \
-mapper $dir/calBitmap_mapper.py \
-reducer $dir/calBitmap_reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

where $hadoop_dir is the path to my HDFS location and $dir is where my mapper and reducer Python scripts are.

Please let me know what I need to do to correct the error. Thank you in advance!

*Edit: I tried with a different, much smaller input file, and it seems to work fine. So I don't know why MapReduce breaks with the large input file.


1 Answer


I found the solution to my error. It was in the mapper, where I did not take extra care with different kinds of input. In some of my input files the first couple of lines are comments, so indexing into the portions array failed with an index-out-of-range error. To fix the problem, I added a check:

if len(portions) == 5: #make sure it has 5 elements in there
    print '%s\t%s\t%s\t%s\t%s' % (block,portions[1],portions[2],portions[3],portions[4])
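Putting it together, here is a sketch of the guarded mapper loop (reading from a list instead of sys.stdin and collecting results instead of printing, so it can be run standalone; the sample lines are made up):

```python
import re

def map_lines(lines):
    """Guarded mapper loop: skip any line that does not split into
    exactly five tab-separated fields (e.g. leading comment lines)."""
    out = []
    for line in lines:
        portions = re.split(r'\t+', line.strip())
        if len(portions) == 5:  # make sure it has 5 elements in there
            out.append('\t'.join(portions))
    return out

# made-up sample: one comment line followed by one well-formed record
sample = [
    "# generated by collector v2\n",
    "blk_1\t100\t660\tread\tok\n",
]
```

`map_lines(sample)` keeps only the well-formed record; malformed lines are dropped instead of crashing the streaming subprocess with exit code 1.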