I want to use AWS S3 as a sink for a data stream in Flink, and I am using the StreamingFileSink class to create the sink.
I don't need checkpointing for my job, but when I disable checkpointing, no data is written to S3.
Case 1: checkpointing enabled
When checkpointing is enabled, the data is written to the configured S3 path as expected.
Case 2: checkpointing disabled
When checkpointing is disabled, no data is written to S3.
I have run the job multiple times and get the same result every time, both on my local machine and on a Kubernetes cluster.
    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.{PartFileInfo, RollingPolicy, StreamingFileSink}
    import org.apache.flink.streaming.api.scala._

    object FlinkTestJob {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Case 1: checkpointing enabled (commented out for case 2)
        env.enableCheckpointing(100)

        // Test input
        val streamStrings: Seq[String] =
          Seq("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10")
        val testStream = env.fromCollection(streamStrings)

        // Custom rolling policy: roll on every event, every processing-time check,
        // and on checkpoint once the part file has any content
        val rollingPolicy = new RollingPolicy[String, String] {
          override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean =
            partFileState.getSize > 1

          override def shouldRollOnEvent(
              partFileState: PartFileInfo[String],
              element: String): Boolean = true

          override def shouldRollOnProcessingTime(
              partFileState: PartFileInfo[String],
              currentTime: Long): Boolean = true
        }

        // Row-format StreamingFileSink writing to S3
        val sink: StreamingFileSink[String] = StreamingFileSink
          .forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
          .withRollingPolicy(rollingPolicy)
          .build()

        testStream.addSink(sink)

        env.execute("test-job")
      }
    }
When I write to S3 using "writeAsText("s3a://testbucket/sink")" instead of StreamingFileSink, it works fine regardless of whether checkpointing is enabled.
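For reference, this is roughly how I used writeAsText (same env and testStream as in the job above, with the same placeholder bucket path):

    // Simple text sink: writes to S3 with or without checkpointing
    testStream.writeAsText("s3a://testbucket/sink")
    env.execute("test-job")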
Flink version: 1.8.0
I would like to understand the relationship between checkpointing and the StreamingFileSink.
Thanks