I have a simple MapReduce job in which I am building a reverse index.
My mapper works correctly (I have checked that) and outputs key/value pairs of word index and docID:TFIDF value:
Mapper (only the output shown):

    context.write(new IntWritable(wordIndex), new Text(index + ":" + tfidf));
The only job of the reducer is to combine these values. This is my implementation:
    public static class IndexerReducer extends Reducer<Text, IntWritable, IntWritable, Text>
    {
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            StringBuilder sb = new StringBuilder();

            for (Text value : values)
            {
                sb.append(value.toString() + " ");
            }

            context.write(key, new Text(sb.toString()));
        }
    }
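Just to rule out the string handling itself, here is a plain-Java sketch (no Hadoop involved; the class name `CombineSketch` and the `combine` helper are only for illustration) of what I expect a single reduce() call to produce when it receives all values for one key, using the sample values below:

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the combining step the reducer is meant to perform
// for a single key, assuming the framework delivers all values for that
// key in one reduce() call.
public class CombineSketch {
    static String combine(List<String> values) {
        StringBuilder sb = new StringBuilder();
        for (String value : values) {
            sb.append(value).append(" "); // same trailing-space join as the reducer
        }
        return sb.toString().trim();      // trimmed here only for display
    }

    public static void main(String[] args) {
        // All values for key 1 should arrive together in one call.
        System.out.println("1\t" + combine(Arrays.asList("15:2.1", "13:4.3")));
        // → 1	15:2.1 13:4.3
    }
}
```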
However, it does not combine anything, and the output looks basically the same as the output from the mapper: there are lines in the output with the same key, even though the reducer was supposed to combine them. All the keys in the output file are supposed to be unique when a reducer is used, right?
This is a sample of what my reducer output looks like (note that this is a simplified example):
    1 15:2.1
    1 13:4.3
    2 9:9.3
    2 43:7.9

etc.
I expected this:
    1 15:2.1 13:4.3
    2 9:9.3 43:7.9
For the sake of completeness, I am also including the run method:
    @Override
    public int run(String[] arguments) throws Exception {
        ArgumentParser parser = new ArgumentParser("TextPreprocessor");

        parser.addArgument("input", true, true, "specify input directory");
        parser.addArgument("output", true, true, "specify output directory");

        parser.parseAndCheck(arguments);

        Path inputPath = new Path(parser.getString("input"));
        Path outputDir = new Path(parser.getString("output"));

        // Create configuration.
        Configuration conf = getConf();

        // Add distributed file with vocabulary.
        DistributedCache.addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);

        // Create job.
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(IndexerMapper.class);

        // Set up MapReduce.
        job.setMapperClass(IndexerMapper.class);
        job.setReducerClass(IndexerReducer.class);

        // Sort the output words in reversed order.
        job.setSortComparatorClass(WordCountComparator.class);
        job.setNumReduceTasks(1);

        // Specify (key, value) types.
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input.
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output.
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileSystem hdfs = FileSystem.get(conf);

        // Delete output directory (if it exists).
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute the job.
        return job.waitForCompletion(true) ? 0 : 1;
    }
I would be glad for any hint about what is going on. I am new to MapReduce, so thanks for any debugging tips!