
I want to understand how memory is used in the reduce phase of a MapReduce job, so I can control the settings in the intended way.

If I understand correctly, the reducer first fetches its map outputs and keeps them in memory up to a certain threshold. The settings that control this are listed below, followed by a short sketch of how they can be set:

  • mapreduce.reduce.shuffle.merge.percent: The usage threshold at which an in-memory merge will be initiated, expressed as a percentage of the total memory allocated to storing in-memory map outputs, as defined by mapreduce.reduce.shuffle.input.buffer.percent.
  • mapreduce.reduce.input.buffer.percent: The percentage of memory, relative to the maximum heap size, to retain map outputs during the reduce. When the shuffle is concluded, any remaining map outputs in memory must consume less than this threshold before the reduce can begin.
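
For reference, a minimal sketch of how these properties can be set on a job's Configuration; the values shown are the documented defaults:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ReduceMemoryTuning {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Fraction of the reducer heap used to buffer map outputs during the shuffle (default 0.70).
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
        // Buffer usage at which the in-memory merge starts (default 0.66).
        conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);
        // Fraction of the heap that map outputs may still occupy when reduce() starts (default 0).
        conf.setFloat("mapreduce.reduce.input.buffer.percent", 0.0f);
        Job job = Job.getInstance(conf, "reduce-memory-tuning");
        // ...set mapper, reducer and paths as usual, then job.waitForCompletion(true)
    }
}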

Next, these spilled blocks are merged. It seems the following option controls how much memory is used for the shuffle:

  • mapreduce.reduce.shuffle.input.buffer.percent: The percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle.

But then, there is the setting:

  • mapreduce.reduce.shuffle.memory.limit.percent: Maximum percentage of the in-memory limit that a single shuffle can consume.

But it is not clear to what value this percentage applies. Is there more information available regarding these values, i.e. what they control and how they differ?

Finally, after the merge completes, the reduce process is run on the inputs. In the [Hadoop book][1], I found that the final merge step directly feeds the reducers. However, the default value mapreduce.reduce.input.buffer.percent=0 seems to contradict this, indicating that everything is spilled to disk BEFORE the reducers start. Is there any reference on which of these explanations is correct?

[1]: Hadoop: The Definitive Guide, Fourth Edition, p. 200


1 Answer


Here is how mapreduce.reduce.shuffle.memory.limit.percent is used. Its percentage is applied to the shuffle buffer, which is itself 0.70 of the whole reducer memory (mapred.job.shuffle.input.buffer.percent). The result is the maximum number of bytes up to which the data of a single shuffle (a single map output) can be kept in memory.

// maxInMemCopyUse = mapred.job.shuffle.input.buffer.percent (default 0.70f)
maxSize = (int) (conf.getInt("mapred.job.reduce.total.mem.bytes",
        (int) Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
        * maxInMemCopyUse);

// MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = mapreduce.reduce.shuffle.memory.limit.percent (default 0.25f)
maxSingleShuffleLimit = (long) (maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);

This property is used in the copy phase of the reducer. If a map output is larger than maxSingleShuffleLimit, the data is written to disk; otherwise it is kept in memory.
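
To make the numbers concrete, here is a small self-contained sketch of that arithmetic, assuming a 1 GiB reducer heap and the default fractions mentioned above:

public class ShuffleLimitSketch {
    public static void main(String[] args) {
        long maxHeap = 1024L * 1024 * 1024;      // assumed reducer heap: 1 GiB
        float inMemCopyUse = 0.70f;              // mapreduce.reduce.shuffle.input.buffer.percent
        float singleShuffleFraction = 0.25f;     // mapreduce.reduce.shuffle.memory.limit.percent

        long maxSize = (long) (maxHeap * inMemCopyUse);                        // ~716 MiB shuffle buffer
        long maxSingleShuffleLimit = (long) (maxSize * singleShuffleFraction); // ~179 MiB per map output

        System.out.println("shuffle buffer (bytes):       " + maxSize);
        System.out.println("single shuffle limit (bytes): " + maxSingleShuffleLimit);
        // A map output larger than maxSingleShuffleLimit goes straight to disk;
        // smaller outputs are buffered in memory until the merge thresholds are hit.
    }
}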

The property mapreduce.reduce.input.buffer.percent is completely different. Once all the data has been copied and all the merging is done, just before the reducer starts, it simply checks whether the data still held in memory exceeds this limit.
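
As a rough illustration of that check (the helper below just mirrors the description, it is not Hadoop API), assuming the same 1 GiB heap as above:

public class ReduceInputBufferSketch {
    // Bytes of map output that may remain in memory when reduce() starts.
    static long retainableBytes(long maxHeap, float reduceInputBufferPercent) {
        return (long) (maxHeap * reduceInputBufferPercent);
    }

    public static void main(String[] args) {
        long heap = 1L << 30; // assumed 1 GiB reducer heap
        System.out.println(retainableBytes(heap, 0.00f)); // 0         -> everything spilled to disk before reduce (the default)
        System.out.println(retainableBytes(heap, 0.70f)); // 751619276 -> ~716 MiB may stay in memory and feed the reducer directly
    }
}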

You could refer to this code (it is for the old mapred API, but it should still give an insight) to see how maxSingleShuffleLimit and the other property are used.