
While writing a map-reduce job in my local Hadoop environment, I ran into the problem that the Reducer did not receive the values I expected. I abstracted the problem down to the following:

I create an arbitrary input file with 10 lines so that the map method is executed 10 times. In the mapper I keep an invocation count and write this count as the value to the output, with 0 as the key if the count is even and 1 as the key if it is odd, i.e. the following (key, value) pairs:

(1,1), (0,2), (1,3), (0,4), (1,5), etc.
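The content of the input file is irrelevant, since the mapper ignores both key and value; for reference, a minimal, hypothetical generator for such a file (not part of the original job):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class MakeInput {
    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            lines.add("line " + i); // any content works, only the line count matters
        }
        Files.write(Paths.get("infile.txt"), lines);
    }
}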

I would expect to receive two calls to the Reducer with

  • 0 > [2,4,6,8,10]
  • 1 > [1,3,5,7,9]

but I get two calls with

  • 0 > [2,2,2,2,2]
  • 1 > [1,1,1,1,1]

instead. It seems I receive the first value that was written in the mapper, repeated as many times as the key occurs (if I reverse the counter, I receive the values 10 and 9 instead of 2 and 1). From my understanding this is not the expected behaviour, but I cannot figure out what I am doing wrong.

I use the following Mapper and Reducer:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TestMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    int count = 0;

    @Override
    protected void map(LongWritable keyUnused, Text valueUnused, Context context) throws IOException, InterruptedException {
        count += 1;
        // key: parity of the invocation count, value: the count itself
        context.write(new IntWritable(count % 2), new IntWritable(count));

        System.err.println((count % 2) + "|" + count);
    }
}

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Lists;

public class TestReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> valueItr, Context context) throws IOException, InterruptedException {
        // materialize the values (Guava) so they can be printed
        List<IntWritable> values = Lists.newArrayList(valueItr);

        System.err.println(key + "|" + values);
    }
}

I run the Hadoop job with a local test runner as described, for example, in the book "Hadoop: The Definitive Guide" (O'Reilly):

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }

        Job jobConf = Job.getInstance(getConf());
        jobConf.setJarByClass(getClass());
        jobConf.setJobName("TestJob");

        jobConf.setMapperClass(TestMapper.class);
        jobConf.setReducerClass(TestReducer.class);

        FileInputFormat.addInputPath(jobConf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

        jobConf.setOutputKeyClass(IntWritable.class);
        jobConf.setOutputValueClass(IntWritable.class);

        return jobConf.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new TestDriver(), args));
    }
}

The job is packaged in a jar and run with 'hadoop jar test.jar infile.txt /tmp/testout'.
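As an aside: since the driver goes through Configured and ToolRunner, local execution can also be pinned down programmatically before creating the Job. A minimal sketch, assuming Hadoop 2.x property names (this placement in run() is my own addition, not part of the original driver):

// Inside TestDriver.run(), before Job.getInstance(getConf()):
Configuration conf = getConf();                 // org.apache.hadoop.conf.Configuration
conf.set("mapreduce.framework.name", "local");  // use the local job runner
conf.set("fs.defaultFS", "file:///");           // operate on the local filesystem
Job jobConf = Job.getInstance(conf);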


1 Answer


Hadoop reuses the same Writable instance while streaming over the reducer's values, so every element of the Iterable is the same object, with its contents overwritten on each iteration.

So in order to capture all of your different values, you need to copy each one:

@Override
protected void reduce(IntWritable key, Iterable<IntWritable> valueItr, Context context) throws IOException, InterruptedException {
    List<IntWritable> values = Lists.newArrayList();
    for (IntWritable writable : valueItr) {
        // copy the current contents; Hadoop reuses the writable instance itself
        values.add(new IntWritable(writable.get()));
    }

    System.err.println(key + "|" + values);
}
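If the value type carried more state than a single int, a generic deep copy works as well. A sketch using WritableUtils.clone, which makes an independent copy via a serialize/deserialize round trip (fetching the Configuration through context.getConfiguration() is my own choice here):

// requires: import org.apache.hadoop.io.WritableUtils;
for (IntWritable writable : valueItr) {
    // clone() serializes and deserializes the reused instance,
    // yielding an independent copy that is safe to store
    values.add(WritableUtils.clone(writable, context.getConfiguration()));
}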