
I have written a MapReduce job that takes Protobuf files as input. Because the files are unsplittable, each file is processed by a single mapper (I implemented a custom FileInputFormat with isSplitable returning false). The application works fine for input files smaller than ~680 MB and produces the expected output. However, once an input file crosses that size, the job completes successfully but produces an empty file.
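
For reference, the input format boils down to something like the sketch below. ProtobufFileInputFormat, WholeFileRecordReader, and the NullWritable/BytesWritable key/value types are just placeholders for what I actually use:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ProtobufFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false; // never split: each Protobuf file goes to exactly one mapper
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new WholeFileRecordReader(); // custom reader that loads the whole file as one record
    }
}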

I'm wondering if I'm hitting some file-size limit for a single mapper? If it matters, the files are stored on Google Cloud Storage (GCS), not HDFS.

Thanks!


1 Answer


Turns out I had hit a well-known Hadoop bug discussed here. The problem was with the BytesWritable class used to hold the Protobuf file contents. In my custom RecordReader I previously had:

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
        // Read the whole (unsplittable) file into memory as a single record.
        byte[] contents = new byte[(int) fileSplit.getLength()];
        Path file = fileSplit.getPath();
        log.debug("Path file: " + file);
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            // For files above ~680 MB this call overflows inside
            // BytesWritable.setSize() and throws ...
            value.set(contents, 0, contents.length);
        } catch (Exception e) {
            // ... but the broad catch swallows the error, so the job still
            // "succeeds" and emits an empty value.
            log.error(e);
        } finally {
            IOUtils.closeQuietly(in);
        }
        processed = true;
        return true;
    }
    return false;
}

The bug is that BytesWritable.setSize() grows its backing buffer to size * 3 / 2, so the multiplication overflows for any content larger than Integer.MAX_VALUE / 3, which is ~680 MB; that effectively becomes the maximum content size. In my case the overflow made value.set() throw, the broad catch block logged and swallowed the error, and the job still finished "successfully" with an empty output file. To get around this, I had to manually set the capacity up front by doing

value.setCapacity(my_max_size)

before calling value.set().
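
In other words, the read inside nextKeyValue() now looks roughly like this (a sketch; I use the actual file length as the capacity here, but any value at least as large as your biggest file works as my_max_size):

in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
// Pre-size the buffer so BytesWritable.set() never has to grow it;
// it is the size * 3 / 2 growth that overflows for large files.
value.setCapacity(contents.length);
value.set(contents, 0, contents.length);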

Hope this helps somebody else!