I'm trying to transition my Flink job (v1.8 running on EMR) from using the BucketingSink to the newer StreamingFileSink.
I've got the new code running and almost everything looks good: files are written to S3 and transitioned to complete. The only issue is that the ACL on the S3 objects isn't set the way it was with the old code.
I've got my core-site.xml set like so:
<configuration>
<property>
<name>fs.s3a.acl.default</name>
<value>BucketOwnerFullControl</value>
</property>
</configuration>
I'm also using s3a:// as the prefix of the path I pass to forRowFormat() on the StreamingFileSink builder.
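For reference, the sink is set up roughly like the sketch below. This is a minimal illustration rather than my actual job: the bucket name, output path, class name, job name, checkpoint interval, in-memory source, and String encoder are all placeholders.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3SinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamingFileSink only moves part files to the finished state on
        // successful checkpoints, so checkpointing has to be enabled.
        env.enableCheckpointing(60_000L);

        // Placeholder source; the real job reads from a stream.
        DataStream<String> lines = env.fromElements("first record", "second record");

        // "my-bucket/output" is a hypothetical location; note the s3a:// scheme.
        StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("s3a://my-bucket/output"),
                          new SimpleStringEncoder<String>("UTF-8"))
            .build();

        lines.addSink(sink);
        env.execute("streaming-file-sink-sketch");
    }
}
```

Nothing in this code sets an ACL; I was relying on core-site.xml for that.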
Also, when switching to the StreamingFileSink I had to add a new dependency to my build.gradle:
flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"
I'm not clear on how I was able to write to S3 with the s3a:// prefix without this jar when I was using the BucketingSink API. Somehow I'm now writing to S3 in a way that doesn't respect my core-site.xml settings.