0 votes

I'm trying to run this simple MapReduce code that counts the occurrences of each word in a text file (this code was given in class):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;

public class WordCount {

  public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString()); 
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());  
        context.write(word, one);
      }
    }
  }

  public static class ReduceClass extends Reducer<Text,IntWritable,Text,IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum)); 
    }
  }

  public static class PartitionerClass extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
      return getLanguage(key) % numPartitions;
    }

    // Returns 1 if the key starts with a Hebrew letter (U+05D0 through
    // U+05EA, Alef to Tav), 0 otherwise.
    private int getLanguage(Text key) {
      if (key.getLength() > 0) {
        int c = key.charAt(0);
        if (c >= Long.decode("0x05D0").longValue() && c <= Long.decode("0x05EA").longValue())
          return 1;
      }
      return 0;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    //conf.set("mapred.map.tasks","10");
    //conf.set("mapred.reduce.tasks","2");
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(MapClass.class);
    job.setPartitionerClass(PartitionerClass.class);
    job.setCombinerClass(ReduceClass.class);
    job.setReducerClass(ReduceClass.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}

I'm getting this weird NullPointerException and I have no idea where it comes from. I included hadoop-common, hadoop-mapreduce-client-core, and hadoop-hdfs in my pom.xml dependencies.

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:435)
    at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:277)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:125)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:344)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1286)
    at WordCount.main(WordCount.java:74)

2 Answers

0 votes

Looking at your code, the only issue I can see is that you aren't setting the number of reduce tasks, while your partitioner expects there to be two. You will be using the default of 1.

You can try setting that in your driver using:

    job.setNumReduceTasks(2);
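
For reference, here is roughly how that call fits into the driver you posted; every line except the setNumReduceTasks one is taken from your own main:

    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(MapClass.class);
    job.setPartitionerClass(PartitionerClass.class);
    job.setCombinerClass(ReduceClass.class);
    job.setReducerClass(ReduceClass.class);
    job.setNumReduceTasks(2); // one reducer per partition: 0 = other, 1 = Hebrew

With two reduce tasks, getLanguage(key) % numPartitions can actually return both 0 and 1; with the default single reducer, everything collapses onto partition 0.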
0 votes

Before running the code, we should first understand what the partitioner does and which part of the data we are partitioning on.

Partitioning the data requires the number of reducers to cover the range of partition values being generated. Since we do not know in advance how many distinct partition values the file will produce, set the reducer count to a higher number; you can then use lazy output so that reducers with 0 records do not generate output files (see the sketch below).

By default, the number of reducers is set to 1, so setting this value should help:

    job.setNumReduceTasks(integer_value);
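
The lazy output part would look something like this in the driver; this is just a sketch using the standard LazyOutputFormat wrapper from hadoop-mapreduce-client-core, with TextOutputFormat assumed as the underlying output format:

    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    // Create output files lazily: a reducer that writes no records
    // produces no empty part-r-xxxxx file.
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);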