I have a Scala jar which I am calling from an AWS Glue job. My jar writes a DataFrame to an S3 bucket in another AWS account which has KMS encryption turned on. I am able to write to the bucket, but I am not able to give the destination bucket owner permission to access the files. I can achieve this if I simply use the Glue writer, but with plain Spark it just does not work. I have read all the documentation, and I am setting the following bucket policies in the Hadoop configuration.
/**
 * Writes `dataFrame` to `location` after configuring the S3 filesystem
 * properties (optional KMS client-side encryption and a bucket-owner ACL)
 * on the Hadoop configuration of the Spark context.
 *
 * The properties are set on the context's shared Hadoop configuration and
 * are not reverted afterwards, so they stay in effect for later writes
 * issued through the same context.
 *
 * @param sparkContext  context whose Hadoop configuration is updated; when
 *                      `null` (the default) the context that owns
 *                      `dataFrame` is used instead
 * @param dataFrame     data to write; it is repartitioned into 5 partitions
 * @param location      target path passed to `save`
 * @param fileFormat    output format passed to `format`
 * @param saveMode      save mode passed to `mode`
 * @param encryptionKey KMS key id; when defined, client-side encryption
 *                      properties are set and server-side encryption is
 *                      switched off
 * @param kms_region    region of the KMS key; falls back to "us-west-2"
 *                      when empty
 */
def writeDataFrameInTargetLocation(
    sparkContext: SparkContext = null,
    dataFrame: DataFrame,
    location: String,
    fileFormat: String,
    saveMode: String,
    encryptionKey: Option[String] = Option.empty,
    kms_region: Option[String] = Option("us-west-2")
): Unit = {
  // The parameter defaults to null; dereferencing it directly would throw a
  // NullPointerException, so fall back to the context that owns the DataFrame.
  val context = Option(sparkContext).getOrElse(dataFrame.sparkSession.sparkContext)
  val hadoopConf = context.hadoopConfiguration

  // Client-side KMS encryption is configured only when a key is supplied.
  encryptionKey.foreach { keyId =>
    hadoopConf.set("fs.s3.enableServerSideEncryption", "false")
    hadoopConf.set("fs.s3.cse.enabled", "true")
    hadoopConf.set(
      "fs.s3.cse.encryptionMaterialsProvider",
      "com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider"
    )
    hadoopConf.set("fs.s3.cse.kms.keyId", keyId) // KMS key to encrypt the data with
    hadoopConf.set("fs.s3.cse.kms.region", kms_region.getOrElse("us-west-2")) // the region for the KMS key
  }

  // The ACL properties are the same with or without encryption, so they are
  // set once here. "fs.s3.acl" keeps the value that the last of the original
  // two assignments left in place.
  hadoopConf.set("fs.s3.canned.acl", "BucketOwnerFullControl")
  hadoopConf.set("fs.s3.acl.default", "BucketOwnerFullControl")
  hadoopConf.set("fs.s3.acl", "BucketOwnerFullControl")

  dataFrame
    .repartition(5)
    .write
    .mode(saveMode)
    .option(Header, true)
    .format(fileFormat)
    .save(location)
}