Program to find the maximum of billions of numbers present in a CSV file.

        package org.devender;

        import java.io.IOException;

        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;

        public class SortMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

            public void map(LongWritable ikey, Text ivalue, Context context)
                    throws IOException, InterruptedException {

                String line = ivalue.toString();

                String TextInt[]=line.split(",");

                int MAX = 0;

                for (int i = 0; i > TextInt.length; i++) {
                    int n = Integer.parseInt(TextInt[i].toString());
                    if (n > MAX) {
                        MAX = n;
                    }
                }
                Text max = new Text("Maximum"); 
                LongWritable BIG = new LongWritable(MAX);

                context.write(BIG,max);
            }

        }


Getting the error below:

    Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
        at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
        at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
        at org.devender.SortMapper.map(SortMapper.java:31)
        at org.devender.SortMapper.map(SortMapper.java:1)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)

Driver: this is my driver program.

        package org.devender;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
        public class SortMapReduce {

            public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf, "JobName");
                job.setJarByClass(org.devender.SortMapReduce.class);
                job.setMapperClass(org.devender.SortMapper.class);

                job.setReducerClass(org.devender.SortReducer.class);

                // TODO: specify output types
                job.setOutputKeyClass(LongWritable.class);
                job.setOutputValueClass(Text.class);

                // TODO: specify input and output DIRECTORIES (not files)
                FileInputFormat.setInputPaths(job,new Path(args[0]));
                FileOutputFormat.setOutputPath(job,new Path(args[1]));

                if (!job.waitForCompletion(true))
                    return;
            }

        }


//Reducer - The output is coming as 0,Maximum ... 0,Maximum, but I was expecting the maximum value from the file along with the "Highest Number" tag.
------------------------------------------------------------------------


    public void reduce(Iterable<LongWritable> _key, Text values, Context context)
                    throws IOException, InterruptedException {
                // process values
                LongWritable MAX = new LongWritable(0);

                for (LongWritable val : _key) {
                    if (val.compareTo(MAX)>0) {

                        MAX=val;
                    }
                }
                Text t=new Text("Highest Number ");
                context.write(MAX,t);

            }

        }

I am using LongWritable as the key, and the same type is used in the mapper arguments, but I don't know why the error says it expected Text. I am trying to read a line from the file, split it into separate numbers, convert each one to an int, compare it against the other numbers in the line, and save the result to the output context. But the job keeps failing with "expected Text", and I don't understand why it expects Text when I have specifically declared LongWritable in the Mapper. Can someone help resolve this error? Also, the output is now coming out as 0 Maximum, 0 Maximum ... and so on.

1 Answer


What is your job configuration? Are you using job.setOutputKeyClass, job.setOutputValueClass, job.setMapOutputKeyClass, job.setMapOutputValueClass as part of your code?

Can you share the entire code?
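For context: if job.setMapOutputKeyClass/setMapOutputValueClass are not called, Hadoop assumes the map output types are the same as the job-level types set via setOutputKeyClass/setOutputValueClass, and the "Type mismatch in key from map" error shows up as soon as the mapper emits something else. A minimal sketch of the relevant driver lines, using the same types as the corrected code further down:

// Types the mapper emits via context.write(...)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

// Types the reducer emits (the final job output)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);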

Edit 1: Here is the corrected code; you have many issues in your version. You can learn MapReduce in detail here

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class TestingMapReduce extends Configured implements Tool {

    public static class SortMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
            String line = ivalue.toString();

            String TextInt[] = line.split(",");

            int MAX = 0;

            for (int i = 0; i < TextInt.length; i++) {
                int n = Integer.parseInt(TextInt[i].toString());
                if (n > MAX) {
                    MAX = n;
                }
            }
            Text max = new Text("Maximum");
            LongWritable BIG = new LongWritable(MAX);

            context.write(max, BIG);
        }

    }

    public static class SortReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        Long MAX = 0L;
        public void reduce(Text _key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // process values

            for (LongWritable val : values) {
                if (val.get() > MAX) {
                    MAX = val.get();
                }
            }

            context.write(_key, new LongWritable(MAX));

        }

    }

    public int run(String[] arg0) throws Exception {

        Job job = Job.getInstance(getConf());

        // job.setJar("nyse-0.0.1-SNAPSHOT.jar");
        job.setJarByClass(getClass());
        job.setMapperClass(SortMapper.class);

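        // Map output types: these must match what SortMapper actually emits
        // (Text key, LongWritable value); a mismatch here is what produces the
        // "Type mismatch in key from map" error from the question.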
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(SortReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setNumReduceTasks(1);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String args[]) throws Exception {
        System.exit(ToolRunner.run(new TestingMapReduce(), args));
    }

}
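If it helps, a typical way to run this after packaging it into a jar would look like the following (the jar name and HDFS paths here are just placeholders):

    hadoop jar testing-mapreduce.jar TestingMapReduce /user/devender/input/numbers.csv /user/devender/output/max

With TextOutputFormat and a single reducer, the result in part-r-00000 will be one line of the form "Maximum" followed by a tab and the largest value found in the input.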