I am using Spark 1.1. I have a Spark job that looks only for folders matching a certain pattern under a bucket (i.e. folders that start with...), and should process only those. I achieve this with the following:

FileSystem fs = FileSystem.get(new Configuration(true));
FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
List<FileStatus> statusList = Arrays.asList(statusArr);

List<String> pathsStr = convertFileStatusToPath(statusList);

JavaRDD<String> paths = sc.parallelize(pathsStr);

However, when I run this job on a Google Cloud Storage path, gs://rsync-1/2014_07_31* (using the latest Google Cloud Storage connector, 1.2.9), I get the following error:

14/10/13 10:28:38 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/10/13 10:28:38 INFO util.Utils: Successfully started service 'Driver' on port 60379.    
14/10/13 10:28:38 INFO worker.WorkerWatcher: Connecting to worker akka.tcp://[email protected]:45212/user/Worker    
Exception in thread "main" java.lang.reflect.InvocationTargetException    
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)    
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)    
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)    
    at java.lang.reflect.Method.invoke(Method.java:606)    
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)    
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)    
Caused by: java.lang.IllegalArgumentException: Wrong bucket: rsync-1, in path: gs://rsync-1/2014_07_31*, expected bucket: hadoop-config    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.checkPath(GoogleHadoopFileSystem.java:100)    
    at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:294)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.makeQualified(GoogleHadoopFileSystemBase.java:457)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem.getGcsPath(GoogleHadoopFileSystem.java:163)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1052)    
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.globStatus(GoogleHadoopFileSystemBase.java:1027)    
    at com.doit.customer.dataconverter.Phase0.main(Phase0.java:578)    
... 6 more

When I ran this job on a local folder, everything worked fine.

hadoop-config is a bucket I use for deploying the Spark cluster on Google Compute Engine (using the bdutil 0.35.2 tool).

1 Answer

Short Answer

Instead of using:

    FileSystem fs = FileSystem.get(new Configuration(true));
    FileStatus[] statusArr = fs.globStatus(new Path(inputPath));
    List<FileStatus> statusList = Arrays.asList(statusArr);

use:

    Path inputPathObj = new Path(inputPath);
    FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
    FileStatus[] statusArr = fs.globStatus(inputPathObj);
    List<FileStatus> statusList = Arrays.asList(statusArr);

because in Hadoop, FileSystem instances are shared based on the scheme and authority component of the URI (and potentially user-group information in more advanced settings), and such instances are not interchangeable between schemes and authorities.
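To see which part of each location acts as the authority, the URIs from the question can be split with plain `java.net.URI`. This is only a minimal sketch using the JDK, not Hadoop itself; the class name `UriParts` and the helper `fileSystemKey` are made up for illustration.

```java
import java.net.URI;

final class UriParts {
    /** Returns "scheme://authority", the part that identifies one filesystem. */
    static String fileSystemKey(String location) {
        URI uri = URI.create(location);
        return uri.getScheme() + "://" + uri.getAuthority();
    }

    public static void main(String[] args) {
        // The input path from the question: the bucket is the authority.
        System.out.println(fileSystemKey("gs://rsync-1/2014_07_31*"));
        // The bucket that the default filesystem pointed at in the question.
        System.out.println(fileSystemKey("gs://hadoop-config/"));
    }
}
```

The two keys differ, which is why a FileSystem obtained for one bucket should not be expected to accept paths in the other.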

Long Answer

This has to do with the distinction between the hostname and path components of a URI in [scheme]://[authority]/[path], which may be more obvious in the HDFS use case, but is also applicable to GCS. Basically, there are several get methods in org.apache.hadoop.fs.FileSystem, and the most applicable ones here are:

public static FileSystem get(Configuration conf)

and

public static FileSystem get(URI uri, Configuration conf)

The former actually just calls the latter with:

    return get(getDefaultUri(conf), conf);

where getDefaultUri(conf) is determined by the fs.defaultFS setting (or its older name, fs.default.name). The second consideration is that FileSystems with different hostname or authority components are considered to be inherently different filesystems; in the HDFS case, this makes sense, as:

    // FileSystem.get takes a java.net.URI, not a String
    FileSystem.get(URI.create("hdfs://foo-cluster-namenode/"), conf);
    FileSystem.get(URI.create("hdfs://bar-cluster-namenode/"), conf);

each point at potentially completely different filesystem instances on separate clusters, so the same pathnames can refer to separate storage namespaces on the two HDFS instances. Though a bucket is less obviously the "hostname" of a machine, the bucket in GCS does take the role of the authority component of a GCS URI. In Hadoop, this means FileSystem.get literally returns the same cached Java FileSystem object when the bucket is the same, but different instances for different buckets. Just as you can't create an HDFS instance and point it at a different authority:

    // Can't mix authorities!
    FileSystem.get(URI.create("hdfs://foo/"), conf).listStatus(new Path("hdfs://bar/"));

when you called FileSystem.get(conf) you effectively got a cached instance pointed at gs://hadoop-config/, and then used that to try to list gs://rsync-1.
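The caching and the path check described above can be imitated with a toy model. This is a simplified sketch under stated assumptions, not Hadoop's or the GCS connector's actual implementation: `ToyFileSystem`, its cache keyed by scheme and authority, and the wording of its error message are hypothetical stand-ins modelled on the stack trace in the question.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy stand-in for a filesystem bound to one scheme and authority. */
final class ToyFileSystem {
    // One shared instance per "scheme://authority" key (hypothetical cache).
    private static final Map<String, ToyFileSystem> CACHE = new HashMap<>();

    private final String scheme;
    private final String authority;

    private ToyFileSystem(String scheme, String authority) {
        this.scheme = scheme;
        this.authority = authority;
    }

    /** Returns the cached instance for the URI's scheme and authority. */
    static ToyFileSystem get(URI uri) {
        String key = uri.getScheme() + "://" + uri.getAuthority();
        return CACHE.computeIfAbsent(
                key, k -> new ToyFileSystem(uri.getScheme(), uri.getAuthority()));
    }

    /** Rejects paths whose scheme or authority differs from this instance's. */
    void checkPath(URI path) {
        boolean sameFs = Objects.equals(scheme, path.getScheme())
                && Objects.equals(authority, path.getAuthority());
        if (!sameFs) {
            throw new IllegalArgumentException(
                    "Wrong bucket: " + path.getAuthority()
                            + ", in path: " + path
                            + ", expected bucket: " + authority);
        }
    }

    public static void main(String[] args) {
        URI input = URI.create("gs://rsync-1/2014_07_31*");

        // Mimics FileSystem.get(conf) when the default URI is gs://hadoop-config/.
        ToyFileSystem defaultFs = ToyFileSystem.get(URI.create("gs://hadoop-config/"));
        try {
            defaultFs.checkPath(input);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // Mimics FileSystem.get(path.toUri(), conf): the instance matches the path.
        ToyFileSystem.get(input).checkPath(input);
        System.out.println("accepted: " + input);
    }
}
```

In this model, two URIs in the same bucket share one instance, while the instance for the default bucket refuses a path in another bucket, which mirrors the failure in the question.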

Instead, fetch the FileSystem instance at the point where you know the Path you want to operate on:

    FileSystem fs = FileSystem.get(myPath.toUri(), new Configuration(true));
    fs.globStatus(myPath);