2 votes

I am trying to run a Hadoop job on a very large amount of data, using up to 32 reducers. But when I look at the output of each reducer, I see that more than one reducer can receive the same key (with different values, of course). Can this behavior be avoided while still using more than one reducer?

Later edit: I've tried using the Text class instead, and although it works fine, my JVM eventually crashes because it runs low on heap space. What criteria does Hadoop use to partition data into key groups, apart from compareTo()?


2 Answers

7 votes

You say you have a custom key (one that implements WritableComparable). Have you overridden the hashCode() method?

If you're using the HashPartitioner (the default) and haven't overridden hashCode() in your custom key, then two identical keys emitted from different mappers will most probably go to different reducers (the result of hashCode() is taken modulo the number of reducers to determine which reducer the key/value pair is sent to). This is because, by default, hashCode() is a native method that returns a value derived from the object's memory address, so two logically equal key objects do not hash to the same value.
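A hash-based partitioner's choice of reducer boils down to something like the following sketch (this mirrors what the default HashPartitioner does; the type parameter names are illustrative):

    // Sketch: mask the key's hash to a non-negative value, then take it
    // modulo the number of reduce tasks to pick the target reducer.
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

So if hashCode() returns a different value for each object instance, identical keys produced on different nodes end up in different partitions.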

A hashCode() implementation for your key could be as simple as adding together the hash codes of its fields (assuming those fields provide proper hashCode() implementations themselves):

    @Override
    public int hashCode() {
        // Combine the hash codes of the key's fields
        return field1.hashCode() + field2.hashCode();
    }
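If it helps, here is a minimal sketch of what a complete custom key could look like; the class and field names are made up for illustration, not taken from your code:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    // Hypothetical two-field key. The important part is that hashCode()
    // and equals() depend only on the field values, so equal keys hash
    // identically no matter which mapper emitted them.
    public class MyCompositeKey implements WritableComparable<MyCompositeKey> {
        private Text field1 = new Text();
        private Text field2 = new Text();

        @Override
        public void write(DataOutput out) throws IOException {
            field1.write(out);
            field2.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            field1.readFields(in);
            field2.readFields(in);
        }

        @Override
        public int compareTo(MyCompositeKey other) {
            int cmp = field1.compareTo(other.field1);
            return (cmp != 0) ? cmp : field2.compareTo(other.field2);
        }

        @Override
        public int hashCode() {
            // A prime multiplier spreads keys across reducers more evenly
            // than a plain sum, but either works for correctness.
            return field1.hashCode() * 163 + field2.hashCode();
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof MyCompositeKey)) return false;
            MyCompositeKey other = (MyCompositeKey) o;
            return field1.equals(other.field1) && field2.equals(other.field2);
        }
    }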
5 votes

I suspect that what you are seeing is speculative execution. Normally, all values for a given key go to exactly one reducer. From http://developer.yahoo.com/hadoop/tutorial/module4.html:

Speculative execution: One problem with the Hadoop system is that by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program. For example if one node has a slow disk controller, then it may be reading its input at only 10% the speed of all the other nodes. So when 99 map tasks are already complete, the system is still waiting for the final map task to check in, which takes much longer than all the other nodes.

By forcing tasks to run in isolation from one another, individual tasks do not know where their inputs come from. Tasks trust the Hadoop platform to just deliver the appropriate input. Therefore, the same input can be processed multiple times in parallel, to exploit differences in machine capabilities. As most of the tasks in a job are coming to a close, the Hadoop platform will schedule redundant copies of the remaining tasks across several nodes which do not have other work to perform. This process is known as speculative execution. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first.

Speculative execution is enabled by default. You can disable speculative execution for the mappers and reducers by setting the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution JobConf options to false, respectively.
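As a sketch, setting those options with the old-style JobConf API they belong to would look something like this (the driver class name is illustrative):

    // Using org.apache.hadoop.mapred.JobConf
    JobConf conf = new JobConf(MyDriver.class);
    // Turn off speculative execution for map and reduce tasks
    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);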