I am planning to migrate one of our Spark applications to Apache Flink, and I am trying to understand its fault-tolerance features.
I executed the code below, but I do not see Flink actually retrying any task (or subtask). This could cause data loss for me. What should I do to make sure that every failure is covered by Flink?
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// File-system state backend, with asynchronous snapshots disabled.
env.setStateBackend(new FsStateBackend("file:///my-path", false))
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3,                           // number of restart attempts
  Time.of(0, TimeUnit.SECONDS) // delay between attempts
))
env.enableCheckpointing(10L) // checkpoint interval in milliseconds

val text = env.socketTextStream(hostName, port)

text
  .map { input =>
    List(input)
  }.setParallelism(1)
  .flatMap { input =>
    println(s"running for $input")
    List.fill(5)(input.head) ::: {
      println("throw exception here")
      throw new RuntimeException("some exception")
      List("c")
    }
  }

env.execute()
I expected to see the "throw exception here" message a couple of times on the screen, once per restart attempt. However, when I use fixedDelayRestart, it looks like the exception is simply ignored and the job just continues with the other elements.
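For reference, here is a minimal, self-contained sketch of the behaviour I am expecting. It swaps a bounded fromElements source in for the socket so the input is replayable on restart; the source swap and the simplified job structure are my own, not the original job:

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3,                           // restart the job up to 3 times
  Time.of(0, TimeUnit.SECONDS) // with no delay in between
))

// A bounded, replayable source: with no checkpointed state, every
// restart re-reads the elements from the beginning.
env
  .fromElements("a", "b", "c")
  .map { input =>
    println(s"running for $input")
    if (input == "b") throw new RuntimeException("some exception")
    input
  }
  .print()

env.execute()

If restarts are happening, I would expect "running for a" to be printed once for the initial attempt and once per restart attempt (four times in total) before the job finally fails.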