1 vote

I have been running my Spark job on a local cluster with HDFS, from which the input is read and to which the output is written. Now I have set up an AWS EMR cluster and an S3 bucket that holds my input, and I want the output to be written to S3 as well.

The error:

User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://something/input, expected: hdfs://ip-some-numbers.eu-west-1.compute.internal:8020

I searched for this error and found several questions about it. Some suggested that it only concerns the output, but even when I disable the output I get the same error.

Another suggestion is that there is something wrong with how FileSystem is used in my code. Here are all of the occurrences of input/output in my program:

The first occurrence is in my custom FileInputFormat, in getSplits(JobContext job), which I have not actually modified myself but could:

FileSystem fs = path.getFileSystem(job.getConfiguration());

A similar case is in my custom RecordReader, which I also have not modified:

final FileSystem fs = file.getFileSystem(job);

In nextKeyValue() of my custom RecordReader, which I have written myself, I use:

FileSystem fs = FileSystem.get(jc);

And finally, when I want to count the files in a folder, I use:

val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))

I assume the issue is with my code, but how can I modify the FileSystem calls to support input/output from S3?


4 Answers

5 votes

This is what I did to solve this when launching a Spark job on EMR:

 val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)

Make sure to replace s3_bucket with the name of your bucket.
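
You can then use that hdfs handle directly against the bucket, for example to list files. A minimal sketch (the /input prefix here is just an illustration):

import org.apache.hadoop.fs.Path

val inputPath = new Path(s"s3a://${s3_bucket}/input")
val statuses = hdfs.listStatus(inputPath)          // FileStatus entries under the prefix
statuses.foreach(s => println(s.getPath))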

I hope this helps someone.

2 votes

The Hadoop FileSystem APIs do not support S3 out of the box. There are two implementations of the Hadoop FileSystem API for S3: S3A and S3N. S3A is the preferred implementation. To use it you have to do a few things:

  1. Add the aws-java-sdk-bundle.jar to your classpath.
  2. When you create the FileSystem, include values for the following properties in the FileSystem's configuration:

     fs.s3a.access.key
     fs.s3a.secret.key

  3. When specifying paths on S3, don't use s3://; use s3a:// instead. (A sketch combining these steps follows this list.)
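
Putting those steps together, a rough sketch might look like this (the bucket name and key values are placeholders, and sc is the SparkContext):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = sc.hadoopConfiguration
conf.set("fs.s3a.access.key", "<your-access-key>")   // placeholder credentials
conf.set("fs.s3a.secret.key", "<your-secret-key>")

// Build the FileSystem from an s3a:// URI, not the default (hdfs/file) one
val fs = FileSystem.get(new URI("s3a://my-bucket"), conf)            // hypothetical bucket
val status = fs.listStatus(new Path("s3a://my-bucket/input"))        // hypothetical prefix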

Note: create a simple user and try things out with basic authentication first. It is possible to get it to work with AWS's more advanced temporary credential mechanisms, but it's a bit involved and I had to make some changes to the FileSystem code in order to get it to work when I tried.

Source of info is here

1 vote

Try setting the default URI for the FileSystem:

FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s"s3a://$s3bucket"))

After specifying the key and secret using

fs.s3a.access.key
fs.s3a.secret.key

and getting the file system as noted above:

val hdfs = FileSystem.get(new java.net.URI(s"s3a://${s3_bucket}"), sparkSession.sparkContext.hadoopConfiguration)

I would still get the error

java.lang.IllegalArgumentException: Wrong FS: s3a:// ... , expected: file:///

To check the default filesystem, you can look at the hdfs FileSystem created above: hdfs.getUri, which for me still returned file:///.

To get this to work correctly, set the default URI of the filesystem before calling FileSystem.get:

val s3URI = s"s3a://$s3bucket"
FileSystem.setDefaultUri(spark.sparkContext.hadoopConfiguration, new URI(s3URI))

val hdfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
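
As a quick sanity check (using the same hdfs value as above), the default filesystem should now point at the bucket instead of file:///:

println(hdfs.getUri)   // should now print s3a://<your-bucket>, not file:///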

1 vote

EMR is configured so that you don't need to put keys in your code or in your job configuration. The problem is how the FileSystem is created in your example.

The default FileSystem that Hadoop creates is the one for the hdfs scheme.

So the following code will not work if the path's scheme is s3://:

val fs = FileSystem.get(sc.hadoopConfiguration)
val status = fs.listStatus(new Path(path))

To create the right FileSystem, derive it from the path whose scheme you are going to use. For example, something like this:

val conf = sc.hadoopConfiguration
val pObj = new Path(path)
val status = pObj.getFileSystem(conf).listStatus(pObj)

From the Hadoop code:

Implementation of FileSystem.get:

   public static FileSystem get(Configuration conf) throws IOException {
      return get(getDefaultUri(conf), conf);
   }

Implementation of Path.getFileSystem:

   public FileSystem getFileSystem(Configuration conf) throws IOException {
      return FileSystem.get(this.toUri(), conf);
   }