
How can I use a single mapper for many input files? Hadoop creates one mapper per file, but I need only one mapper for all files.

I tried CombineFileInputFormat. That gave me one mapper, but each map input value contained only one file. I need the map input value to contain the data from all files (as Text), like this:

Input map value :

data from file1.txt
data from file2.txt
data from file3.txt

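import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

    public WholeFileInputFormat() {
        super();
        // Cap each combined split at 64 MB; input larger than this is spread over several splits.
        setMaxSplitSize(67108864);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // Never split an individual file; each file is read whole.
        return false;
    }

    @Override
    public RecordReader<NullWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {

        if (!(split instanceof CombineFileSplit)) {
            throw new IllegalArgumentException("split must be a CombineFileSplit");
        }
        // CombineFileRecordReader creates one WholeFileRecordReader per file in the split,
        // so the mapper receives one record per file, not one record for the whole split.
        return new CombineFileRecordReader<NullWritable, Text>(
                (CombineFileSplit) split, context, WholeFileRecordReader.class);
    }

}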


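import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {

// State for the single file (index pathToProcess in the split) that this reader handles.
private final Path mFileToRead;
private final long mFileLength;
private final Configuration mConf;
private final Text mFileText;
private boolean mProcessed;

public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
                             Integer pathToProcess) throws IOException {

    mProcessed = false;
    mFileToRead = fileSplit.getPath(pathToProcess);
    mFileLength = fileSplit.getLength(pathToProcess);
    mConf = context.getConfiguration();

    // The file must be covered completely, starting at offset 0.
    assert 0 == fileSplit.getOffset(pathToProcess);
    FileSystem fs = mFileToRead.getFileSystem(mConf);
    assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;

    mFileText = new Text();
}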

@Override
public void close() throws IOException {
    mFileText.clear();
}


@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
    return mFileText;
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return (mProcessed) ? (float) 1.0 : (float) 0.0;
}

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
    // no-op.
}


@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!mProcessed) {
        if (mFileLength > (long) Integer.MAX_VALUE) {
            throw new IOException("File is longer than Integer.MAX_VALUE.");
        }
        byte[] contents = new byte[(int) mFileLength];

        FileSystem fs = mFileToRead.getFileSystem(mConf);
        FSDataInputStream in = null;
        try {
            // Set the contents of this file.
            in = fs.open(mFileToRead);
            IOUtils.readFully(in, contents, 0, contents.length);
            mFileText.set(contents, 0, contents.length);

        } finally {
            IOUtils.closeStream(in);
        }
        mProcessed = true;
        return true;
    }
    return false;
}
}

Could you help me?

I could not understand what you are trying to achieve. You only need to write one map function for all your inputs. - Jijo
Jijo, I am trying to combine the data from 1000 input files into a single map input value (for one mapper). As I understand it, 1000 files mean 1000 mappers, which takes a very long time. I need the text data from all files combined into one record and used as the map input value. - Den

1 Answer


The number of mappers is determined by the number of input splits, not directly by the number of files. By default Hadoop splits each file along its block boundaries and creates one mapper per split, so a file larger than one block gets several mappers, while every small file still gets a mapper of its own. Please take a look at a link like this one to learn more about how Hadoop chooses the number of mappers and reducers.
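
To make the arithmetic concrete, here is a rough sketch in plain Java (no Hadoop dependency) that estimates how many splits, and therefore mappers, a job would get. The class SplitEstimate, its method names, and the file sizes are made up for illustration. The model is simplified: it ignores the slack Hadoop allows on the last split of a file, and the combined estimate ignores node and rack grouping, so treat the numbers as approximations only.

```java
import java.util.Arrays;

/** Simplified, hypothetical model of how many input splits (and so map tasks) a job gets. */
class SplitEstimate {

    // Per-file splitting: every file yields at least one split,
    // and larger files yield one split per splitSize chunk (rounded up).
    static long perFileSplits(long[] fileLengths, long splitSize) {
        long splits = 0;
        for (long len : fileLengths) {
            splits += Math.max(1, (len + splitSize - 1) / splitSize);
        }
        return splits;
    }

    // Combined splitting: files are packed together up to maxSplitSize.
    // Rough estimate only; assumes files are small relative to the cap.
    static long combinedSplits(long[] fileLengths, long maxSplitSize) {
        long total = Arrays.stream(fileLengths).sum();
        return Math.max(1, (total + maxSplitSize - 1) / maxSplitSize);
    }

    public static void main(String[] args) {
        long[] files = new long[1000];
        Arrays.fill(files, 1L << 20); // 1000 files of 1 MiB each (made-up sizes)
        System.out.println("per-file splits: " + perFileSplits(files, 128L << 20));
        System.out.println("combined splits: " + combinedSplits(files, 67108864L));
    }
}
```

Under this model, many small files produce one split each regardless of block size, whereas a combining input format with a 64 MB cap (the value passed to setMaxSplitSize in the question) packs them into far fewer splits, but still more than one once the total input exceeds the cap.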

If you want exactly one mapper, note that setting the mapred.map.tasks parameter will not work, since it is only a hint to Hadoop, not a mandatory parameter. You can try increasing the block size to a very large value...

Anyway, it makes little sense to use a single mapper with Hadoop... you would lose the distributed processing of the data, which is one of the main advantages of such a system.