I want to use AWS S3 as the sink for a data stream in Flink. I am using the StreamingFileSink class to create the sink.

I don't need checkpointing for my job, but when I disable checkpointing, data is no longer written to S3.

Case 1: checkpointing enabled
When checkpointing is enabled, the data is successfully written to the specified S3 path.

Case 2: checkpointing disabled
When checkpointing is disabled, the data is not written to S3.
I ran the job multiple times and got the same result every time. This happens on my local machine as well as on a Kubernetes cluster.

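import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.{PartFileInfo, RollingPolicy, StreamingFileSink}
import org.apache.flink.streaming.api.scala._

object FlinkTestJob {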

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // with checkpointing enabled
    env.enableCheckpointing(100)

    // Source: a small bounded collection of test strings
    val streamStrings: Seq[String] =
      Seq("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10")

    val testStream = env.fromCollection(streamStrings)

    val rollingPolicy = new RollingPolicy[String, String] {

      override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean =
        partFileState.getSize > 1

      override def shouldRollOnEvent(
          partFileState: PartFileInfo[String],
          element: String): Boolean = true

      override def shouldRollOnProcessingTime(
          partFileState: PartFileInfo[String],
          currentTime: Long): Boolean = true
    }

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(rollingPolicy)
      .build()

    testStream.addSink(sink)
    env.execute("test-job")
  }
}

When I write to S3 using writeAsText("s3a://testbucket/sink") instead of StreamingFileSink, it works fine whether or not checkpointing is enabled.

Flink version: 1.8.0
I want to understand the relationship between checkpointing and StreamingFileSink.
Thanks

1 Answer

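Checkpointing must be enabled for StreamingFileSink to work. Part files are only finalized (moved to the finished state) when a checkpoint completes successfully, so with checkpointing disabled they stay in the in-progress or pending state indefinitely and are never committed as finished files.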
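
To illustrate why nothing shows up without checkpoints, here is a minimal sketch in plain Scala that models the part-file lifecycle described in the StreamingFileSink documentation (in-progress, pending, finished). It is not Flink code and does not use any Flink API: `PartFileLifecycleSketch`, `Bucket`, and `run` are hypothetical names made up for this example, and the model assumes that every record rolls to a new part file and that a checkpoint also rolls the open file, roughly as the rolling policy in the question does.

```scala
// Simplified model of the part-file lifecycle. NOT Flink code; all names are hypothetical.
object PartFileLifecycleSketch {

  sealed trait State
  case object InProgress extends State
  case object Pending extends State
  case object Finished extends State

  // Stand-in for one bucket of part files, one record per file.
  final class Bucket {
    private var files: Vector[(String, State)] = Vector.empty

    // Move every file in state `from` to state `to`.
    private def promote(from: State, to: State): Unit =
      files = files.map { case (record, state) => (record, if (state == from) to else state) }

    // Assumption: each record rolls the previous file (in-progress -> pending)
    // and opens a new in-progress file, like shouldRollOnEvent = true.
    def write(record: String): Unit = {
      promote(InProgress, Pending)
      files = files :+ ((record, InProgress))
    }

    // Assumption: a checkpoint rolls the open file, and its successful
    // completion is the only step that turns pending files into finished ones.
    def checkpoint(): Unit = {
      promote(InProgress, Pending)
      promote(Pending, Finished)
    }

    // Only finished files count as committed output.
    def finished: Seq[String] = files.collect { case (record, Finished) => record }
  }

  // Writes all records, optionally checkpointing after each one,
  // and returns the records that ended up in finished files.
  def run(records: Seq[String], checkpointing: Boolean): Seq[String] = {
    val bucket = new Bucket
    records.foreach { record =>
      bucket.write(record)
      if (checkpointing) bucket.checkpoint()
    }
    bucket.finished
  }
}
```

In this model, `PartFileLifecycleSketch.run(records, checkpointing = false)` returns no finished files, because nothing ever triggers the pending-to-finished step. That matches the behavior described in the question. In the real job, the corresponding fix is to keep `env.enableCheckpointing(...)` with an interval that suits the workload.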