1 vote

I have a simple MapReduce job in which I am building a reverse index.

My mapper works correctly (I have checked that) and emits the word index as the key and docID:TFIDF as the value:

Mapper (only the output shown):

context.write(new IntWritable(wordIndex), new Text(index + ":" + tfidf));

The only job of the reducer is to combine these values. This is my implementation:

public static class IndexerReducer extends Reducer<Text, IntWritable, IntWritable, Text>
    {
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {

            StringBuilder sb = new StringBuilder();

            for (Text value : values)
            {
                sb.append(value.toString() + " ");
            }

            context.write(key, new Text(sb.toString()));
        }
    }

However, it does not combine anything and the output looks basically the same as from the mapper. There are lines in the output with the same key, even though the reducer was supposed to combine them - basically, all the keys in the output file should be unique when a reducer runs, right?

This is a sample of what my reducer output looks like (note that this is a simplified example):

1 15:2.1
1 13:4.3
2 9:9.3
2 43:7.9
etc

I expected this:

1 15:2.1 13:4.3
2 9:9.3 43:7.9

For the sake of completeness, I am including the run method:

@Override
    public int run(String[] arguments) throws Exception {
        ArgumentParser parser = new ArgumentParser("TextPreprocessor");

        parser.addArgument("input", true, true, "specify input directory");
        parser.addArgument("output", true, true, "specify output directory");

        parser.parseAndCheck(arguments);

        Path inputPath = new Path(parser.getString("input"));
        Path outputDir = new Path(parser.getString("output"));

        // Create configuration.
        Configuration conf = getConf();

        // add distributed file with vocabulary
        DistributedCache
                .addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);

        // Create job.
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(IndexerMapper.class);

        // Setup MapReduce.
        job.setMapperClass(IndexerMapper.class);
        job.setReducerClass(IndexerReducer.class);

        // Sort the output words in reversed order.
        job.setSortComparatorClass(WordCountComparator.class);


        job.setNumReduceTasks(1);

        // Specify (key, value).
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input.
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output.
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileSystem hdfs = FileSystem.get(conf);

        // Delete output directory (if exists).
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute the job.
        return job.waitForCompletion(true) ? 0 : 1;
    }

I would be glad for any hint about what is going on. I am new to MapReduce. Thanks for any debugging tips!


2 Answers

4 votes

Always use the @Override annotation.

You defined

public static class IndexerReducer extends Reducer<Text, IntWritable, IntWritable, Text>

Then your reduce method must look like this:

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
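
Because your signature does not match, the compiler treats your method as an overload rather than an override, so Hadoop never calls it and falls back to the inherited identity reduce - that is why the output looks identical to the mapper output. Since your mapper emits (IntWritable, Text), the simpler fix is probably to change the class generics to match instead; a minimal sketch, assuming you keep the mapper output exactly as shown in the question:

public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text>
{
    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
    {
        // Concatenate all docID:TFIDF values grouped under this word index.
        StringBuilder sb = new StringBuilder();
        for (Text value : values)
        {
            sb.append(value.toString()).append(' ');
        }
        context.write(key, new Text(sb.toString().trim()));
    }
}

With @Override in place, any future signature mismatch like this becomes a compile-time error instead of a silent identity pass-through.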
0 votes

The context parameter should not be declared as org.apache.hadoop.mapreduce.Reducer.Context. Your reducer inherits its own inner Context type, so just declare the parameter as Context; otherwise the signature does not match the base class method and @Override on the reduce function is reported as an error.
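
For example, with the parameter declared simply as Context inside the subclass (a sketch assuming the corrected (IntWritable, Text) generics from the other answer), @Override compiles without complaint:

public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text>
{
    @Override  // resolves: Context here is the reducer's inherited inner class
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
    {
        // ... combine values as before ...
    }
}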