2
votes

I'm new to Hadoop and writing MapReduce jobs and I am encountering a problem where it appears reducers context.write method is changing correct values into incorrect ones.

What is MapReduce job supposed to do?

  • Count the total number of words (int wordCount)
  • Count the number of distinct words (int counter_dist)
  • Count the number of words starting with "z" or "Z" (int counter_startZ)
  • Count the number of words that appear less than 4 times (int counter_less4)

All of this must be done in a single MapReduce job.

Text file being analysed

Hello how zou zou zou zou how are you

Correct output:
wordCount = 9
counter_dist = 5
counter_startZ = 4
counter_less4 = 4

Mapper Class

public class WordCountMapper extends Mapper <Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String hasKey = itr.nextToken();
            word.set(hasKey);
            context.write(word, one);
        }

    }
}

Reducer Class
In order to debug my code, I printed out a lot of statements to check my values at every point. Stdout code is available below.

public class WordCountReducer extends Reducer <Text, IntWritable, Text, IntWritable> {

    int wordCount = 0; // Total number of words
    int counter_dist = 0; // Number of distinct words in the corpus
    int counter_startZ = 0; // Number of words that start with letter Z
    int counter_less4 = 0; // Number of words that appear less than 4 

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int repeatedWords = 0;
        System.out.println("###Reduce method starts");
        System.out.println("Values: wordCount:" + wordCount + " counter_dist:" + counter_dist + " counter_startZ:" + counter_startZ + " counter_less4:" + counter_less4 + " (start)");
        for (IntWritable val : values){
            System.out.println("Key: " + key.toString());
            repeatedWords++;
            wordCount += val.get();
            if(key.toString().startsWith("z") || key.toString().startsWith("Z")){
            counter_startZ++;
            }
            System.out.println("Values: wordCount:" + wordCount + " counter_dist:" + counter_dist + " counter_startZ:" + counter_startZ + " counter_less4:" + counter_less4 + " (end of loop)");
        }
        counter_dist++;

        if(repeatedWords < 4){
            counter_less4++;
        }

        System.out.println("Values: wordCount:" + wordCount + " counter_dist:" + counter_dist + " counter_startZ:" + counter_startZ + " counter_less4:" + counter_less4 + " repeatedWords:" + repeatedWords + " (end)");
        System.out.println("###Reduce method ends\n");
    }


    @Override
    public void cleanup(Context context) throws IOException, InterruptedException{
        System.out.println("###CLEANUP: wordCount: " + wordCount);
        System.out.println("###CLEANUP: counter_dist: " + counter_dist);
        System.out.println("###CLEANUP: counter_startZ: " + counter_startZ);
        System.out.println("###CLEANUP: counter_less4: " + counter_less4);

        context.write(new Text("Total words: "), new IntWritable(wordCount));
        context.write(new Text("Distinct words: "), new IntWritable(counter_dist));
        context.write(new Text("Starts with Z: "), new IntWritable(counter_startZ));
        context.write(new Text("Appears less than 4 times:"), new IntWritable(counter_less4));
    }


}

Stdout log, which I am using for debugging

###Reduce method starts
Values: wordCount:0 counter_dist:0 counter_startZ:0 counter_less4:0 (start)
Key: Hello
Values: wordCount:1 counter_dist:0 counter_startZ:0 counter_less4:0 (end of loop)
Values: wordCount:1 counter_dist:1 counter_startZ:0 counter_less4:1 repeatedWords:1 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:1 counter_dist:1 counter_startZ:0 counter_less4:1 (start)
Key: are
Values: wordCount:2 counter_dist:1 counter_startZ:0 counter_less4:1 (end of loop)
Values: wordCount:2 counter_dist:2 counter_startZ:0 counter_less4:2 repeatedWords:1 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:2 counter_dist:2 counter_startZ:0 counter_less4:2 (start)
Key: how
Values: wordCount:3 counter_dist:2 counter_startZ:0 counter_less4:2 (end of loop)
Key: how
Values: wordCount:4 counter_dist:2 counter_startZ:0 counter_less4:2 (end of loop)
Values: wordCount:4 counter_dist:3 counter_startZ:0 counter_less4:3 repeatedWords:2 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:4 counter_dist:3 counter_startZ:0 counter_less4:3 (start)
Key: you
Values: wordCount:5 counter_dist:3 counter_startZ:0 counter_less4:3 (end of loop)
Values: wordCount:5 counter_dist:4 counter_startZ:0 counter_less4:4 repeatedWords:1 (end)
###Reduce method ends

###Reduce method starts
Values: wordCount:5 counter_dist:4 counter_startZ:0 counter_less4:4 (start)
Key: zou
Values: wordCount:6 counter_dist:4 counter_startZ:1 counter_less4:4 (end of loop)
Key: zou
Values: wordCount:7 counter_dist:4 counter_startZ:2 counter_less4:4 (end of loop)
Key: zou
Values: wordCount:8 counter_dist:4 counter_startZ:3 counter_less4:4 (end of loop)
Key: zou
Values: wordCount:9 counter_dist:4 counter_startZ:4 counter_less4:4 (end of loop)
Values: wordCount:9 counter_dist:5 counter_startZ:4 counter_less4:4 repeatedWords:4 (end)
###Reduce method ends

###CLEANUP: wordCount: 9
###CLEANUP: counter_dist: 5
###CLEANUP: counter_startZ: 4
###CLEANUP: counter_less4: 4

From the log it appears like all the values are correct and that everything works fine. However, when I open the output directory in HDFS and read the "part-r-00000" file, the output from the context.write which is written there is completely different.

Total words: 22
Distinct words: 4
Starts with Z: 0
Appears less than 4 times: 4
1
I have added the complete code logic. Hope you understand it. Let me know. - Gyanendra Dwivedi
It seems like a strange thing ,had you try debug your code .to see the variables ! - HbnKing

1 Answers

1
votes

You must never rely on cleanup() method for crucial program logic. The cleanup() method is called every time a JVM is ripped off. So based on the number of JVM (which you can not predict) spawned and killed, your logic become volatile.

Move the initialization and writing to context both into reduce method.

i.e.

int wordCount = 0; // Total number of words
int counter_dist = 0; // Number of distinct words in the corpus
int counter_startZ = 0; // Number of words that start with letter Z
int counter_less4 = 0; // Number of words that appear less than 4 

and

   context.write(new Text("Total words: "), new IntWritable(wordCount));
    context.write(new Text("Distinct words: "), new IntWritable(counter_dist));
    context.write(new Text("Starts with Z: "), new IntWritable(counter_startZ));
    context.write(new Text("Appears less than 4 times:"), new IntWritable(counter_less4));

EDIT : Based on OP comments, it seems that whole logic is flawed.

Below is the code to accomplish the desired result. Please note that, I have not implemented setup() or cleanup(); because that is not at all needed.

Use counters to count what you are looking for. After MapReduce completes, just fetch the counters in the driver class.

e.g. Number of words and words starting with "z" or "Z" can be counted in the mapper

public class WordCountMapper extends Mapper <Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String hasKey = itr.nextToken();
            word.set(hasKey);
            context.getCounter("my_counters", "TOTAL_WORDS").increment(1);
            if(hasKey.toUpperCase().startsWith("Z")){
            context.getCounter("my_counters", "Z_WORDS").increment(1);
            }
            context.write(word, one);
        }
    }
}

The Number of distinct words and words appearing less than 4 times can be counted in reducer counter.

public class WordCountReducer extends Reducer <Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int wordCount= 0;
        context.getCounter("my_counters", "DISTINCT_WORDS").increment(1);
        for (IntWritable val : values){
            wordCount += val.get();
        }
        if(wordCount < 4{
           context.getCounter("my_counters", "WORDS_LESS_THAN_4").increment(1);
        }
    }
}

In the Driver class fetch the counters. The below code goes after the line where you have submitted the job

CounterGroup group = job.getCounters().getGroup("my_counters");

for (Counter counter : group) {
   System.out.println(counter.getName() + "=" + counter.getValue());
}