2
votes

I am planning to migrate one of our Spark application to Apache Flink. I am trying to understand its fault tolerance feature.

I executed following code, I do not see that Flink actually tries to retry any task(or subtask). This can cause a data loss for me. What should I do to make sure that every failures covered by Flink?

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("file:///my-path", false))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3, // number of restart attempts
      Time.of(0, TimeUnit.SECONDS) // delay
    ))
    env.enableCheckpointing(10L)
    val text = env.socketTextStream(hostName, port)
    text
      .map { input =>
        List(input)
      }.setParallelism(1)
      .flatMap { input =>
        println(s"running for $input")
        List.fill(5)(input.head) ::: {
          println("throw exception here")
          throw new RuntimeException("some exception")
          List("c")
        }
      }

I expect to see throw exception here message couple of times on the screen. However, when I use fixedDelayRestart, it looks like it just ignores this message and continue for others.

1

1 Answers

1
votes

It depends how you start the application.

I assume that you are running this out of your IDE. In that case StreamExecutionEnvironment.getExecutionEnvironment returns a LocalStreamExecutionEnvironment which runs the program and all of Flink in a single process, i.e., master (in Flink JobManager) and worker (TaskManager) are started as threads in the same JVM process. The exception terminates this single process. Hence, there is no Flink process left that could restart the program.

If you want to run the program with fault tolerance, you need to submit it to a Flink environment, for example one that runs on your local machine. Download the Flink distribution, extract the archive file, and run ./bin/start-cluster.sh. This will start two processes, a master and a worker process. You can then submit the program to the cluster by creating a remote execution environment with StreamExecutionEnvironment.createRemoteEnvironment and passing hostname and port as parameters (please check the documentation for details).

Note that the exception will still kill the worker process. So in order to be able to restart the program, you'll need to manually start a worker process. In a production environment, this is typically taken care of by Kubernetes, Yarn, or Mesos.

By the way, we recently added an operations playground to the Flink documentation. It's a Docker-based sandbox environment to play around with Flink's fault-tolerance features. I recommend to check it out: Flink Operations Playground.

Some more hints:

  • A checkpointing interval of 10ms is very short.
  • A text socket source does not provide at-least-once (or exactly-once) guarantees. Records are processed at-most-once.