3 votes

I have an XML file to be processed in a MapReduce job. I can process it when it is uncompressed, but it does not work when I compress it to bz2 format and store it in HDFS. Do I need to make some changes, such as specifying which codec to use? I don't know where to do that; an example would be great. I am using XMLInputFormat from Mahout to read the uncompressed XML file. I used the bzip2 command to compress the file and hadoop dfs -copyFromLocal to copy it to HDFS. I am interested in reading and processing the content inside the <page></page> tags of the XML document. I am using the hadoop-1.2.1 distribution. I can see there is FileOutputFormat.setOutputCompressorClass, but there isn't anything similar for FileInputFormat.
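
For context, this is how I would set compression on the output side with the new-API org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; as far as I can tell there is no input-side counterpart, so I assume the input codec has to be picked up some other way:

    // Output compression is configured explicitly like this; I found no
    // equivalent call on FileInputFormat.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job,
            org.apache.hadoop.io.compress.BZip2Codec.class);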

Here is the Main class of my job.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class Main extends Configured implements Tool {

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new Main(), args);
            System.exit(res);
        }

        public int run(String[] args) throws Exception {

            if (args.length != 2) {
                System.err.println("Usage: hadoop jar XMLReaderMapRed "
                        + " [generic options] <in> <out>");
            System.err.println();
                ToolRunner.printGenericCommandUsage(System.err);
                return 1;
            }

            // Specify the start and end tags that delimit the content.
            // These must be set BEFORE the Job is created: new Job(conf, ...)
            // takes a copy of the configuration, so values set on getConf()
            // afterwards are never seen by the job.
            getConf().set(MyXMLInputFormat.START_TAG_KEY, "<page>");
            getConf().set(MyXMLInputFormat.END_TAG_KEY, "</page>");

            Job job = new Job(getConf(), "XMLTest");

            job.setInputFormatClass(MyXMLInputFormat.class);

            job.setJarByClass(getClass());
            job.setMapperClass(XMLReaderMapper.class);
            job.setReducerClass(XmlReaderReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        }
    }

Edit: Reading Hadoop: The Definitive Guide by Tom White, it is mentioned that "if your input files are compressed, they will be decompressed automatically as they are read by MapReduce, using the filename extension to determine which codec to use." So the file should be decompressed automatically, but then why is an empty file created in the output directory?
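
To check what Hadoop resolves for the file, a snippet along these lines (the path is just a placeholder for my input file) should print the codec chosen by extension:

    // Ask CompressionCodecFactory which codec matches the file extension;
    // for a .bz2 path this should name BZip2Codec, and null would mean the
    // codec is not registered. The path below is a placeholder.
    Configuration conf = new Configuration();
    CompressionCodec codec = new CompressionCodecFactory(conf)
            .getCodec(new Path("/user/me/input/pages.xml.bz2"));
    System.out.println(codec == null ? "no codec found"
                                     : codec.getClass().getName());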

Thanks!

2 Answers

2 votes

You should look at your core-site.xml configuration file and add the BZip2 codec class if it's absent. Here is an example:

<property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
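
If editing core-site.xml is not an option, a sketch of the programmatic alternative is to set the same property on the job's Configuration before the Job is constructed (codec list shortened from the example above):

    // Register the codecs on the Configuration the Job will copy; this
    // must happen before new Job(conf, ...) is called.
    Configuration conf = new Configuration();
    conf.set("io.compression.codecs",
            "org.apache.hadoop.io.compress.BZip2Codec,"
          + "org.apache.hadoop.io.compress.GzipCodec,"
          + "org.apache.hadoop.io.compress.DefaultCodec");
    Job job = new Job(conf, "XMLTest");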

Edit:

After adding the codec, reproduce the following steps to verify that bzip2 decompression itself works (your code still may not):

    hadoop fs -mkdir /tmp/wordcount/
    echo "three one three three seven" >> /tmp/words
    bzip2 -z /tmp/words
    hadoop fs -put /tmp/words.bz2 /tmp/wordcount/
    hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount /tmp/wordcount/ /tmp/wordcount_out/
    hadoop fs -text /tmp/wordcount_out/part*
    # you should see these three lines:
    one     1
    seven   1
    three   3
    # clean up (these commands may differ in your case)
    hadoop fs -rmr /tmp/wordcount_out/
    hadoop fs -rmr /tmp/wordcount/
0 votes

In your XMLInputFormat implementation (a TextInputFormat subclass) you're probably overriding createRecordReader and returning a custom implementation of RecordReader<KEYIN, VALUEIN> that doesn't take the codec into account. The default implementation returns a LineRecordReader, which handles codecs correctly. You can find a reference implementation here, and the relevant changes required here.
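
For reference, here is a minimal sketch (names are illustrative, following the LineRecordReader pattern in Hadoop 1.x) of what the codec-aware part of a custom RecordReader's initialize() looks like:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    // Inside your RecordReader subclass:
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration conf = context.getConfiguration();
        Path file = split.getPath();
        FSDataInputStream fileIn = file.getFileSystem(conf).open(file);

        // Look the codec up by file extension (.bz2 -> BZip2Codec);
        // null means the file is read as plain, uncompressed text.
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
        InputStream in = (codec != null) ? codec.createInputStream(fileIn) : fileIn;
        // ... scan 'in' for the configured start/end tags ...
    }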