I'm trying to use the Hadoop DistributedCache in order to feed two input sources to a single map.

So I built a prototype that joins two input files by means of the distributed cache, and it works successfully.

However, the DistributedCache API does not work when I write a program that chains multiple MapReduce jobs, where the output of the previous job is used as one of the two inputs of the next job. In that case, the mapper that reads the cached file does not emit anything.

Here is my job driver.

public int run(String[] args) throws Exception {
    Path InputPath = new Path(args[0]);
    Path Inter = new Path("Inters"); //new Path(args[1]);
    Path OutputPath = new Path(args[1]);

    JobConf conf = new JobConf(getConf(), Temp.class);
    FileSystem fs = FileSystem.get(getConf());
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(FirstMap.class);
    //conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(IntWritable.class);
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    //conf.setNumReduceTasks(0);

    //20131220 - to deal with paths as variables
    //fs.delete(Inter);
    //DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
    FileInputFormat.setInputPaths(conf, InputPath);
    FileOutputFormat.setOutputPath(conf, Inter);
    conf.set("threshold", args[2]);
    JobClient.runJob(conf);

    // start job 2
    JobConf conf2 = new JobConf(getConf(), Temp.class);
    conf2.setJobName("secondjob");

    conf2.setMapOutputKeyClass(Text.class);
    conf2.setMapOutputValueClass(IntWritable.class);

    conf2.setOutputKeyClass(Text.class);
    conf2.setOutputValueClass(IntWritable.class);

    conf2.setMapperClass(Map.class);
    //conf.setCombinerClass(Reduce.class);
    conf2.setReducerClass(Reduce.class);
    conf2.setNumReduceTasks(0);
    conf2.setInputFormat(TextInputFormat.class);
    conf2.setOutputFormat(TextOutputFormat.class);

    //DistributedCache.addFileToClassPath(Inter, conf2);
    //DistributedCache.addCacheFile(Inter.toUri(), conf2);
    String InterToString = Inter.toString();
    Path Inters = new Path(InterToString);

    DistributedCache.addCacheFile(new Path(args[3]).toUri(), conf2);
    FileInputFormat.setInputPaths(conf2, InputPath);
    FileOutputFormat.setOutputPath(conf2, OutputPath);

    conf2.set("threshold", "0");
    JobClient.runJob(conf2);

    return 0;
}

Also, here is the map function that reads the distributed cache:

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {

    static enum Counters {
        INPUT_WORDS
    }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive = true;
    private Set<String> patternsToSkip = new HashSet<String>();

    private long numRecords = 0;
    private String inputFile;
    private Iterator<String> patternIterator;

    private Path[] localFiles;
    public void configure (JobConf job) {
        try {
            localFiles = DistributedCache.getLocalCacheFiles(job);
        } catch (IOException e) {
            e.printStackTrace();
        }
        for (Path patternsFile : localFiles) {
            parseSkipFile(patternsFile);
        }
    }
    private void parseSkipFile(Path patternsFile) {
        try {
            BufferedReader fis = new BufferedReader(new FileReader(
                    patternsFile.toString()));
            String pattern = null;
            while ((pattern = fis.readLine()) != null) {
                //String [] StrArr = pattern.split(" ");
                System.err.println("Pattern : " + pattern );
                patternsToSkip.add(pattern);
            }
        } catch (IOException ioe) {
            System.err
                    .println("Caught exception while parsing the cached file '"
                            + patternsFile
                            + "' : "
                            + StringUtils.stringifyException(ioe));
        }
    }

    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        //output.collect(value, one);


        patternIterator = patternsToSkip.iterator();

        // emit one record per (pattern from the cached file, input line) pair
        while (patternIterator.hasNext()) {
            output.collect(new Text(patternIterator.next() + "+" + value.toString()), one);
        }
        /*while (patternIterator.hasNext()) {
            output.collect(new Text(patternIterator.next().toString()), one);
        }*/
        //output.collect(value, one);


    }
}

Has anyone dealt with this problem?


2 Answers


Here is something I did to practice Hadoop in a university computer lab. It uses multiple input paths as well as job chaining, and does a reduce-side join.

public class StockJoinJob extends Configured {

    public static class KeyPartitioner extends Partitioner<TextIntPair, TextLongIntPair> {
        @Override
        public int getPartition(TextIntPair key, TextLongIntPair value, int numPartitions) {
            return (key.getText().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static int runJob(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(StockJoinJob.class);

        Path nasdaqPath = new Path(args[0]);
        Path listPath = new Path(args[1]);
        Path outputPath = new Path(args[2] + "-first");

        // two input paths, each with its own input format and mapper
        MultipleInputs.addInputPath(job, listPath, TextInputFormat.class, CompanyMapper.class);
        MultipleInputs.addInputPath(job, nasdaqPath, StockInputFormat.class, StockMapper.class);
        FileOutputFormat.setOutputPath(job, outputPath);

        job.setPartitionerClass(KeyPartitioner.class);
        job.setGroupingComparatorClass(TextIntPair.FirstComparator.class);

        job.setMapOutputKeyClass(TextIntPair.class);
        job.setMapOutputValueClass(TextLongIntPair.class);
        job.setReducerClass(JoinReducer.class);

        job.setOutputKeyClass(TextIntPair.class);
        job.setOutputValueClass(TextLongPair.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static int runJob2(String[] args) throws Exception {
        // needs the first comparator like the previous job
        Configuration conf = new Configuration();
        Job job = new Job(conf);

        job.setJarByClass(StockJoinJob.class);
        job.setReducerClass(TotalReducer.class);
        job.setMapperClass(TotalMapper.class);
        Path firstPath = new Path(args[2] + "-first");
        Path outputPath = new Path(args[2] + "-second");

        // reducer output
        job.setOutputKeyClass(TextIntPair.class);
        job.setOutputValueClass(TextLongPair.class);

        // mapper output
        job.setMapOutputKeyClass(TextIntPair.class);
        job.setMapOutputValueClass(TextIntPair.class);

        // the second job reads the first job's output directory
        FileInputFormat.setInputPaths(job, firstPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath, true);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // run the second job only if the first one succeeded
        int firstCode = runJob(args);
        if (firstCode == 0) {
            int secondCode = runJob2(args);
            System.exit(secondCode);
        } else {
            System.exit(firstCode);
        }
    }
}

I am not sure what the problem is exactly (maybe you should rephrase it), but I would suggest that you read the Yahoo tutorial on Chaining Jobs. I see two alternatives here:

  • If you do exactly the same map and don't care about the order of execution (in other words, the two jobs can be executed in parallel), I would suggest creating a single job with two input paths. You can do that with:

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileInputFormat.addInputPath(conf, new Path(args[1]));

  • I think you need to add the two separate job drivers in a new "chain" driver and then add the dependencies (e.g. the second job depends on the first one, and should thus be executed only when the first one finishes). Then the Distributed Cache could be declared in the driver of the second job (see the sketch after this list). I hope this helps...
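
For the second alternative, here is a minimal sketch of what such a chain driver could look like with the old-API JobControl (org.apache.hadoop.mapred.jobcontrol), which matches the JobConf/JobClient style used in the question. The ChainDriver class name and the polling loop are my own assumptions; the Temp class, the argument layout and the cache file in args[3] are taken from the question, and the per-job mapper/reducer/format settings are left out for brevity:

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public class ChainDriver {

    public static void main(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path interPath = new Path("Inters");
        Path outputPath = new Path(args[1]);

        // First job: configure it exactly as in the question's run() method
        // (FirstMap/Reduce, input/output formats, threshold), writing to the
        // intermediate directory.
        JobConf conf1 = new JobConf(Temp.class);
        conf1.setJobName("first");
        // ... mapper/reducer/format settings as in the question ...
        FileInputFormat.setInputPaths(conf1, inputPath);
        FileOutputFormat.setOutputPath(conf1, interPath);

        // Second job: register the cache file on conf2 before the job is
        // submitted, so that configure() can read it via getLocalCacheFiles().
        JobConf conf2 = new JobConf(Temp.class);
        conf2.setJobName("second");
        // ... mapper/reducer/format settings as in the question ...
        DistributedCache.addCacheFile(new Path(args[3]).toUri(), conf2);
        FileInputFormat.setInputPaths(conf2, inputPath);
        FileOutputFormat.setOutputPath(conf2, outputPath);

        // Wrap both JobConfs in controlled jobs and declare the dependency:
        // job2 is started only once job1 has finished successfully.
        Job job1 = new Job(conf1);
        Job job2 = new Job(conf2);
        job2.addDependingJob(job1);

        JobControl control = new JobControl("chain");
        control.addJob(job1);
        control.addJob(job2);

        // JobControl is a Runnable: run it in its own thread and poll until done.
        Thread runner = new Thread(control);
        runner.start();
        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        control.stop();
    }
}

Note that JobControl only takes care of ordering and submitting the jobs; every per-job setting, including the DistributedCache registration, still lives on its own JobConf, which is what "declared in the driver of the second job" means here.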