0
votes

While going through the book Hadoop in Action, I read that, rather than adding small files to the distributed cache programmatically, this can be done using the -files generic option.

When I tried this, the setup() method of my mapper threw a FileNotFoundException at fs.open(), and the message shows a path that I do not recognize.

Question: if I use the -files generic option, where in HDFS does the file get copied to by default?

The code I am trying to execute is below.

public class JoinMapSide2 extends Configured implements Tool{

/*  Program     : JoinMapSide2.java 
    Description : Passing the small file via GenericOptionsParser
                  hadoop jar JoinMapSide2.jar -files orders.txt .........
    Input       : /data/patent/orders.txt(local file system), /data/patent/customers.txt
    Output      : /MROut/JoinMapSide2
    Date        : 23/03/2015  
*/

// Mapper for the map-side join: setup() loads the first cache file into joinData,
// and map() emits only the input records whose key is present in that table.
protected static class MapClass extends Mapper <Text,Text,NullWritable,Text>{

    // hash table to store the key+value from the distributed file or the background data
    private Hashtable <String, String> joinData = new Hashtable <String, String>();

    // setup function: fills joinData from the first cache file so that map() can look keys up
    protected void setup(Context context) throws IOException, InterruptedException {

        String line;
        String[] tokens;

        FileSystem fs;
        FSDataInputStream fdis;
        LineReader joinReader;
        Configuration conf;

        // buffer that joinReader.readLine() fills on every iteration of the loop below
        Text buffer = new Text();

        // get configuration
        conf = context.getConfiguration();
        // get file system related to the configuration
        fs = FileSystem.get(conf);

        // get the URIs of the cache files distributed as part of the job
        URI[] localFiles = context.getCacheFiles();

        // NOTE(review): element 0 is dereferenced here, before the length check below,
        // so an empty array fails on this line and never reaches the else branch
        System.out.println("Cache File Path:"+localFiles[0].toString());

        // check if there are any distributed files 
        // in our case we are sure we will always have one, so use that only 
        if (localFiles.length > 0){
            // the URI string is passed verbatim as a Path to the FileSystem obtained above
            // NOTE(review): the reported trace shows this path ending in "#orders.txt"
            // under the job staging directory -- this open() is the call that fails
            fdis = fs.open(new Path(localFiles[0].toString()));
            joinReader = new LineReader(fdis);

            // read the file until EOF
            try {
                while (joinReader.readLine(buffer) > 0) {
                    line = buffer.toString();
                    // split on the first comma only: tokens[0] is the key, tokens[1] the rest
                    tokens = line.split(",",2); 
                    // add key+value into the Hashtable
                    // NOTE(review): a line without a comma yields a single token,
                    // so tokens[1] would be out of bounds
                    joinData.put(tokens[0], tokens[1]);
                }
            } finally {
                joinReader.close();
                fdis.close();
            }
        }
        else{
            System.err.println("No Cache Files are distributed");
        }
    }

    // map function: looks the record key up in joinData and, on a match, writes
    // "key,value,joinValue" as the output value
    protected void map(Text key,Text value, Context context) throws IOException, InterruptedException{

        // NOTE(review): this is a null reference rather than a NullWritable instance
        NullWritable kNull = null;

        String joinValue = joinData.get(key.toString());

        // records whose key has no entry in joinData are dropped
        if (joinValue != null){
            context.write(kNull, new Text(key.toString() + "," + value.toString() + "," + joinValue));
        }                   
    }
}   

/**
 * Configures and runs the map-only join job.
 *
 * @param args args[0] is the input path (customers.txt), args[1] is the output path;
 *             generic options such as -files are not part of this array
 * @return 0 when the job succeeds, 1 when it fails, 2 when the arguments are incomplete
 * @throws Exception if the job cannot be set up or submitted
 */
@Override
public int run(String[] args) throws Exception {

    // both paths are mandatory: report usage and stop here instead of falling
    // through to args[0] / args[1] and failing with an index error
    if (args.length < 2){
        System.err.println("Usage JoinMapSide -files <smallFile> <inputFile> <outputFile>");
        return 2;
    }

    Path inFile  = new Path(args[0]); // input file(customers.txt)
    Path outFile = new Path(args[1]); // output file

    Configuration conf = getConf();
    // delimiter for the input file
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

    Job job = Job.getInstance(conf, "Map Side Join2");

    // no job.addCacheFile(...) call here: the small file is expected to be supplied
    // on the command line through the -files generic option

    FileInputFormat.addInputPath(job, inFile);
    FileOutputFormat.setOutputPath(job, outFile);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setJarByClass(JoinMapSide2.class);
    job.setMapperClass(MapClass.class);

    // map-only job: the join happens entirely in the mapper
    job.setNumReduceTasks(0);

    // propagate the job outcome so the process exit code reflects failures
    return job.waitForCompletion(true) ? 0 : 1;
}

// entry point: runs the tool through ToolRunner and exits with the status it returns
public static void main(String args[]) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new JoinMapSide2(), args));
}

This is the exception I see in the trace:

Error: java.io.FileNotFoundException: File does not exist: /tmp/hadoop-yarn/staging/shiva/.staging/job_1427126201553_0003/files/orders.txt#orders.txt
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:64)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)

I start the job like this:

hadoop jar JoinMapSide2.jar -files orders.txt /data/patent/join/customers.txt /MROut/JoinMapSide2

Any directions would be really helpful. Thanks

2

2 Answers

0
votes

First you need to move your orders.txt to HDFS, and then you have to use -files.

0
votes

Okay, after some searching around I found that there are two errors in my code above.

  1. I should not be using FSDataInputStream to read the distributed file; since the file is local to the node running the mapper, I should be using File.
  2. I should not be using URI.toString(); instead, I should be using the symbolic link added for my file, which is just orders.txt.

The corrected code is listed below; I hope it helps.

package org.samples.hina.training;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinMapSide2 extends Configured implements Tool{

/*  Program     : JoinMapSide2.java 
    Description : To learn Replicated Join using Distributed Cache via Generic Options -files
    Input       : file:/patent/join/orders1.txt(distributed to all nodes), /data/patent/customers.txt
    Output      : /MROut/JoinMapSide2
    Date        : 24/03/2015  
*/

// Mapper for the replicated join: setup() loads the distributed file into memory and
// map() emits "key,value,joinValue" for every input record whose key is found there.
protected static class MapClass extends Mapper <Text,Text,NullWritable,Text>{

    // join table built from the distributed (background) file: key -> remainder of the line
    private Hashtable <String, String> joinData = new Hashtable <String, String>();

    // setup function: fills joinData from the local copy of the first cache file
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        String line;
        String[] tokens;

        // cache files registered for the job (here through the -files generic option)
        URI[] localFiles = context.getCacheFiles();

        // check for a missing/empty list BEFORE touching element 0; the array may be
        // null when no cache file was registered at all
        if (localFiles == null || localFiles.length == 0){
            System.err.println("Local Cache File does not exist");
            return;
        }

        System.out.println("File1:"+localFiles[0].toString());

        // read from the LOCAL copy: the file is reachable in the task's working
        // directory under its link name, not through the URI of the staged copy
        File localFile1 = new File(localLinkName(localFiles[0]));

        // reader over the local copy
        BufferedReader joinReader = new BufferedReader(new FileReader(localFile1));

        // read local file until EOF
        try {
            while ((line = joinReader.readLine()) != null){
                // split on the first comma only: tokens[0] is the key, tokens[1] the rest
                tokens = line.split(",",2);
                // blank lines and lines without a comma carry no key/value pair;
                // skip them instead of failing on tokens[1]
                if (tokens.length < 2){
                    continue;
                }
                joinData.put(tokens[0], tokens[1]);
            }
        } finally {
            joinReader.close();
        }
    }

    // Returns the name under which a cache file is linked locally: the URI fragment
    // ("...#name") when one is present, otherwise the last component of the URI path.
    private static String localLinkName(URI cacheFile){
        String linkName = cacheFile.getFragment();
        if (linkName == null || linkName.isEmpty()){
            linkName = new Path(cacheFile.getPath()).getName();
        }
        return linkName;
    }

    // map function: writes the joined record when the key exists in joinData and
    // drops the record otherwise
    @Override
    protected void map(Text key,Text value, Context context) throws IOException, InterruptedException{

        String joinValue = joinData.get(key.toString());

        if (joinValue != null){
            // NullWritable.get() is the placeholder instance for "no key"; avoid a null reference
            context.write(NullWritable.get(), new Text(key.toString() + "," + value.toString() + "," + joinValue));
        }
    }
}

/**
 * Configures and runs the map-only join job.
 *
 * @param args args[0] is the input path (customers.txt), args[1] is the output path
 * @return 0 when the job succeeds, 1 when it fails, 2 when the arguments are incomplete
 * @throws Exception if the job cannot be set up or submitted
 */
@Override
public int run(String[] args) throws Exception {

    // both paths are mandatory: report usage and stop here instead of falling
    // through to args[0] / args[1] and failing with an index error
    if (args.length < 2){
        System.err.println("Usage JoinMapSide2 <inputFile> <outputFile>");
        return 2;
    }

    Path inFile   = new Path(args[0]); // input file(customers.txt)
    Path outFile  = new Path(args[1]); // output file

    Configuration conf = getConf();
    // delimiter for the input file
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

    Job job = Job.getInstance(conf, "Map Side Join2");

    // no job.addCacheFile(...) call here: the small file is expected to be supplied
    // on the command line through the -files generic option

    FileInputFormat.addInputPath(job, inFile);
    FileOutputFormat.setOutputPath(job, outFile);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setJarByClass(JoinMapSide2.class);
    job.setMapperClass(MapClass.class);

    // map-only job: the join happens entirely in the mapper
    job.setNumReduceTasks(0);

    // propagate the job outcome so the process exit code reflects failures
    return job.waitForCompletion(true) ? 0 : 1;
}

// entry point: runs the tool through ToolRunner and exits with the status it returns
public static void main(String args[]) throws Exception {
    int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);

    System.exit(ret);
}
}