1 vote

I am trying to write a job that analyses some information from a YouTube data set. I believe I have correctly set the map output key classes in the driver class, but I am still getting a type mismatch error. I am posting the code and the exception below.

The Mapper class

public class YouTubeDataMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);
    private Text category = new Text();

    public void mapper(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String str[] = value.toString().split("\t");
        category.set(str[3]);
        context.write(category, one);
    }
}

The Reducer class

public class YouTubeDataReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

The Driver class

public class YouTubeDataDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "categories");
        job.setJarByClass(YouTubeDataDriver.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class); // Here I have set the output keys
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(YouTubeDataMapper.class);
        job.setReducerClass(YouTubeDataReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        Path out = new Path(args[1]);
        out.getFileSystem(conf).delete(out);
        job.waitForCompletion(true);
    }
}

The exception I got:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1069)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

I have set the output keys in the driver class:

    job.setOutputKeyClass(Text.class); // Here I have set the output keys
    job.setOutputValueClass(IntWritable.class);

But why am I still getting the error? Please help, I am new to MapReduce.


2 Answers

2 votes

Rename the mapper() method to map() (see official docs).

What's happening is that your data never actually reaches your code. The framework calls a method named map() for every input record; since your class only defines mapper(), the default Mapper.map() is used instead, and that default is the identity function: it writes each input key/value pair through unchanged. So the map output key is still the LongWritable byte offset supplied by TextInputFormat, not your Text category, which is exactly what the exception reports.
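
For reference, here is a minimal sketch of the corrected mapper, assuming the same tab-separated input as in the question. Adding @Override is worth the extra line: the compiler then rejects a misnamed method like mapper() outright instead of silently ignoring it.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class YouTubeDataMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);
    private final Text category = new Text();

    @Override // fails to compile if the name/signature doesn't match Mapper.map()
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        category.set(fields[3]);
        context.write(category, one);
    }
}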

As an aside,

String str[] = value.toString().split("\t");
category.set(str[3]);

is very dangerous. It's risky to assume that all of your input data will contain at least 3 \t characters. When processing large amounts of data there will almost always be some that's not in the format you expect, and you don't want your entire job to die when that happens. Consider doing something like:

String valueStr = value.toString();
if (valueStr != null) {
    String[] str = valueStr.split("\t");
    if (str.length > 3) {
        category.set(str[3]);
        context.write(category, one);
    }
}
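
If you also want visibility into how many lines get skipped, a counter is a cheap way to track it without failing the job. A sketch of the same guard with a counter added (the "YouTubeData"/"MALFORMED_LINES" names are made up for illustration):

String[] str = value.toString().split("\t");
if (str.length > 3) {
    category.set(str[3]);
    context.write(category, one);
} else {
    // Shows up in the job's counter summary; group/counter names are arbitrary.
    context.getCounter("YouTubeData", "MALFORMED_LINES").increment(1);
}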
0 votes

The code below (with Object in place of LongWritable as the mapper's input key type) works for me:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class YouTubeDataDriver {

    public static class YouTubeDataMapper
            extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class YouTubeDataReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "categories");
        job.setJarByClass(YouTubeDataDriver.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class); // Here I have set the output keys
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(YouTubeDataMapper.class);
        job.setReducerClass(YouTubeDataReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        Path out = new Path(args[1]);
        out.getFileSystem(conf).delete(out);
        job.waitForCompletion(true);

    }

}