21
votes

I have tried to use the MultipleOutputs class as per the example in page http://hadoop.apache.org/docs/mapreduce/r0.21.0/api/index.html?org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

Driver Code

    Configuration conf = new Configuration();
    Job job = new Job(conf, "Wordcount");
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
            Text.class, IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);

Reducer Code

public class WordCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    private MultipleOutputs<Text, IntWritable> mos;
    public void setup(Context context){
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        //context.write(key, result);
        mos.write("text", key,result);
    }
    public void cleanup(Context context)  {
         try {
            mos.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
         }
}

The output of the reducer is found to rename to text-r-00000

But the issue here is that I am also getting an empty part-r-00000 file as well. Is this how MultipleOutputs is expected to behave, or is there some problem with my code? Please advice.

Another alternative I have tried out is to iterate through my output folder using the FileSystem class and manually rename all files beginning with part.

What is the best way?

FileSystem hdfs = FileSystem.get(configuration);
FileStatus fs[] = hdfs.listStatus(new Path(outputPath));
for (FileStatus aFile : fs) {
if (aFile.isDir()) {
hdfs.delete(aFile.getPath(), true);
// delete all directories and sub-directories (if any) in the output directory
} 
else {
if (aFile.getPath().getName().contains("_"))
hdfs.delete(aFile.getPath(), true);
// delete all log files and the _SUCCESS file in the output directory
else {
hdfs.rename(aFile.getPath(), new Path(myCustomName));
}
}
2

2 Answers

21
votes

Even if you are using MultipleOutputs, the default OutputFormat (I believe it is TextOutputFormat) is still being used, and so it will initialize and creating these part-r-xxxxx files that you are seeing.

The fact that they are empty is because you are not doing any context.write because you are using MultipleOutputs. But that doesn't prevent them from being created during initialization.

To get rid of them, you need to define your OutputFormat to say you are not expecting any output. You can do it this way:

job.setOutputFormat(NullOutputFormat.class);

With that property set, this should ensure that your part files are never initialized at all, but you still get your output in the MultipleOutputs.

You could also probably use LazyOutputFormat which would ensure that your output files are only created when/if there is some data, and not initialize empty files. You could do i this way:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

Note that you are using in your Reducer the prototype MultipleOutputs.write(String namedOutput, K key, V value), which just uses a default output path that will be generated based on your namedOutput to something like: {namedOutput}-(m|r)-{part-number}. If you want to have more control over your output filenames, you should use the prototype MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath) which can allow you to get filenames generated at runtime based on your keys/values.

11
votes

This is all you need to do in the Driver class to change the basename of the output file: job.getConfiguration().set("mapreduce.output.basename", "text"); So this will result in your files being called "text-r-00000".