I am trying to run a MapReduce program in Java with two input files and two mappers.

Below is the code:

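import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CounterMapper {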
    public static class MyMap1 extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");

            int age = Integer.parseInt(line[26]);

            context.write(new Text(line[7]), new LongWritable(age));
        }
    }

    public static class MyMap2 extends Mapper {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            int age = Integer.parseInt(line[26]);
            context.write(new Text(line[7]), new LongWritable(age));
        }
    }

    public static class MyRed extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        String line = null;

        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            for (LongWritable value : values) {
                line = value.toString();
            }
            context.write(key, new LongWritable());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "ProjectQuestion2");
        job.setJarByClass(CounterMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setNumReduceTasks(1);
        job.setMapperClass(MyMap1.class);
        job.setMapperClass(MyMap2.class);
        job.setReducerClass(MyRed.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        MultipleInputs.addInputPath(job, new Path(args[0]),
                TextInputFormat.class, MyMap1.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),
                TextInputFormat.class, MyMap2.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

After running the job, I get the error below:

INFO mapreduce.Job: Task Id : attempt_1486434709675_0016_m_000000_2, Status : FAILED
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapreduce.lib.input.DelegatingMapper.run(DelegatingMapper.java:55)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

Any input is appreciated. Thanks.

1 Answer

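It's likely that MyMap2 is using the map() method of the base class Mapper instead of yours. Because MyMap2 extends the raw type Mapper, your map(LongWritable, Text, Context) does not override Mapper.map(); it is only an overload, which the framework never calls. The base map() is an identity (pass-through) mapper: it writes the input key, the LongWritable byte offset produced by TextInputFormat, unchanged. That matches the error you're seeing (expected Text, received LongWritable), and the stack trace agrees: the failing write comes from org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124), not from your class.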
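To see why an identity mapper produces exactly this message, here is a simplified plain-Java sketch of the map-output key check. It is not Hadoop's code: it assumes the check compares the emitted key's runtime class with the configured map output key class, and it uses Long and String as stand-ins for LongWritable and Text so it runs without Hadoop. The method names (collect, identityMapKey, fieldMapKey, tryCollect) are hypothetical.

```java
import java.io.IOException;

public class KeyTypeCheckSketch {
    // Simplified stand-in for the map output key check: the emitted key's runtime
    // class must be exactly the configured map output key class.
    static void collect(Object key, Class<?> expectedKeyClass) throws IOException {
        if (key.getClass() != expectedKeyClass) {
            throw new IOException("Type mismatch in key from map: expected "
                    + expectedKeyClass.getName() + ", received " + key.getClass().getName());
        }
    }

    // Identity map, like the default Mapper.map(): the input key is emitted unchanged.
    static Object identityMapKey(Object inputKey, Object inputValue) {
        return inputKey;
    }

    // Hypothetical mapper logic in the spirit of MyMap1: emit field 7 of a tab-separated line.
    static Object fieldMapKey(Object inputKey, Object inputValue) {
        return inputValue.toString().split("\t")[7];
    }

    // Returns "ok" if the key passes the check, otherwise the exception message.
    static String tryCollect(Object key, Class<?> expectedKeyClass) {
        try {
            collect(key, expectedKeyClass);
            return "ok";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Long offset = 0L; // byte-offset key (LongWritable in Hadoop, Long here)
        String record = "f0\tf1\tf2\tf3\tf4\tf5\tf6\tUS";
        // Identity mapper: the offset key reaches the check and is rejected.
        System.out.println(tryCollect(identityMapKey(offset, record), String.class));
        // Real mapper logic: the emitted key has the expected class.
        System.out.println(tryCollect(fieldMapKey(offset, record), String.class));
    }
}
```

The first line printed is "Type mismatch in key from map: expected java.lang.String, received java.lang.Long", the same shape as the job's error, and the second is "ok".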

I would do a few things:

  1. In MyMap2, change Mapper to Mapper<LongWritable, Text, Text, LongWritable> (the same type arguments MyMap1 already uses).
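  2. Make sure your map() methods override the base Mapper method by adding the @Override annotation to them. With the raw type still in place, the compiler will then reject MyMap2's map() instead of silently treating it as an overload.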
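The override problem is plain Java generics, so it can be reproduced without Hadoop. In this sketch, BaseMapper, RawMapper, TypedMapper and runOnce are hypothetical stand-ins, not Hadoop classes: BaseMapper mimics Mapper's identity map() and a run() that calls it, RawMapper mirrors the original MyMap2, and TypedMapper mirrors the two fixes above.

```java
import java.util.ArrayList;
import java.util.List;

public class RawTypeOverrideDemo {
    // Hypothetical stand-in for Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
    static class BaseMapper<KI, VI, KO, VO> {
        // Default map() is an identity pass-through, like Mapper.map().
        protected void map(KI key, VI value, List<Object> out) {
            out.add(key);
        }

        // Stand-in for Mapper.run(): the framework only ever calls map(KI, VI, ...).
        void run(KI key, VI value, List<Object> out) {
            map(key, value, out);
        }
    }

    // Mirrors the original MyMap2: extends the raw type, so the inherited method is
    // seen as map(Object, Object, List) and this map(Long, String, ...) is only an
    // overload. Annotating it with @Override would not compile.
    @SuppressWarnings("rawtypes")
    static class RawMapper extends BaseMapper {
        protected void map(Long key, String value, List<Object> out) {
            out.add(value.split("\t")[0]);
        }
    }

    // Mirrors the fixed MyMap2: type arguments supplied, so this is a real override.
    static class TypedMapper extends BaseMapper<Long, String, String, Long> {
        @Override
        protected void map(Long key, String value, List<Object> out) {
            out.add(value.split("\t")[0]);
        }
    }

    // Feeds one record through run() and returns whatever map() emitted.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static List<Object> runOnce(BaseMapper mapper, Long key, String line) {
        List<Object> out = new ArrayList<>();
        mapper.run(key, line, out);
        return out;
    }

    public static void main(String[] args) {
        Long offset = 0L; // stand-in for the LongWritable byte-offset key
        String line = "US\t27";
        System.out.println(runOnce(new RawMapper(), offset, line));
        System.out.println(runOnce(new TypedMapper(), offset, line));
    }
}
```

Running main prints [0] for the raw subclass (the offset key leaks through, as in the failing job) and [US] for the typed one. Adding @Override to RawMapper.map turns the silent bug into a compile error, which is the point of the second suggestion.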

You can also make a couple of improvements:

  1. Change Job job = new Job(conf, "ProjectQuestion2"); to Job job = Job.getInstance(conf, "ProjectQuestion2"); to remove the deprecation warning (the @SuppressWarnings("deprecation") annotation is then no longer needed).
  2. job.setMapOutputKeyClass() and job.setMapOutputValueClass() are each called twice; you can remove one pair.