2
votes

If you need to have the values sorted for a given key when passed to the reduce phase, such as for a moving average, or to mimick the LAG/LEAD Analytic functions in SQL, you need to implement a Secondary Sort in MapReduce.

After searching around on Google, the common suggestion is to:

A) Emit composite key, which includes the , in the map phase B) Create a "composite key comparator" class, the purpose of which is for the secondary sort, comparing the values to sort on after comparing the key, so that the Iterable passed to the reducer is sorted. C) Create a "natural key grouping comparator" class, the purpose of which is for the primary sort, comparing only the key to sort on, so that the Iterable passed to the reducer contains all of the values belonging to a given key. D) Create a "natural key partitioner class", the purpose of which I do not know and is the purpose of my question.

From here:

The natural key partitioner uses the natural key to partition the data to the reducer(s). Again, note that here, we only consider the “natural” key.

By natural key he of course means the actual key, not the composite key + value.

From here:

The default partition will calculate a hash over the entire key resulting in different hashes and the potential that the records are sent to separate reducers. To ensure that both records are sent to the same reducer let's implement a customer partitioner.

From here:

In a real Hadoop cluster, there are many reducers running in different nodes. If the data for the same zone and day don’t land in the same reducer after the map reduce shuffle, we are in trouble. The way to ensure that is taking charge of defining our own partitioning logic.

Every source I've presented plus all the others I've seen recommends the partioner class to be written according to the following pseudo code:

naturalKey = compositeKey.getNaturalKey()
return naturalKey.hashCode() % NUMBER_OF_REDUCERS

Now, I was under the impression that Hadoop guarentees that for a given key, ALL the values corresponding to that key will be directed to the same reducer.

Is the reason we create a custom Partitioner the same for which we created the "natural key grouping comparator" class, to prevent MapReduce from sending the composite key instead of the reducer key?


1

1 Answers

5
votes

The question is almost as good as an answer :), Everything you mentioned above is correct, I guess a different way of explaining the concept should help out.

So let me give it a shot.

Lets assume that our secondary sorting is on a composite key made out of Last Name and First Name.

composite key

With the composite key out of the way, now lets look at the secondary sorting mechanism

secondary sorting steps

The partitioner and the group comparator use only natural key, the partitioner uses it to channel all records with the same natural key to a single reducer. This partitioning happens in the Map Phase, data from various Map tasks are received by reducers where they are grouped and then sent to the reduce method. This grouping is where the group comparator comes into picture, if we would not have specified a custom group comparator then Hadoop would have used the default implementation which would have considered the entire composite key, which would have lead to incorrect results.

A visual Representation of the MR Steps