I have a Spark application running on a YARN cluster that needs to read files from multiple buckets on an S3-compatible object store, each bucket having its own set of credentials.
According to the Hadoop documentation it should be possible to specify credentials per bucket by setting configuration entries of the form spark.hadoop.fs.s3a.bucket.<bucket-name>.access.key=<access-key> on the active SparkSession, but that has not worked for me in practice.
An example that, according to the documentation, I believe should work:
import org.apache.spark.sql.{SaveMode, SparkSession}

case class BucketCredential(bucketName: String, accessKey: String, secretKey: String)

object TestMultiBucketReadWrite {

  val credentials: Seq[BucketCredential] = Seq(
    BucketCredential("bucket.1", "access.key.1", "secret.key.1"),
    BucketCredential("bucket.2", "access.key.2", "secret.key.2")
  )

  // Register the per-bucket S3A credentials on the session builder,
  // using the spark.hadoop.fs.s3a.bucket.<bucket>.* form from the Hadoop docs.
  def addCredentials(sparkBuilder: SparkSession.Builder, credentials: Seq[BucketCredential]): SparkSession.Builder = {
    var sBuilder = sparkBuilder
    for (credential <- credentials) {
      sBuilder = sBuilder
        .config(s"spark.hadoop.fs.s3a.bucket.${credential.bucketName}.access.key", credential.accessKey)
        .config(s"spark.hadoop.fs.s3a.bucket.${credential.bucketName}.secret.key", credential.secretKey)
    }
    sBuilder
  }

  def main(args: Array[String]): Unit = {
    val spark = addCredentials(SparkSession.builder(), credentials)
      .appName("Test MultiBucket Credentials")
      .getOrCreate()

    import spark.implicits._
    val dummyDF = Seq(1, 2, 3, 4, 5).toDS()

    println("Testing multi write...")
    credentials.foreach { credential =>
      val bucket = credential.bucketName
      dummyDF.write.mode(SaveMode.Overwrite).json(s"s3a://$bucket/test.json")
    }

    println("Testing multi read...")
    credentials.foreach { credential =>
      val bucket = credential.bucketName
      val df = spark.read.json(s"s3a://$bucket/test.json").as[Long]
      println(df.collect().mkString(", "))
    }
  }
}
However, when submitted, the job fails with the following error:
Testing multi write...
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: null
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:93)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:545)
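As a sanity check, one way to verify that the builder settings at least reach the driver's Hadoop configuration (Spark copies spark.hadoop.* entries over with the prefix stripped) would be to read them back from sparkContext.hadoopConfiguration. A minimal sketch, reusing the spark and credentials values from the code above:

// Sketch: print the per-bucket access-key entries as seen by the Hadoop
// configuration that the S3A connector reads. Purely for debugging.
val hadoopConf = spark.sparkContext.hadoopConfiguration
credentials.foreach { credential =>
  val key = s"fs.s3a.bucket.${credential.bucketName}.access.key"
  println(s"$key -> ${Option(hadoopConf.get(key)).getOrElse("<not set>")}")
}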
The job does succeed when I instead overwrite the global fs.s3a.access.key and fs.s3a.secret.key settings before each operation, but that forces the reads and writes to happen sequentially, one bucket at a time:
//...
println("Testing multi write...")
credentials.foreach { credential =>
  val bucket = credential.bucketName
  spark.conf.set("fs.s3a.access.key", credential.accessKey)
  spark.conf.set("fs.s3a.secret.key", credential.secretKey)
  dummyDF.write.mode(SaveMode.Overwrite).json(s"s3a://$bucket/test.json")
}

println("Testing multi read...")
credentials.foreach { credential =>
  val bucket = credential.bucketName
  spark.conf.set("fs.s3a.access.key", credential.accessKey)
  spark.conf.set("fs.s3a.secret.key", credential.secretKey)
  val df = spark.read.json(s"s3a://$bucket/test.json").as[Long]
  println(df.collect().mkString(", "))
}
//...
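For reference, this is what I mean by per-bucket configuration expressed after the session is created rather than on the builder; it is just a sketch of the same fs.s3a.bucket.<bucket>.* mechanism set directly on the Hadoop configuration, not a workaround I have verified:

// Sketch (unverified): set the per-bucket keys straight on the session's
// Hadoop configuration, so the global fs.s3a keys never need to be swapped.
val spark = SparkSession.builder()
  .appName("Test MultiBucket Credentials")
  .getOrCreate()

credentials.foreach { credential =>
  spark.sparkContext.hadoopConfiguration
    .set(s"fs.s3a.bucket.${credential.bucketName}.access.key", credential.accessKey)
  spark.sparkContext.hadoopConfiguration
    .set(s"fs.s3a.bucket.${credential.bucketName}.secret.key", credential.secretKey)
}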