3 votes

I'm new to Hadoop, but this has been a learning project of mine for the last month.

In an attempt to keep this general enough to be useful to others, let me throw out the basic goal first. Assume:

  1. You have a large data set (obviously), of millions of basic ASCII text files.
    • Each file is a "record."
  2. The records are stored in a directory structure to identify customer & date
    • e.g. /user/hduser/data/customer1/YYYY-MM-DD, /user/hduser/data/customer2/YYYY-MM-DD
  3. You want to mimic the input structure for the output structure
    • e.g. /user/hduser/out/customer1/YYYY-MM-DD, /user/hduser/out/customer2/YYYY-MM-DD

I have looked at multiple related threads, and many more besides. I've also been reading Tom White's Hadoop book and have been eagerly trying to learn this, but I've frequently swapped between the new API and the old API, which adds to the confusion.

Many have pointed to MultipleOutputs (or the old API versions), but I seem to be unable to produce my desired output -- for instance, MultipleOutputs doesn't seem to accept a "/" to create a directory structure in write().
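
To make that concrete, this is the style of call I mean (purely a hypothetical sketch; mos is the MultipleOutputs field from my setup further down, filenameKey/value stand for the per-file key and record bytes, and the hard-coded path is only for illustration):

// hypothetical map()-side call: a baseOutputPath containing "/" is what I
// can't get to produce a customer1/2013-01-01 subdirectory under the output dir
mos.write(filenameKey, value, "customer1/2013-01-01/part");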

What steps need to be taken to create a file with the desired output structure? Currently I have a WholeFileInputFormat class and a related RecordReader that produces a (NullWritable key, BytesWritable value) pair (which can change if needed).

My map setup:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MapClass extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text filenameKey;
    private MultipleOutputs<Text, BytesWritable> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();
        // bad hackjob until I figure out a better way: removes hdfs://master:port/user/hduser/
        filenameKey = new Text(path.toString().substring(38));
        mos = new MultipleOutputs<Text, BytesWritable>(context);
    }
}

There is also a cleanup() function that calls mos.close(), and the map() function is currently unwritten (that is what I need help with here).
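
For completeness, the cleanup() is nothing more than this (assuming the same mos field as in the setup above):

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    // close all record writers opened by MultipleOutputs
    mos.close();
}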

Is this enough information to point a newbie in the direction of an answer? My next thought was to create a new MultipleOutputs object in every map() call, each with a new baseOutputPath String, but I'm unsure whether that is efficient or even the right approach.

Advice would be appreciated; anything in the program can change at this point except the input. I'm just trying to learn the framework, but I would like to get as close to this result as possible. (Later on I will probably look at combining records into larger files, but each record is already 20MB, and I want to make sure this works before I make the output impossible to read in Notepad.)

Edit: Could this problem be solved by modifying/extending TextOutputFormat? It seems to have some methods that could work, but I'm unsure which methods I'd need to override...

I have not tried it, but the book "Hadoop: The Definitive Guide" says that MultipleOutputs from the new API supports using the file path separator (/). Are you saying that it does not work? -- Rags
@Rags It is likely an error in my execution of MultipleOutputs. -- Pseudo

1 Answer

5 votes

If you turn off speculative execution, there is nothing stopping you from manually creating the output folder structure and files in your mapper, and writing the records to them (ignoring the output context / collector).
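
As a sketch of the driver-side part of that (assuming you build the job with the new API Job class; conf is your Configuration and the job name is made up):

// in the driver, before submitting the job
Job job = new Job(conf, "per-customer output");  // or Job.getInstance(conf, ...) on newer releases
job.setMapSpeculativeExecution(false);           // only one attempt per map task, so no two attempts race on the same file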

For example, extending your snippet (the setup method), you could do something like this (which is basically what MultipleOutputs does, but assumes that speculative execution is turned off to avoid file collisions where two map task attempts try to write to the same output file):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultiOutputsMapper extends
        Mapper<LongWritable, Text, NullWritable, NullWritable> {
    protected String filenameKey;
    private RecordWriter<Text, Text> writer;
    private Text outputValue = new Text();
    private Text outputKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // operate on the input record and populate outputKey / outputValue
        // ...

        // write to the output file using the writer rather than the context
        writer.write(outputKey, outputValue);
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();

        // extract parent folder and filename
        filenameKey = path.getParent().getName() + "/" + path.getName();

        // base output folder
        final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
        // output file name
        final Path outputFilePath = new Path(baseOutputPath, filenameKey);

        // We need to override the getDefaultWorkFile path to stop the file being created in the _temporary/taskid folder
        TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() {
            @Override
            public Path getDefaultWorkFile(TaskAttemptContext context,
                    String extension) throws IOException {
                return outputFilePath;
            }
        };

        // create a record writer that will write to the desired output subfolder
        writer = tof.getRecordWriter(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        writer.close(context);
    }
}

Some points for consideration:

  • Are the customerx/yyyy-MM-dd paths files or folders of files? (If folders of files, then you'll need to amend accordingly - this implementation assumes that there is one file per date and that the file name is yyyy-MM-dd.)
  • You may wish to look into LazyOutputFormat to prevent empty map output files from being created (see the sketch below)
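
A rough sketch of wiring that up in the driver (assuming the Job object from your driver, TextOutputFormat as the underlying format, and the output path from the question):

// register the real output format lazily so empty part-* files are not created
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/user/hduser/out"));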