0 votes

I am following the tutorial here for using distributed caching. I made slight changes to the code to make it compatible with Hadoop 2.2.

When the loadStopWords method is called, an IOException is thrown.

I confirmed that stop_words.txt is copied to HDFS. I have left out the mapper and reducer code to keep things simple.

Here is my code:

public static final String LOCAL_STOPWORD_LIST =
              "/Users/sridhar/Documents/hadoop/stop_words.txt";

    public static final String HDFS_STOPWORD_LIST = "/data/stop_words.txt";

    //copies local file to HDFS and adds to Job's cache file
    static  void cacheStopWordList(Configuration conf, Job job) throws IOException, URISyntaxException {
        FileSystem fs = FileSystem.get(conf);
        URI hdfsPath = new URI(HDFS_STOPWORD_LIST);

        System.out.println("copying files to HDFS");

        // upload the file to hdfs. Overwrite any existing copy.
        fs.copyFromLocalFile(false, true, new Path(LOCAL_STOPWORD_LIST),
            new Path(hdfsPath));

        System.out.println("done copying to HDFS");
        job.addCacheFile(hdfsPath);
      }

    protected void setup(Context context) {
            try {
              String stopwordCacheName = new Path(HDFS_STOPWORD_LIST).toString();
              URI[] cacheFiles = context.getCacheFiles();

              System.out.println(Arrays.toString(cacheFiles));


              if (null != cacheFiles && cacheFiles.length > 0) {
                for (URI cacheURI : cacheFiles) {
                    System.out.println(cacheURI.toString());
                    System.out.println(stopwordCacheName);
                     System.out.println("-----------------");
                  if (cacheURI.toString().equals(stopwordCacheName)) {
                      System.out.println("****************************************");
                    loadStopWords(new Path(cacheURI)); // IT BREAKS HERE
                    System.out.println(stopWords);
                    break;
                  }
                }
              }
            } catch (IOException ioe) {
              System.err.println("IOException reading from distributed cache");
              System.err.println(ioe.toString());
            }
          }

        void loadStopWords(Path cachePath) throws IOException {
            // note use of regular java.io methods here - this is a local file now
            BufferedReader wordReader = new BufferedReader(
                new FileReader(cachePath.toString()));
            try {
              String line;
              this.stopWords = new HashSet<String>();
              while ((line = wordReader.readLine()) != null) {
                this.stopWords.add(line.toLowerCase());
              }
            } finally {
              wordReader.close();
            }
          }





    public static void main(String[] args) throws IllegalArgumentException, IOException,
            InterruptedException, ClassNotFoundException, URISyntaxException {
        Job job = new Job();
        job.setJarByClass(LineIndexer.class);
        job.setJobName("LineIndexer");
        Configuration conf = job.getConfiguration();
        cacheStopWordList(conf, job);
    }

2 Answers

0 votes

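I think you should try Path[] localPaths = context.getLocalCacheFiles(); instead of context.getCacheFiles(). getCacheFiles() returns the URIs as they were added to the job (the HDFS locations), while loadStopWords reads with java.io.FileReader, which needs a path on the task node's local file system. Let me know if it works.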
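
To illustrate the idea behind this suggestion, here is a minimal sketch that uses only the Java standard library, so it runs without a cluster. It assumes the localized copy keeps the file name stop_words.txt but lives in a different directory than the HDFS path, which means a full-path equality check like the one in setup() would no longer match and matching on the file name is safer. The class LocalStopWords and its methods findByName and load are hypothetical names for illustration, and Path here is java.nio.file.Path, not org.apache.hadoop.fs.Path.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of the original question's code.
class LocalStopWords {

    // Picks the localized file whose name matches, ignoring its directory.
    // Returns null when no candidate has that file name.
    static Path findByName(Path[] localPaths, String fileName) {
        if (localPaths == null) {
            return null;
        }
        for (Path p : localPaths) {
            Path name = p.getFileName();
            if (name != null && name.toString().equals(fileName)) {
                return p;
            }
        }
        return null;
    }

    // Reads one stop word per line from the LOCAL file system, lower-cased.
    static Set<String> load(Path localFile) throws IOException {
        Set<String> words = new HashSet<>();
        try (BufferedReader reader =
                 Files.newBufferedReader(localFile, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                words.add(line.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the localized copy on the task node (assumed layout).
        Path dir = Files.createTempDirectory("filecache");
        Path localCopy = dir.resolve("stop_words.txt");
        Files.write(localCopy, Arrays.asList("The", "a", "AN"), StandardCharsets.UTF_8);

        Path match = findByName(new Path[] {localCopy}, "stop_words.txt");
        System.out.println("loaded: " + load(match));

        // Stand-in for a path that exists in HDFS but not on the local disk:
        // plain java.io / java.nio readers fail with an IOException here.
        try {
            load(Paths.get("/no/such/dir/stop_words.txt"));
        } catch (IOException e) {
            System.out.println("failed as expected: " + e);
        }
    }
}
```

In a real mapper, the array passed to findByName would come from the Hadoop context rather than a temporary directory; the sketch only shows why a local reader needs a local path.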

-1 votes

The link you provided says to use DistributedCache.addCacheFile(). Here is the paragraph:

"To use the distributed cache to disseminate files, create an instance of the DistributedCache class when setting up your job. Use the DistributedCache.addCacheFile() method to add names of files which should be sent to all nodes on the system."

Instead of writing

job.addCacheFile(hdfsPath);

try writing

DistributedCache.addCacheFile(hdfsPath, job.getConfiguration());