
My input is many text files. I want my map-reduce program to write all the files-names and the associated sentences with the file names in one output file, where I want to just emit the file-name(key) and the associated sentences(value) from the mapper. The reducer will collect the key and all the values and write the file-name and their associated sentences in the output.

Here is the code of my mapper and reducer:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            FileSplit filesplit = (FileSplit) reporter.getInputSplit();
            String filename = filesplit.getPath().getName();
            output.collect(new Text(filename), value);
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            StringBuilder builder = new StringBuilder();
            for (Text value : values) {
                String str = value.toString();
                builder.append(str);
            }
            String valueToWrite = builder.toString();
            output.collect(key, new Text(valueToWrite));
        }

        @Override
        public void reduce(Text arg0, Iterator<Text> arg1, OutputCollector<Text, Text> arg2, Reporter arg3) throws IOException {
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setJarByClass(WordCount.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        conf.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

The output is as follows:

14/03/21 00:38:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library   
for your platform... using builtin-java classes where applicable
14/03/21 00:38:27 WARN mapred.JobClient: Use GenericOptionsParser for parsing the 
arguments. Applications should implement Tool for the same.
14/03/21 00:38:27 WARN mapred.JobClient: No job jar file set.  User classes may not  
be found. See JobConf(Class) or JobConf#setJar(String).
14/03/21 00:38:27 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/21 00:38:27 INFO mapred.FileInputFormat: Total input paths to process : 2
14/03/21 00:38:27 INFO mapred.JobClient: Running job: job_local_0001
14/03/21 00:38:27 INFO util.ProcessTree: setsid exited with exit code 0
14/03/21 00:38:27 INFO mapred.Task:  Using ResourceCalculatorPlugin : 
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4911b910
14/03/21 00:38:27 INFO mapred.MapTask: numReduceTasks: 1
14/03/21 00:38:27 INFO mapred.MapTask: io.sort.mb = 100
14/03/21 00:38:27 INFO mapred.MapTask: data buffer = 79691776/99614720
14/03/21 00:38:27 INFO mapred.MapTask: record buffer = 262144/327680
14/03/21 00:38:27 INFO mapred.MapTask: Starting flush of map output
14/03/21 00:38:27 INFO mapred.MapTask: Finished spill 0
14/03/21 00:38:27 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And  
is in the process of commiting
14/03/21 00:38:28 INFO mapred.JobClient:  map 0% reduce 0%
14/03/21 00:38:30 INFO mapred.LocalJobRunner:  
file:/root/Desktop/wordcount/sample.txt:0+5371
14/03/21 00:38:30 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/03/21 00:38:30 INFO mapred.Task:  Using ResourceCalculatorPlugin :  
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1f8166e5
14/03/21 00:38:30 INFO mapred.MapTask: numReduceTasks: 1
14/03/21 00:38:30 INFO mapred.MapTask: io.sort.mb = 100
14/03/21 00:38:30 INFO mapred.MapTask: data buffer = 79691776/99614720
14/03/21 00:38:30 INFO mapred.MapTask: record buffer = 262144/327680
14/03/21 00:38:30 INFO mapred.MapTask: Starting flush of map output
14/03/21 00:38:30 INFO mapred.MapTask: Finished spill 0
14/03/21 00:38:30 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And      
is in the process of commiting
14/03/21 00:38:31 INFO mapred.JobClient:  map 100% reduce 0%
14/03/21 00:38:33 INFO mapred.LocalJobRunner:  
file:/root/Desktop/wordcount/sample.txt~:0+587
14/03/21 00:38:33 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
14/03/21 00:38:33 INFO mapred.Task:  Using ResourceCalculatorPlugin : 
org.apache.hadoop.util.LinuxResourceCalculatorPlugin@3963b3e
14/03/21 00:38:33 INFO mapred.LocalJobRunner: 
14/03/21 00:38:33 INFO mapred.Merger: Merging 2 sorted segments
14/03/21 00:38:33 INFO mapred.Merger: Down to the last merge-pass, with 2 segments  
left of total size: 7549 bytes
14/03/21 00:38:33 INFO mapred.LocalJobRunner: 
14/03/21 00:38:33 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And  
is in the process of commiting
14/03/21 00:38:33 INFO mapred.LocalJobRunner: 
14/03/21 00:38:33 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to 
commit now
14/03/21 00:38:33 INFO mapred.FileOutputCommitter: Saved output of task  
'attempt_local_0001_r_000000_0' to file:/root/Desktop/wordcount/output
14/03/21 00:38:36 INFO mapred.LocalJobRunner: reduce > reduce
14/03/21 00:38:36 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/03/21 00:38:37 INFO mapred.JobClient:  map 100% reduce 100%
14/03/21 00:38:37 INFO mapred.JobClient: Job complete: job_local_0001
14/03/21 00:38:37 INFO mapred.JobClient: Counters: 21
14/03/21 00:38:37 INFO mapred.JobClient:   File Input Format Counters 
14/03/21 00:38:37 INFO mapred.JobClient:     Bytes Read=5958
14/03/21 00:38:37 INFO mapred.JobClient:   File Output Format Counters 
14/03/21 00:38:37 INFO mapred.JobClient:     Bytes Written=8
14/03/21 00:38:37 INFO mapred.JobClient:   FileSystemCounters
14/03/21 00:38:37 INFO mapred.JobClient:     FILE_BYTES_READ=26020
14/03/21 00:38:37 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=117337
14/03/21 00:38:37 INFO mapred.JobClient:   Map-Reduce Framework
14/03/21 00:38:37 INFO mapred.JobClient:     Map output materialized bytes=7557
14/03/21 00:38:37 INFO mapred.JobClient:     Map input records=122
14/03/21 00:38:37 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/03/21 00:38:37 INFO mapred.JobClient:     Spilled Records=244
14/03/21 00:38:37 INFO mapred.JobClient:     Map output bytes=7301
14/03/21 00:38:37 INFO mapred.JobClient:     Total committed heap usage  
(bytes)=954925056
14/03/21 00:38:37 INFO mapred.JobClient:     CPU time spent (ms)=0
14/03/21 00:38:37 INFO mapred.JobClient:     Map input bytes=5958
14/03/21 00:38:37 INFO mapred.JobClient:     SPLIT_RAW_BYTES=185
14/03/21 00:38:37 INFO mapred.JobClient:     Combine input records=0
14/03/21 00:38:37 INFO mapred.JobClient:     Reduce input records=0
14/03/21 00:38:37 INFO mapred.JobClient:     Reduce input groups=2
14/03/21 00:38:37 INFO mapred.JobClient:     Combine output records=0
14/03/21 00:38:37 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
14/03/21 00:38:37 INFO mapred.JobClient:     Reduce output records=0
14/03/21 00:38:37 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
14/03/21 00:38:37 INFO mapred.JobClient:     Map output records=122

When I run the above mapper and reducer with the same configuration but with KeyValueTextInputFormat.class as the input format, it writes nothing to the output.

What should I change to achieve my goal?

What is the current output (the content of the output file)? - Little Bobby Tables
I guess nothing, as shown by the job counters: Map output bytes=0. The map function is not emitting output, so there is nothing for the reducers to process. - rVr
So what should I modify there? - user2758378

1 Answer


KeyValueTextInputFormat is not the correct input format for your case. To use that input format, each line of your input must contain a key/value pair separated by a user-specified delimiter (a tab by default). But in your case the input is a set of files, and you want the job's output to be (filename, contents of the file).
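To see why your files produce nothing useful with this format: KeyValueTextInputFormat splits each line at the first occurrence of the separator; everything before it becomes the key and the rest becomes the value. A plain-Java sketch of that per-line behaviour (the method name and class here are illustrative, not Hadoop's actual code):

```java
public class KeyValueSplitSketch {
    // Mimics how KeyValueTextInputFormat derives (key, value) from one line:
    // split at the first separator; if no separator is found, the whole line
    // becomes the key and the value is empty.
    static String[] split(String line, char separator) {
        int idx = line.indexOf(separator);
        if (idx == -1) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, idx), line.substring(idx + 1) };
    }

    public static void main(String[] args) {
        String[] kv = split("file1.txt\tsome sentence", '\t');
        System.out.println(kv[0]); // file1.txt
        System.out.println(kv[1]); // some sentence
    }
}
```

An ordinary sentence with no tab in it would end up as a key with an empty value, which is not what you want.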

One way to achieve this is to use TextInputFormat as the input format. Note that the snippets below use the new org.apache.hadoop.mapreduce API (with Context) rather than the old mapred API in your code. I have tested the code below and it works.

Get the file name and the file's content in the map function:

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    FileSplit filesplit = (FileSplit) context.getInputSplit();
    String filename = filesplit.getPath().getName();
    context.write(new Text(filename), new Text(value));
}

In the reduce function we build a string out of all the values, which together make up the contents of the file:

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    StringBuilder builder = new StringBuilder();
    for (Text value : values) {
        String str = value.toString();
        builder.append(str);
    }
    String valueToWrite = builder.toString();
    context.write(key, new Text(valueToWrite));
}
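One caveat: TextInputFormat strips line terminators, so the loop above glues all of a file's lines into one unbroken string, and Hadoop does not guarantee the values arrive at the reducer in original line order. If you want a separator between lines, append one explicitly. A plain-Java sketch of the difference (class and method names are illustrative; the space separator is just an example):

```java
import java.util.Arrays;
import java.util.List;

public class ConcatSketch {
    // Concatenate values the way the reducer above does, optionally
    // inserting a separator between consecutive lines.
    static String concat(List<String> lines, String separator) {
        StringBuilder builder = new StringBuilder();
        for (String line : lines) {
            if (builder.length() > 0) {
                builder.append(separator);
            }
            builder.append(line);
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("first sentence.", "second sentence.");
        System.out.println(concat(lines, ""));  // first sentence.second sentence.
        System.out.println(concat(lines, " ")); // first sentence. second sentence.
    }
}
```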

Finally, in the job driver class, set the input format to TextInputFormat and the number of reducers to 1:

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(myMapper.class); 
        job.setReducerClass(myReducer.class);
        job.setNumReduceTasks(1);
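For completeness, a minimal driver around those settings might look like the sketch below. This is an assumption-laden sketch, not tested against your cluster: the class names myMapper/myReducer are taken from the snippet above, FileContentsJob is a made-up name, and Job.getInstance requires Hadoop 2+ (on Hadoop 1.x use new Job(conf, "...") instead).

```java
// Sketch of a complete new-API driver, assuming mapper/reducer classes
// named myMapper and myReducer as in the snippet above (hypothetical names).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FileContentsJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "file contents");
        job.setJarByClass(FileContentsJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(myMapper.class);
        job.setReducerClass(myReducer.class);
        job.setNumReduceTasks(1); // single reducer => single output file

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```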