1 vote

In my Hadoop reducers, I need to know how many successful map tasks were executed in the current job. I've come up with the following, which as far as I can tell does NOT work.

    Counter totalMapsCounter = 
        context.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS);
    Counter failedMapsCounter = 
        context.getCounter(JobInProgress.Counter.NUM_FAILED_MAPS);
    long nSuccessfulMaps = totalMapsCounter.getValue() - 
                           failedMapsCounter.getValue();

Alternatively, if there's a good way that I could retrieve (again, from within my reducers) the total number of input splits (not number of files, and not splits for one file, but total splits for the job), that would probably also work. (Assuming my job completes normally, that should be the same number, right?)

Comment: The more I think about this, the more I think my problem is actually due to the scope of counters. I can increment and read a counter just fine within a single mapper or reducer, but what I need/want is a way to read a globally aggregated counter value (computed in my mappers and used in my reducers). – Mark

2 Answers

2 votes

Edit: It turns out it's not good practice to retrieve counters in map and reduce tasks via Job or JobConf. Here is an alternate approach for passing summary details from the mapper to the reducer; it takes some effort to code, but is doable. It would have been nice if this were a built-in Hadoop feature rather than something to hand-code. I have requested that this feature be put into Hadoop and am waiting for a response.
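
A minimal sketch of that hand-coded approach, assuming Text map output keys and a reserved key prefix (__MAP_SUMMARY__) that never occurs in the real data; all class and field names here are made up. Each mapper counts its own records and, in cleanup(), emits one copy of its total per reducer partition, so every reducer can reconstruct the job-wide total:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch only: a matching custom Partitioner must route keys of the
    // form __MAP_SUMMARY__<n> to partition n.
    public class SummaryMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        public static final String SUMMARY_PREFIX = "__MAP_SUMMARY__";
        private long recordCount = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            recordCount++;
            // ... normal map output goes here ...
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Send this mapper's total to every reducer partition.
            for (int i = 0; i < context.getNumReduceTasks(); i++) {
                context.write(new Text(SUMMARY_PREFIX + i),
                              new LongWritable(recordCount));
            }
        }
    }

The reducer then sums the values it receives for summary keys and treats everything else as regular data; note it has to tolerate summary keys arriving interleaved with, not necessarily before, its real keys.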


JobCounter.TOTAL_LAUNCHED_MAPS was retrieved in the Reducer class with the old MR API, using the code below.

    // Old MR API. Requires: org.apache.hadoop.mapred.JobClient, JobConf,
    // RunningJob, Counters, JobID, and org.apache.hadoop.mapreduce.JobCounter.
    private String jobID;
    private long launchedMaps;

    public void configure(JobConf jobConf) {

        try {
            // The framework sets mapred.job.id for every task.
            jobID = jobConf.get("mapred.job.id");

            JobClient jobClient = new JobClient(jobConf);

            RunningJob job = jobClient.getJob(JobID.forName(jobID));

            if (job == null) {
                System.out.println("No job found with ID " + jobID);
            } else {
                Counters counters = job.getCounters();
                launchedMaps = counters.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

With the new API, Reducer implementations can access the job's Configuration via JobContext#getConfiguration(). The code above can then be implemented in Reducer#setup().
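
For instance, a minimal sketch of that setup() variant (field declarations as in the snippet above; the counter fetch itself still goes through the old-API JobClient):

    // Additionally requires org.apache.hadoop.conf.Configuration and the
    // new-API org.apache.hadoop.mapreduce.Reducer.
    @Override
    protected void setup(Context context) {
        try {
            Configuration conf = context.getConfiguration();
            jobID = conf.get("mapred.job.id");

            JobClient jobClient = new JobClient(new JobConf(conf));
            RunningJob job = jobClient.getJob(
                    org.apache.hadoop.mapred.JobID.forName(jobID));

            if (job != null) {
                launchedMaps = job.getCounters()
                                  .getCounter(JobCounter.TOTAL_LAUNCHED_MAPS);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }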

Reducer#configure() in the old MR API and Reducer#setup() in the new MR API are each invoked once per reduce task, before Reducer#reduce() is called.

Incidentally, the counters can also be retrieved from a JVM other than the one that launched the job.
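
For example, a minimal sketch of a standalone client doing the same lookup; the job ID string is a placeholder for a real one:

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapreduce.JobCounter;

    public class CounterFetcher {
        public static void main(String[] args) throws Exception {
            // Connects to the cluster described by the configuration
            // on the classpath.
            JobClient jobClient = new JobClient(new JobConf());
            RunningJob job = jobClient.getJob(
                    org.apache.hadoop.mapred.JobID.forName("job_201201010000_0001"));
            if (job != null) {
                System.out.println("launched maps: " + job.getCounters()
                        .getCounter(JobCounter.TOTAL_LAUNCHED_MAPS));
            }
        }
    }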

JobInProgress is annotated as below, so it should not be used: the API is private to a limited set of projects and its interface may change.

    @InterfaceAudience.LimitedPrivate({"MapReduce"})
    @InterfaceStability.Unstable

Note that JobCounter.TOTAL_LAUNCHED_MAPS also includes map tasks launched due to speculative execution.

1 vote

Using the new API, I retrieved a user-defined counter (an enum in the Mapper) and a built-in counter. The code below goes in my reducer's setup() method, although it still has to use some classes from the old API (the mapred package):

    // context.getConfiguration() already returns the job's Configuration,
    // so there is no need to construct a JobContext just to read it back.
    Configuration c = context.getConfiguration();

    jobID = c.get("mapred.job.id");

    // JobClient, JobConf, RunningJob, Counters, JobID and Task.Counter all
    // come from the old API (the mapred package).
    JobClient jobClient = new JobClient(new JobConf(c));

    RunningJob job = jobClient.getJob(org.apache.hadoop.mapred.JobID.forName(jobID));

    Counters counters = job.getCounters();

    // User-defined counter, declared as an enum in the Mapper class.
    long customCounterCount = counters.getCounter(WordCountMapper.CustomCounters.COUNT);

    // Built-in counter.
    long totalMapInputRecords = counters.getCounter(Task.Counter.MAP_INPUT_RECORDS);

    System.out.println("customCounterCount==> " + customCounterCount);
    System.out.println("totalMapInputRecords==> " + totalMapInputRecords);