
I've written some code in Hadoop that should do the following tasks:

In the Mapper: records are read one by one from the input split and some processing is performed on them. Then, depending on the result of that processing, some records are pruned and saved in a set. At the end of the mapper, this set must be sent to the reducer.

In the Reducer: the sets received from all Mappers are processed and the final result is generated.

My question is: how can I delay sending the set to the Reducer until the last record in each mapper has been processed? By default, the code written in the Mapper runs once per input record (correct me if I'm wrong), so the set is sent to the reducer multiple times (once per input record). How can I detect the end of the input split in each mapper?

(Right now I use an if-condition with a counter that counts the number of processed records, but I think there must be a better way. Also, if I don't know the total number of records in the files, this method does not work.)
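For illustration, here is roughly what that counter-based workaround looks like. This is a simplified sketch, not my actual code: the key/value types, the pruning rule, and the total.records configuration property are placeholders, and the record count has to be supplied from outside, which is exactly the weak point.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterWorkaroundMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    private final Set<String> pruned = new HashSet<>();
    private long processed = 0;
    private long totalRecords;

    @Override
    protected void setup(Context context) {
        // "total.records" is a made-up property; the total must be known in advance
        totalRecords = context.getConfiguration().getLong("total.records", -1L);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        if (!record.isEmpty()) {            // placeholder pruning rule
            pruned.add(record);
        }
        processed++;
        if (processed == totalRecords) {    // emit the set only after the "last" record
            for (String r : pruned) {
                context.write(NullWritable.get(), new Text(r));
            }
        }
    }
}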

This is the flowchart of the job:

[flowchart image]

Have you looked at using the cleanup() method, which is called once when all records have been processed by the map() method? – Binary Nerd
This is exactly what I was looking for. I tested it and it works! I did not know about the cleanup() method. Please write it up with details as an answer (if you want). @Binary Nerd – fidelroha

1 Answer


If you look at the Mapper class (Javadoc) you can see it has four methods available:

  • cleanup(org.apache.hadoop.mapreduce.Mapper.Context context)
  • map(KEYIN key, VALUEIN value, org.apache.hadoop.mapreduce.Mapper.Context context)
  • run(org.apache.hadoop.mapreduce.Mapper.Context context)
  • setup(org.apache.hadoop.mapreduce.Mapper.Context context)

The default implementation of run() looks like:

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}

This illustrates when and in what order each of the methods is called. Typically you'll override the map() method. Doing work at the start or end of a mapper's run can be achieved by overriding setup() and cleanup().

The code shows that the map() method is called once for each key/value pair entering the mapper, while setup() and cleanup() are each called exactly once, at the start and end of that processing respectively.

In your case, you can use cleanup() to output the set of values once, after all the key/values have been processed.
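For example, a minimal sketch of such a mapper might look like the following. It assumes Text input and emits every pruned record under a single NullWritable key so the reducer sees them all together; the shouldKeep() check is just a placeholder for your actual pruning logic.

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PruningMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    // accumulates the pruned records for this input split
    private final Set<String> pruned = new HashSet<>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String record = value.toString();
        if (shouldKeep(record)) {   // placeholder for the real pruning logic
            pruned.add(record);
        }
        // nothing is written here - output happens once, in cleanup()
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // called exactly once, after the last record of the split has been mapped
        for (String record : pruned) {
            context.write(NullWritable.get(), new Text(record));
        }
    }

    private boolean shouldKeep(String record) {
        return !record.isEmpty();   // placeholder rule
    }
}

Because every mapper emits under the same key, a single reduce() call will receive the combined records from all mappers and can produce the final result there.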