I have a mapper.py and a reducer.py that process an input file, which is just a regular Linux text file with the following format:
ID \t time \t duration \t Description \t status
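For example, a record might look like this (the values here are invented just to illustrate the layout):

1001    1398766888    2520    diskWrite    active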
Basically, I want to group the records by ID in the reducer, so I constructed the mapper as below:
#!/usr/bin/env python
import sys
import re

for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into portions
    portions = re.split(r'\t+', line)
    # take the first column (which is the block number) to emit as key
    block = portions[0]
    print '%s\t%s\t%s\t%s\t%s' % (block, portions[1], portions[2], portions[3], portions[4])
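One guard I have been considering (untested; it assumes the large file contains malformed records): if a line has fewer than five tab-separated fields, portions[4] raises an IndexError, the script exits with a nonzero status, and streaming reports exactly this kind of subprocess failure. A sketch of a defensive version:

#!/usr/bin/env python
# sketch of a defensive mapper (untested): drop lines that do not
# have at least five tab-separated fields instead of crashing
import sys
import re

for line in sys.stdin:
    line = line.strip()
    portions = re.split(r'\t+', line)
    if len(portions) < 5:
        # assumption: malformed records can safely be skipped
        continue
    print '\t'.join(portions[:5])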
Then, in the reducer, I do some data processing as below:
#!/usr/bin/env python
from operator import itemgetter
import sys

bitmapStr = ""
current_block = None
block = start = duration = precision = status = ""
round = 0  # interval is every 11 mins, i.e. 660 seconds

for line in sys.stdin:
    line = line.strip()
    block, start, duration, precision, status = line.split('\t')
    if current_block == block:
        duration = int(duration)
        while round < duration:
            if status.islower():
                bitmapStr = bitmapStr + "1"
            else:
                bitmapStr = bitmapStr + "0"
            round = round + 660
        # amount of time that exceeds this block record
        round = round - duration
    else:
        if current_block:
            print '%s\t%s' % (current_block, bitmapStr)
        round = 0
        bitmapStr = ""
        current_block = block
        duration = int(duration)
        while round < duration:
            if status.islower():
                bitmapStr = bitmapStr + "1"
            else:
                bitmapStr = bitmapStr + "0"
            round = round + 660
        # amount of time that exceeds this block record
        round = round - duration

if current_block == block:
    print '%s\t%s' % (current_block, bitmapStr)
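To make the intended bitmap logic concrete, here is a worked example with made-up numbers: for a block whose first record has duration = 2520 and a lowercase status, the while loop runs with round = 0, 660, 1320, and 1980, appending four "1" bits, and then carries round = 2640 - 2520 = 120 into the next record of the same block.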
I tested the mapper and reducer locally by doing:
cat small_data_sample | ./mapper.py | sort -k1,1 | ./reducer.py
# the output is what I expect
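One local check I could add (just an idea, not something I have run yet) is feeding a deliberately short line to the mapper and looking at its exit code:

printf 'one_field_only\n' | ./mapper.py
echo "mapper exit code: $?"

If the large file contains such lines, the mapper should die with an IndexError here too.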
However, when I run it through Hadoop, it produces the following error:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
The exact command to run Hadoop is as follows:
bin/hadoop jar hadoop-streaming.jar \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapred.text.key.comparator.options='-k1,1 -k2,2n' \
    -D mapred.text.key.partitioner.options='-k1,1' \
    -D stream.num.map.output.key.fields=2 \
    -input $hadoop_dir/data/sample \
    -output $hadoop_dir/data/data_test1-output \
    -mapper $dir/calBitmap_mapper.py \
    -reducer $dir/calBitmap_reducer.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
where $hadoop_dir is the path to my HDFS location and $dir is where my mapper and reducer Python scripts live.
Please let me know what I need to change to correct the error. Thank you in advance!
*Edit: I tried with a different, much smaller input file, and it works fine. So I don't understand why MapReduce breaks on the large input file.
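In case it helps with diagnosing, I assume the actual Python traceback is buried in the failing task attempt's stderr; as far as I know it can be pulled with something like

yarn logs -applicationId <application_id>

or by clicking through to the failed attempt's stderr in the job history web UI.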