
I have looked at all the other threads on this topic and still have not found an answer...

Put simply, I want to access the Hadoop distributed cache from a Pig StoreFunc, NOT directly from within a UDF.

Relevant Pig code:

```
DEFINE CustomStorage KeyValStorage('param1','param2','param3');
...
STORE BLAH INTO '/path/' using CustomStorage();
```

Relevant Java code:

```java
public class KeyValStorage<M extends Message> extends BaseStoreFunc /* Elephant Bird storage, which inherits from StoreFunc */ {

    ...
    public KeyValStorage(String param1, String param2, String param3) {
        ...
        // configName holds the local name of the cached file; try-with-resources closes the stream
        try (InputStream is = new FileInputStream(configName)) {
            prop.load(is);
        } catch (FileNotFoundException e) {
            System.out.println("FILE NOT FOUND");
            e.printStackTrace();
        } catch (IOException e) {
            System.out.println("PROPERTY LOADING FAILED");
            e.printStackTrace();
        }
    }
    ...
}
```

`configName` is the name of the LOCAL file that I should be able to read from the distributed cache; however, I am getting a `FileNotFoundException`. When I use the EXACT same code directly in a Pig UDF, the file is found, so I know it is being shipped via the distributed cache. I set the appropriate property to make sure this happens:

```xml
<property><name>mapred.cache.files</name><value>/path/to/file/file.properties#configName</value></property>
```
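For reference, the local name a task sees is the part after `#` in each cache entry: Hadoop creates a symlink with that name in the task's working directory (some older Hadoop versions may also need symlink creation enabled), which is why a relative name such as `configName` can be opened inside a task but not necessarily on the machine that launches the job. The plain-Java sketch below, with the hypothetical class name `CacheFileNames`, only parses a comma-separated cache-file list with `java.net.URI` to show which local names to expect; it does not talk to Hadoop.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

public class CacheFileNames {

    /**
     * Maps each entry of a comma-separated cache-file list (as in mapred.cache.files)
     * to the local name given after '#'. Entries without a fragment are skipped,
     * because this sketch does not model how those are named on the task side.
     */
    public static Map<String, String> localNames(String cacheFiles) {
        Map<String, String> names = new LinkedHashMap<>();
        for (String entry : cacheFiles.split(",")) {
            URI uri = URI.create(entry.trim());
            String fragment = uri.getFragment();
            if (fragment != null) {
                // key: symlink name in the task's working directory; value: source path
                names.put(fragment, uri.getPath());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, String> names = localNames("/path/to/file/file.properties#configName");
        System.out.println(names); // {configName=/path/to/file/file.properties}
    }
}
```

With the property above, the only local name is `configName`, so that is the exact string the code must pass to `FileInputStream`.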

Any ideas how I can get around this?

Thanks!


Answer


A StoreFunc's constructor is called on both the frontend and the backend. When it is called on the frontend (before the job is launched), you get a `FileNotFoundException` because at that point the files in the distributed cache have not yet been copied to the nodes' local disks.
You can check whether you are on the backend (i.e. while the job is executing) and load the file only in that case, e.g.:

```
DEFINE CustomStorage KeyValStorage('param1','param2','param3');
set mapreduce.job.cache.files 'hdfs://host/user/cache/file.txt#config';
...
STORE BLAH INTO '/path/' using CustomStorage();
```

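```java
public KeyValStorage(String param1, String param2, String param3) {
    ...
    // Distributed-cache files exist on local disk only on the backend (in the task),
    // so skip loading when the constructor runs on the frontend.
    if (!UDFContext.getUDFContext().isFrontend()) {
        // "config" is the symlink name given after '#' in mapreduce.job.cache.files
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(new FileInputStream("./config")))) {
            ...
        } catch (IOException e) {
            ...
        }
    }
    ...
}
```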
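A variation on the same idea is to defer loading entirely: the constructor only records the local name, and the file is read the first time backend code asks for it. The sketch below is plain Java so it runs without Pig; the class name `LazyCachedConfig` is hypothetical, and the `BooleanSupplier` is an assumed stand-in for `UDFContext.getUDFContext().isFrontend()`. In a real StoreFunc, `get()` would presumably be called from a method that only runs in the task, such as `prepareToWrite` or `putNext`.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.function.BooleanSupplier;

/** Sketch: defer reading a distributed-cache file until backend code first needs it. */
public class LazyCachedConfig {
    private final String localName;         // symlink name after '#', e.g. "config"
    private final BooleanSupplier frontend; // hypothetical stand-in for UDFContext...isFrontend()
    private Properties props;               // null until the first backend access

    public LazyCachedConfig(String localName, BooleanSupplier frontend) {
        // Only records the name; never touches the file system, so it is safe on the frontend.
        this.localName = localName;
        this.frontend = frontend;
    }

    /** Loads the properties on first call (backend only) and caches them afterwards. */
    public Properties get() {
        if (frontend.getAsBoolean()) {
            throw new IllegalStateException(
                    "cache file '" + localName + "' is not available on the frontend");
        }
        if (props == null) {
            Properties p = new Properties();
            try (InputStream is = new FileInputStream(localName)) {
                p.load(is);
            } catch (IOException e) {
                throw new UncheckedIOException("could not read cache file " + localName, e);
            }
            props = p;
        }
        return props;
    }
}
```

The design choice here is that a frontend call fails loudly instead of silently printing a stack trace, which makes it obvious if backend-only code is accidentally reached during planning.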