0
votes

I am learning Hadoop and tried executing my MapReduce program. All map tasks and reduce tasks complete fine, but the reducer writes the mapper output straight into the output file — which means the reduce function is never invoked at all. My sample input is like below:

1,a
1,b
1,c
2,s
2,d

and the expected output is like below

1 a,b,c
2 s,d 

Below is my Program.

package patentcitation;
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 
 
public class MyJob
{
        public static class Mymapper extends Mapper <Text, Text, Text, Text>
        {
                public void map (Text key, Text value, Context context) throws IOException, InterruptedException
                {
                        context.write(key, value);
                }
               
        }
        public static class Myreducer extends Reducer<Text,Text,Text,Text>
        {
               
                StringBuilder str = new StringBuilder();
               
               
               
                public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException
                {
                        for(Text x : value)
                        {
                                if(str.length() > 0)
                                {
                                        str.append(",");
                                }
                                str.append(x.toString());
                        }
                        context.write(key, new Text(str.toString()));
                }
               
        }
        public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
        {
                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf, "PatentCitation");
                FileSystem fs = FileSystem.get(conf);
                job.setJarByClass(MyJob.class);
                FileInputFormat.addInputPath(job,new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                job.setMapperClass(Mymapper.class);
                job.setReducerClass(Myreducer.class);
                 job.setMapOutputKeyClass(Text.class);
              job.setMapOutputValueClass(Text.class);
                job.setInputFormatClass(KeyValueTextInputFormat.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");
                if(fs.exists(new Path(args[1]))){
                   //If exist delete the output path
                   fs.delete(new Path(args[1]),true);
                }
                System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
}

The same question was asked here, and I used the Iterable value in my reduce function as the answer in that thread suggested, but that doesn't fix the issue. I cannot comment there since my reputation score is low, so I created this new thread.

Kindly help me find out where I am going wrong.

1
@BenWatson My output is below. 1,a 1,b 1,c 2,s 2,d (Same as input i.e Map output ) - user2731629

1 Answer

1
votes

You have made a few mistakes in your program. The following are the mistakes:

  1. In the driver, following statement should be called before instantiating the Job class: conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");
  2. In the reducer, you should declare the StringBuilder inside the reduce() function; as an instance field it is never cleared, so values from earlier keys accumulate into the output of later keys.

I have modified your code as below and I got the output:

E:\hdp\hadoop-2.7.1.2.3.0.0-2557\bin>hadoop fs -cat /out/part-r-00000
1       c,b,a
2       d,s

Modified code:

package patentcitation;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyJob
{
    public static class Mymapper extends Mapper <Text, Text, Text, Text>
    {
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException
        {
                context.write(key, value);
        }

    }
    public static class Myreducer extends Reducer<Text,Text,Text,Text>
    {

        public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException
        {
            StringBuilder str = new StringBuilder();

            for(Text x : value)
            {
                if(str.length() > 0)
                {
                    str.append(",");
                }
                str.append(x.toString());
            }
            context.write(key, new Text(str.toString()));
        }

    }
    public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException
    {
        Configuration conf = new Configuration();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");
        Job job = Job.getInstance(conf, "PatentCitation");
        FileSystem fs = FileSystem.get(conf);
        job.setJarByClass(MyJob.class);
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(Mymapper.class);
        job.setReducerClass(Myreducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        /*if(fs.exists(new Path(args[1]))){
            //If exist delete the output path
            fs.delete(new Path(args[1]),true);
        }*/
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}