I'm new to Hadoop and am attempting to use Hadoop streaming to parallelize a physics simulation that is compiled into a binary. The idea is to run the binary in parallel in the map phase, with one mapper per input file, and then reduce the results of the individual runs (which the binary writes to files) using a Python reducer script.
My question is: how can I pass a list of input file names so that Hadoop streaming delivers each filename as one key to the mapper script? Is this possible?
Bonus question: ideally I'd like one mapper per filename, although I know the user isn't allowed to specify the number of mappers directly. Is it possible to coax Hadoop streaming into doing this?
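From what I've read (untested on my setup), NLineInputFormat might address both questions: it creates one input split per N lines of the input file, so with linespermap=1 each line of spec_list.txt should get its own map task. A sketch of the options I have in mind, added to the streaming command below:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1-streaming.jar \
    -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
    -jobconf mapred.line.input.format.linespermap=1 \
    ... # remaining -input/-output/-mapper/-reducer options as in my job below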
Here is my current setup:
map.py:
#! /util/python/2.7.6/bin/python
import sys
sys.path.append('.')
import subprocess as sp

# mapper: each input line names one spec file to run
partmc = './partmc'
for line in sys.stdin:
    spec = line.strip().split('\t')[0]  # e.g., run_3.spec
    args = [partmc, spec]
    sp.Popen(args)  # e.g., ./partmc run_3.spec (fire-and-forget)
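Since partmc writes its results to files rather than stdout, I'm assuming the mapper should also wait for each run to finish and emit a key/value line so the reducer actually receives one record per run. A minimal sketch of that idea (the tab-separated spec/exit-code output format is my assumption about what the reducer would consume, not something I've confirmed):

#! /util/python/2.7.6/bin/python
# Sketch only: run each spec synchronously and emit "<spec>\t<exit code>"
# so the reducer gets one record per simulation.
import sys
import subprocess as sp

for line in sys.stdin:
    spec = line.strip().split('\t')[0]
    rc = sp.call(['./partmc', spec])   # blocks until this run finishes
    print '%s\t%d' % (spec, rc)        # key = spec file, value = exit code
    sys.stdout.flush()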
Relevant portion of the hadoop-streaming job:
module load python/2.7.6-statsmodels-0.5.0
$HADOOP_HOME/bin/hadoop --config $HADOOP_CONF_DIR jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1-streaming.jar \
-file /user/thomasef/pop/mapreduce/1_urban_plume/map.py \
-file /user/thomasef/pop/mapreduce/1_urban_plume/reduce.py \
-file /user/thomasef/pop/mapreduce/1_urban_plume/run_1.spec \
-file /user/thomasef/pop/mapreduce/1_urban_plume/run_2.spec \
-file /user/thomasef/pop/mapreduce/1_urban_plume/run_3.spec \
-file /user/thomasef/pop/mapreduce/1_urban_plume/partmc \
-file /user/thomasef/pop/mapreduce/1_urban_plume/spec_list.txt \
-input /user/thomasef/pop/mapreduce/1_urban_plume/spec_list.txt \
-output /user/thomasef/pop/mapreduce/1_urban_plume/houtput/ \
-mapper /user/thomasef/pop/mapreduce/1_urban_plume/map.py \
-reducer /user/thomasef/pop/mapreduce/1_urban_plume/reduce.py \
-jobconf mapred.job.tracker=local \
-jobconf fs.default.name=local \
-verbose \
-numReduceTasks 1
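I assume the -input path is resolved against the default filesystem (fs.default.name), not against the files shipped with -file, so maybe the job simply can't find spec_list.txt there. Would something like this be needed first (paths just mirror my layout)?

hadoop fs -mkdir /user/thomasef/pop/mapreduce/1_urban_plume
hadoop fs -put spec_list.txt /user/thomasef/pop/mapreduce/1_urban_plume/spec_list.txt
hadoop fs -ls /user/thomasef/pop/mapreduce/1_urban_plume   # sanity check: file should be listed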
where spec_list.txt is:
run_1.spec
run_2.spec
run_3.spec
I'm currently getting this error when I submit the job:
14/08/11 15:34:33 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: No input paths specified in job
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:152)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:201)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:138)
But the application works as expected if I call:
cat spec_list.txt | ./map.py | sort | ./reduce.py
Any and all advice will be much appreciated; I've been working on this for weeks with very little progress.