I am facing a problem while writing a MapReduce program: my input file is being read twice by the program. I have already gone through the answer to "why is my sequence file being read twice in my hadoop mapper class?", but unfortunately it did not help.
My Mapper class is:
package com.siddu.mapreduce.csv;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SidduCSVMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>
{
    private final IntWritable one = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text line,
            OutputCollector<Text, IntWritable> output, Reporter report)
            throws IOException
    {
        // Split the semicolon-separated line and emit the third field
        // as the key, with a count of 1.
        String lineCSV = line.toString();
        String[] tokens = lineCSV.split(";");
        output.collect(new Text(tokens[2]), one);
    }
}
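Each line is split on ';' and the third field becomes the key, so an input line like a;b;c emits the pair (c, 1). Note that map() assumes every line has at least three fields; a shorter line would throw an ArrayIndexOutOfBoundsException. A guarded variant (just a sketch of what I mean, not what I am currently running) would be:

@Override
public void map(LongWritable key, Text line,
        OutputCollector<Text, IntWritable> output, Reporter report)
        throws IOException
{
    String[] tokens = line.toString().split(";");
    // Skip malformed lines instead of failing the task.
    if (tokens.length < 3)
    {
        return;
    }
    output.collect(new Text(tokens[2]), one);
}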
And my Reducer class is:

package com.siddu.mapreduce.csv;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SidduCSVReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>
{
    @Override
    public void reduce(Text key, Iterator<IntWritable> inputFrmMapper,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException
    {
        System.out.println("In reducer the key is: " + key.toString());

        // Sum all the 1s the map phase emitted for this key.
        int relationOccurance = 0;
        while (inputFrmMapper.hasNext())
        {
            relationOccurance += inputFrmMapper.next().get();
        }
        output.collect(key, new IntWritable(relationOccurance));
    }
}
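A minimal local check (the harness class below is only my illustration, not part of the job) would be to feed one line to the mapper directly; it emits exactly one pair per line, so if every final count comes out doubled, the duplication must come from the input being consumed twice rather than from the map logic itself:

package com.siddu.mapreduce.csv;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical harness, purely for a local sanity check of the mapper.
public class SidduCSVMapperCheck
{
    public static void main(String[] args) throws IOException
    {
        // Collector that just prints whatever the mapper emits.
        OutputCollector<Text, IntWritable> printer = new OutputCollector<Text, IntWritable>()
        {
            public void collect(Text key, IntWritable value)
            {
                System.out.println(key + " -> " + value);
            }
        };
        // One semicolon-separated line in, exactly one pair out: "c -> 1".
        new SidduCSVMapper().map(new LongWritable(0), new Text("a;b;c"), printer, Reporter.NULL);
    }
}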
And finally, my Driver class is:

package com.siddu.mapreduce.csv;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class SidduCSVMapReduceDriver
{
    public static void main(String[] args)
    {
        JobConf conf = new JobConf(SidduCSVMapReduceDriver.class);
        conf.setJobName("Siddu CSV Reader 1.0");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(SidduCSVMapper.class);
        conf.setReducerClass(SidduCSVReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // args[0] is the input path, args[1] the output path.
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        try
        {
            // JobClient.runJob is static, so no JobClient instance is needed.
            JobClient.runJob(conf);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}
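To narrow down where the doubling happens, my plan is to compare the job's built-in "Map input records" counter with the actual number of lines in the input file: if the counter is exactly twice the line count, the input really is consumed twice (for example, because the same file sits twice under the input path, which hadoop fs -ls on the input directory would show), rather than the counts being inflated in the reduce phase. A sketch of what I mean inside the existing try block (runJob already logs the counters at the end; this just prints them again explicitly):

// Needs: import org.apache.hadoop.mapred.RunningJob;
RunningJob job = JobClient.runJob(conf);
// Dump all job counters; look for "Map input records" and compare it
// with the line count of the input file.
System.out.println(job.getCounters());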