
I have four classes, namely MapperOne, ReducerOne, MapperTwo, and ReducerTwo. I want to chain them: MapperOne --> ReducerOne --> output file, which is the input to MapperTwo --> ReducerTwo --> final output file.

MY DRIVER CLASS CODE:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class StockDriver {


public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    System.out.println(" Driver invoked------");
    Configuration config = new Configuration();
    config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
    config.set("mapred.textoutputformat.separator", " --> ");

    String inputPath="In\\NYSE_daily_prices_Q_less.csv";

    String outpath = "C:\\Users\\Outputs\\run1";
    String outpath2 = "C:\\Users\\Outputs\\run2";

    Job job1 = new Job(config,"Stock Analysis: Creating key values");
    job1.setInputFormatClass(TextInputFormat.class);
    job1.setOutputFormatClass(TextOutputFormat.class);

    job1.setMapOutputKeyClass(Text.class);
    job1.setMapOutputValueClass(StockDetailsTuple.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(Text.class);

    job1.setMapperClass(StockMapperOne.class);
    job1.setReducerClass(StockReducerOne.class);

    FileInputFormat.setInputPaths(job1, new Path(inputPath));
    FileOutputFormat.setOutputPath(job1, new Path(outpath));

    //THE SECOND MAP_REDUCE TO DO CALCULATIONS


    Job job2 = new Job(config,"Stock Analysis: Calculating Covariance");
    job2.setInputFormatClass(TextInputFormat.class);
    job2.setOutputFormatClass(TextOutputFormat.class);
    job2.setMapOutputKeyClass(LongWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(Text.class);
    job2.setMapperClass(StockMapperTwo.class);
    job2.setReducerClass(StockReducerTwo.class);


    String outpath3=outpath+"\\part-r-00000";
    System.out.println("OUT PATH 3: " +outpath3 );
    FileInputFormat.setInputPaths(job2, new Path(outpath3));
    FileOutputFormat.setOutputPath(job2, new Path(outpath2));


    if (job1.waitForCompletion(true)) {
        System.out.println(job2.waitForCompletion(true));
    }
}

}

My MapperOne and ReducerOne are executed properly and the output file is stored in the proper path. But when the second job runs, ONLY the reducer is invoked. Below are my MapperTwo and ReducerTwo codes.

MAPPER TWO

public class StockMapperTwo extends Mapper<LongWritable, Text, LongWritable, Text> {

public void map(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
    System.out.println("------ MAPPER 2 CALLED-----");

    for(Text val: values){
        System.out.println("KEY: "+ key.toString() + "   VALUE: "+ val.toString());
        //context.write(new Text("mapper2"), new Text("hi"));
        context.write(new LongWritable(2), new Text("hi"));
    }

}
}

REDUCER TWO

 public class StockReducerTwo extends Reducer<LongWritable, Text, Text, Text>{

public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        System.out.println(" REDUCER 2 INVOKED");

        context.write(new Text("hello"), new Text("hi"));


}
}

My doubts about this configuration are:

  1. Why is the mapper skipped even though it is set via job2.setMapperClass(StockMapperTwo.class)?

  2. If I don't set job2.setMapOutputKeyClass(LongWritable.class) and job2.setMapOutputValueClass(Text.class), then even the reducer is not invoked, and I get this error:

    java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:870)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:573)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)

How is this happening? Please help; I am not able to invoke my mapper and reducer properly. Please guide me.


1 Answer


Sorry for posting this question. I didn't notice that my mapper was written wrongly.

Instead of this:

public void map(LongWritable key,Text values, Context context) throws IOException, InterruptedException{

I had written it like this:

public void map(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException{

It took me a really long time to spot the mistake. There was no compile or runtime error because the wrong signature does not override Mapper's map() at all; Java treats it as an extra overload, so Hadoop falls back to the default identity map(), which is why the raw (LongWritable, Text) records went straight to the reducer (and also where the "Type mismatch in key from map" error came from when the map output classes were not set). Anyway, it's resolved now.
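For reference, here is a minimal sketch of how the corrected StockMapperTwo could look, assuming job2 reads job1's text output via TextInputFormat (so the key is the line's byte offset and the value is one line of text). Adding @Override makes the compiler reject a method that does not actually override Mapper.map(), which would have flagged this mistake immediately. The write is just the same placeholder as in the question; the real covariance logic is not shown.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StockMapperTwo extends Mapper<LongWritable, Text, LongWritable, Text> {

    // @Override forces a compile-time error if this signature does not match Mapper.map()
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("------ MAPPER 2 CALLED----- KEY: " + key + "   VALUE: " + value);

        // placeholder output, as in the question; the actual calculation would go here
        context.write(new LongWritable(2), new Text("hi"));
    }
}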