While going through the book Hadoop in Action, I came across an option: rather than adding small files to the distributed cache programmatically, this can be done with the -files generic option.
When I tried this and read the file in the setup() of my mapper, I got a FileNotFoundException at fs.open(), and the message shows a path that I am not sure about.
The question is: if I use the -files generic option, where in HDFS does the file get copied to by default?
The code I am trying to execute is below.
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* Program     : JoinMapSide2.java
   Description : Passing the small file via GenericOptionsParser
                 hadoop jar JoinMapSide2.jar -files orders.txt .........
   Input       : /data/patent/orders.txt (local file system), /data/patent/customers.txt
   Output      : /MROut/JoinMapSide2
   Date        : 23/03/2015
*/
public class JoinMapSide2 extends Configured implements Tool {

    protected static class MapClass extends Mapper<Text, Text, NullWritable, Text> {
        // hash table to store the key+value from the distributed file (the background data)
        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        // setup() fills joinData once per map task, before any map() call
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String line;
            String[] tokens;
            FileSystem fs;
            FSDataInputStream fdis;
            LineReader joinReader;
            Configuration conf;
            Text buffer = new Text();
            // get configuration
            conf = context.getConfiguration();
            // get file system related to the configuration
            fs = FileSystem.get(conf);
            // get all the cache files distributed as part of the job
            URI[] localFiles = context.getCacheFiles();
            System.out.println("Cache File Path:" + localFiles[0].toString());
            // check if there are any distributed files
            // in our case we are sure there will always be exactly one, so use only that
            if (localFiles.length > 0) {
                // since the file should now be on HDFS, use an FSDataInputStream to read it
                // (this is the call that throws the FileNotFoundException)
                fdis = fs.open(new Path(localFiles[0].toString()));
                joinReader = new LineReader(fdis);
                // read the file until EOF
                try {
                    while (joinReader.readLine(buffer) > 0) {
                        line = buffer.toString();
                        // split on the first comma only: key, rest of line
                        tokens = line.split(",", 2);
                        // add key+value into the Hashtable
                        joinData.put(tokens[0], tokens[1]);
                    }
                } finally {
                    joinReader.close();
                    fdis.close();
                }
            } else {
                System.err.println("No Cache Files are distributed");
            }
        }

        // map function: emit the joined record when the key exists in joinData
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String joinValue = joinData.get(key.toString());
            if (joinValue != null) {
                context.write(NullWritable.get(), new Text(key.toString() + "," + value.toString() + "," + joinValue));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JoinMapSide2 -files <smallFile> <inputFile> <outputFile>");
            return 2;
        }
        Path inFile = new Path(args[0]);  // input file (customers.txt)
        Path outFile = new Path(args[1]); // output path
        Configuration conf = getConf();
        // delimiter for the input file
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf, "Map Side Join2");
        // this is not used, as the small file is distributed to all the nodes in the cluster
        // using the generic options parser
        // job.addCacheFile(disFile.toUri());
        FileInputFormat.addInputPath(job, inFile);
        FileOutputFormat.setOutputPath(job, outFile);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(JoinMapSide2.class);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);
        // report job failure through the exit code
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);
        System.exit(ret);
    }
}
This is the exception I see in the trace:
Error: java.io.FileNotFoundException: File does not exist: /tmp/hadoop-yarn/staging/shiva/.staging/job_1427126201553_0003/files/orders.txt#orders.txt
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:64)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)
I start the job like this:
hadoop jar JoinMapSide2.jar -files orders.txt /data/patent/join/customers.txt /MROut/JoinMapSide2
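One thing that stands out in the exception is the #orders.txt suffix on the path. Below is a minimal sketch, using only java.net.URI and no Hadoop classes, that takes that string apart. The class name CacheUriDemo and its two helper methods are hypothetical and only for illustration. It assumes, as far as I understand the distributed cache, that the part after # is a URI fragment naming the link created in the task's working directory, and not part of the file name in the staging directory.

```java
import java.net.URI;

public class CacheUriDemo {
    // Path component only: the location of the file without the '#...' fragment.
    public static String pathOnly(URI uri) {
        return uri.getPath();
    }

    // Name after '#'. Assumption: with -files this is the link name in the task's
    // working directory. Falls back to the last path segment when there is no fragment.
    public static String linkName(URI uri) {
        String fragment = uri.getFragment();
        if (fragment != null) {
            return fragment;
        }
        String path = uri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        // The exact string reported in the FileNotFoundException above.
        URI uri = URI.create(
            "/tmp/hadoop-yarn/staging/shiva/.staging/job_1427126201553_0003/files/orders.txt#orders.txt");
        System.out.println("full URI : " + uri);
        System.out.println("path only: " + pathOnly(uri));
        System.out.println("link name: " + linkName(uri)); // orders.txt
    }
}
```

If that assumption holds, new Path(localFiles[0].toString()) in my setup() keeps the fragment as part of the file name, which would explain why the NameNode cannot find it. I have not confirmed this on the cluster.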
Any directions would be really helpful. Thanks