17
votes

I am getting strange errors while running a wordcount-like MapReduce program. I have a Hadoop cluster with 20 slaves, each with 4 GB of RAM. I configured my map tasks to have a heap of 300 MB and my reduce task slots get 1 GB. I have 2 map slots and 1 reduce slot per node. Everything goes well until the first round of map tasks finishes; then their progress remains at 100%. I suppose the copy phase is taking place at that point. Each map task generates something like:

    Map output bytes                4,164,335,564
    Map output materialized bytes     608,800,675

(I am using SnappyCodec for compression)

After stalling for about an hour, the reduce tasks crash with the following exception:

    Error: java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1703)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1563)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1401)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1333)

While googling I found this link, but I don't really know what to make of it: hadoop common link

I don't understand why Hadoop would have any problem copying and merging if it is able to run a terasort benchmark. It cannot be that all of the map output has to fit into the RAM of the reducer thread. So what is going on here?

In the link provided above they have a discussion about tuning the following parameters:

    mapreduce.reduce.shuffle.input.buffer.percent = 0.7
    mapreduce.reduce.shuffle.memory.limit.percent = 0.25
    mapreduce.reduce.shuffle.parallelcopies = 5

They claim that a product of these parameters greater than 1 allows for heap-size errors. (EDIT: note that 5 * 0.25 * 0.7 = 0.875 is still < 1, so focus on my second solution post!) Before restarting this intensive simulation I would be very happy to hear someone's opinion on the problem I am facing, since it has been bothering me for almost a week now. I also don't completely understand what happens in this copy phase; I'd expect an on-disk merge sort not to require much heap.
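For reference, here is a rough sketch of how such knobs could be set from the job driver. This is only an illustration: it assumes the old MRv1 API and the mapred.* property names that apply to my version (see the comments below), and the class name WordCountDriver is just a placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Fraction of the reducer heap used to buffer fetched map outputs during
            // the copy phase (mapreduce.reduce.shuffle.input.buffer.percent on Hadoop 2+).
            conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.2f);
            // Number of parallel fetch threads per reducer
            // (mapreduce.reduce.shuffle.parallelcopies on Hadoop 2+).
            conf.setInt("mapred.reduce.parallel.copies", 5);

            Job job = new Job(conf, "wordcount");
            // ... set mapper, reducer, combiner, input/output paths as usual ...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }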

Thanks a lot in advance for any helpful comments and answers!

4
In the version of Hadoop I am running, the parameters are named mapred.reduce.parallel.copies and mapred.job.shuffle.input.buffer.percent; the parameter with shuffle.memory.limit I cannot find. – DDW
UPDATE: setting mapred.job.shuffle.input.buffer.percent = 20 actually solved the issue. But the question is still open: why does it resolve it? Is it simply a bug in Hadoop? – DDW
You mean mapred.job.shuffle.input.buffer.percent = 0.2, right? – grasshopper

4 Answers

17
votes

I think the clue is that the heap of my reduce task is needed almost entirely for the reduce phase, but the shuffle phase competes for the same heap space, and that conflict caused my jobs to crash. I think this explains why the job no longer crashes when I lower shuffle.input.buffer.percent.
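A back-of-envelope check, assuming the in-memory shuffle buffer is roughly heap size × shuffle.input.buffer.percent, and using the 1 GB reducer heap from the question (illustrative arithmetic only, not Hadoop code):

    public class ShuffleBudget {
        public static void main(String[] args) {
            long reduceHeapMb = 1024; // 1 GB reduce task heap
            System.out.println("buffer at 0.7: ~" + (long) (reduceHeapMb * 0.7) + " MB"); // ~716 MB
            System.out.println("buffer at 0.2: ~" + (long) (reduceHeapMb * 0.2) + " MB"); // ~204 MB
        }
    }

At 0.7 only about 300 MB of heap remain for the in-memory merges and the reduce phase itself, which matches the crash; at 0.2 there is far more headroom.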

9
votes

The parameter you cite, mapred.job.shuffle.input.buffer.percent, is apparently a pre-Hadoop-2 parameter. I could find that parameter in mapred-default.xml per the 1.0.4 docs, but its name has changed to mapreduce.reduce.shuffle.input.buffer.percent per the 2.2.0 docs.

Per the docs this parameter's description is:

The percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle.

For a complete understanding of Sort and Shuffle see Chapter 6.4 of The Hadoop Definitive Guide. That book provides an alternate definition of the parameter mapred.job.shuffle.input.buffer.percent:

The proportion of total heap size to be allocated to the map outputs buffer during the copy phase of the shuffle.

Since you observed that decreasing the value of mapred.job.shuffle.input.buffer.percent from its default of 0.7 to 0.2 solved your problem, it is pretty safe to say that you could also have solved your problem by increasing the reducer's heap size.
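Building on the driver sketch in the question, that alternative would look something like this. A sketch only: mapred.child.java.opts is the MRv1 property for the task JVM heap, while on Hadoop 2 you would use mapreduce.reduce.java.opts and raise mapreduce.reduce.memory.mb to match.

    // Option A: give the task JVM more heap so that 0.7 of it still leaves room
    // for the merges and the reduce phase. Note this MRv1 property applies to
    // map tasks as well unless overridden separately.
    conf.set("mapred.child.java.opts", "-Xmx2048m");

    // Option B: keep the 1 GB heap but shrink the shuffle buffer, as the asker did.
    conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.2f);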

3
votes

Even after changing shuffle.input.buffer.percent to 0.2, it didn't work for me; I got the same error.

After some trial and error on a single-node cluster, I found that there needs to be enough space in the / directory, since the process uses that space when it spills.

The spill directory also needs to be changed.

1
votes

Related bug - https://issues.apache.org/jira/browse/MAPREDUCE-6724

It can cause a NegativeArraySizeException if the calculated maxSingleShuffleLimit is greater than Integer.MAX_VALUE.
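For intuition, here is a standalone illustration (not the actual Hadoop code, and the class name and values are made up): narrowing a long limit above Integer.MAX_VALUE to an int wraps it to a negative value, and using that value as an array size throws exactly this exception.

    public class OverflowDemo {
        public static void main(String[] args) {
            long maxSingleShuffleLimit = 3L * 1024 * 1024 * 1024; // e.g. 3 GB, larger than Integer.MAX_VALUE
            int sizeAsInt = (int) maxSingleShuffleLimit;          // wraps to -1073741824
            byte[] buffer = new byte[sizeAsInt];                  // throws NegativeArraySizeException
        }
    }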