I'm building a Spark application and trying to run it locally before launching it on EMR or in a container. Reading a DataFrame works just fine when the parquet file itself is local, but the read fails when the file is in S3. I've set every configuration variable I've seen suggested for reading over s3a. Here's how I'm creating my Spark session:
package util

import org.apache.spark.sql.SparkSession
import scala.io.Source

object SparkSessionFactory {

  def generateSession(sessionLocation: String): SparkSession = {
    val session = sessionLocation match {
      case "local" =>
        SparkSession.builder()
          .appName("LocalS3SparkProfiler")
          .master("local[*]")
          .config("spark.driver.host", "localhost")
          .config("fs.s3a.enableServerSideEncryption", "true")
          .config("fs.s3a.serverSideEncryptionAlgorithm", "aws:kms")
          .getOrCreate()
    }
    setHadoopConfigs(session, sessionLocation)
    session
  }

  private def setHadoopConfigs(session: SparkSession, sessionLocation: String) = {
    session.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    session.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
    sessionLocation match {
      case "local" =>
        // Read the key, secret, and session token out of ~/.aws/credentials;
        // each value is whatever follows the last space on its line.
        val userHome = System.getProperty("user.home")
        val awsCredentialsLines = Source.fromFile(s"$userHome/.aws/credentials").getLines().toList
        val key = awsCredentialsLines(1).substring(awsCredentialsLines(1).lastIndexOf(" ")).trim
        val secret = awsCredentialsLines(2).substring(awsCredentialsLines(2).lastIndexOf(" ")).trim
        val s3Token = awsCredentialsLines(3).substring(awsCredentialsLines(3).lastIndexOf(" ")).trim
        session.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", key)
        session.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", secret)
        session.sparkContext.hadoopConfiguration.set("fs.s3a.session.token", s3Token)
        session.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider",
          "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    }
  }
}
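For reference, that parsing assumes my ~/.aws/credentials file has exactly this shape: a profile header on the first line, then one "name = value" pair per line (the values below are placeholders):

[default]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMIK7MDENGbPxRfiCYEXAMPLEKEY
aws_session_token = FwoGZXIvYXdzEXAMPLESESSIONTOKEN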
Then, to read the DataFrame, I call:
val spark = SparkSessionFactory.generateSession("local")
val df = spark.read.parquet("s3a://my-bucket/thepath/myparquetfile")
And the error thrown is as follows:
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 366CFE11F21144F3; S3 Extended Request ID: eW4C6PQZ4uSJOPmYKoZ8qCwmK4PwL6eFPwef9e1KLA3kL2LsiCMctZ+ZLYVplZh927iNiSro7ko=), S3 Extended Request ID: eW4C6PQZ4uSJOPmYKoZ8qCwmK4PwL6eFPwef9e1KLA3kL2LsiCMctZ+ZLYVplZh927iNiSro7ko=
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1632)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
  at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4330)
  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4277)
  at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1265)
Everything I've read suggests the credentials I need are the ones I'm providing. I've checked the key, secret, and s3Token values, and they all look correct; I use the same credentials in another project with the plain AWS SDK and they work with no problem.
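For what it's worth, this is roughly the check I mean, calling the AWS SDK for Java (v1) directly with the same parsed values (the region, bucket, and key below are placeholders):

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicSessionCredentials}
import com.amazonaws.services.s3.AmazonS3ClientBuilder

// Build an S3 client from the same key/secret/token parsed above.
val credentials = new BasicSessionCredentials(key, secret, s3Token)
val s3 = AmazonS3ClientBuilder.standard()
  .withCredentials(new AWSStaticCredentialsProvider(credentials))
  .withRegion("us-east-1") // placeholder region
  .build()

// The same kind of metadata call that 403s inside Spark succeeds here.
val metadata = s3.getObjectMetadata("my-bucket", "thepath/myparquetfile")
println(metadata.getContentLength)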
Any idea what the issue is?