I'm trying to use Hadoop's DistributedCache in order to feed two input sources to a single map phase.
I built a prototype that joins two input files by way of the distributed cache, and that prototype works correctly.
However, the DistributedCache API stops working when I write a program that chains multiple MapReduce jobs, where the output of the first job is used as one of the two inputs of the next job. In that case the mapper that reads the distributed-cache file does not emit anything.
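Before the full driver below, this is the shape of what I am trying to do (just a sketch of the intended flow with placeholder variable names, not my real driver, which follows):

// Sketch of the intended chain, not the actual driver:
// job 1 reads the original input and writes an intermediate directory
FileInputFormat.setInputPaths(conf, inputPath);
FileOutputFormat.setOutputPath(conf, inter);
JobClient.runJob(conf);

// job 2 reads the original input again, and should ALSO see job 1's output
// through the DistributedCache inside its mapper's configure()
DistributedCache.addCacheFile(inter.toUri(), conf2);
FileInputFormat.setInputPaths(conf2, inputPath);
FileOutputFormat.setOutputPath(conf2, outputPath);
JobClient.runJob(conf2);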
Here is my job driver.
public int run(String[] args) throws Exception {
    Path InputPath = new Path(args[0]);
    Path Inter = new Path("Inters"); //new Path(args[1]);
    Path OutputPath = new Path(args[1]);

    // first job
    JobConf conf = new JobConf(getConf(), Temp.class);
    FileSystem fs = FileSystem.get(getConf());
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(FirstMap.class);
    //conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(IntWritable.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    //conf.setNumReduceTasks(0);

    //20131220 - to deal with paths as variables
    //fs.delete(Inter);
    //DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);

    FileInputFormat.setInputPaths(conf, InputPath);
    FileOutputFormat.setOutputPath(conf, Inter);
    conf.set("threshold", args[2]);

    JobClient.runJob(conf);
    // start job 2
    JobConf conf2 = new JobConf(getConf(), Temp.class);
    conf2.setJobName("secondpass");

    conf2.setMapOutputKeyClass(Text.class);
    conf2.setMapOutputValueClass(IntWritable.class);
    conf2.setOutputKeyClass(Text.class);
    conf2.setOutputValueClass(IntWritable.class);

    conf2.setMapperClass(Map.class);
    //conf.setCombinerClass(Reduce.class);
    conf2.setReducerClass(Reduce.class);
    conf2.setNumReduceTasks(0);

    conf2.setInputFormat(TextInputFormat.class);
    conf2.setOutputFormat(TextOutputFormat.class);

    //DistributedCache.addFileToClassPath(Inter, conf2);
    //DistributedCache.addCacheFile(Inter.toUri(), conf2);
    String interToString = Inter.toString();
    Path Inters = new Path(interToString);
    DistributedCache.addCacheFile(new Path(args[3]).toUri(), conf2);

    FileInputFormat.setInputPaths(conf2, InputPath);
    FileOutputFormat.setOutputPath(conf2, OutputPath);
    conf2.set("threshold", "0");

    JobClient.runJob(conf2);

    return 0;
}
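One thing I am not sure about: the first job writes a directory of part files, not a single file, so perhaps each part file would have to be cached individually instead of args[3] or the directory URI. Something like this sketch (it assumes FileStatus from org.apache.hadoop.fs is imported; this is not what the driver above actually runs):

    // Sketch: cache each part file of the intermediate output individually,
    // before submitting the second job. fs, Inter and conf2 are the variables
    // from the driver above.
    FileStatus[] parts = fs.listStatus(Inter);
    for (FileStatus status : parts) {
        Path p = status.getPath();
        if (p.getName().startsWith("part-")) { // skip _SUCCESS, _logs, etc.
            DistributedCache.addCacheFile(p.toUri(), conf2);
        }
    }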
Also, here is the map function that reads the distributed-cache file.
public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    static enum Counters {
        INPUT_WORDS
    }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive = true;
    private Set<String> patternsToSkip = new HashSet<String>();

    private long numRecords = 0;
    private String inputFile;
    private Iterator<String> iterator;

    private Path[] localFiles;

    public void configure(JobConf job) {
        try {
            localFiles = DistributedCache.getLocalCacheFiles(job);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        for (Path patternsFile : localFiles) {
            parseSkipFile(patternsFile);
        }
    }

    private void parseSkipFile(Path patternsFile) {
        try {
            BufferedReader fis = new BufferedReader(new FileReader(
                    patternsFile.toString()));
            String pattern = null;
            while ((pattern = fis.readLine()) != null) {
                //String[] StrArr = pattern.split(" ");
                System.err.println("Pattern : " + pattern);
                patternsToSkip.add(pattern);
            }
        } catch (IOException ioe) {
            System.err
                    .println("Caught exception while parsing the cached file '"
                            + patternsFile
                            + "' : "
                            + StringUtils.stringifyException(ioe));
        }
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        //output.collect(value, one);
        ArrayList<String> temp = new ArrayList<String>();
        String line = value.toString();

        iterator = patternsToSkip.iterator();
        while (iterator.hasNext()) {
            output.collect(new Text(iterator.next() + "+" + value.toString()), one);
        }
        /*while (iterator.hasNext()) {
            output.collect(new Text(iterator.next().toString()), one);
        }*/
        //output.collect(value, one);
    }
}
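To narrow it down, a more defensive configure() like the following would at least tell me whether any cache files reach the task at all (just a sketch I am considering, not the code above; the log wording is mine):

    // Sketch of a defensive configure() for the Map class above:
    // guards against a null result and prints what the task actually received.
    public void configure(JobConf job) {
        try {
            localFiles = DistributedCache.getLocalCacheFiles(job);
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (localFiles == null || localFiles.length == 0) {
            System.err.println("No files arrived via the DistributedCache");
            return;
        }
        for (Path patternsFile : localFiles) {
            System.err.println("Cached file: " + patternsFile);
            parseSkipFile(patternsFile);
        }
    }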
Has anyone dealt with this problem?