I wrote a Flink batch job in Flink 1.11.1. After the job finishes successfully, I want to do something like calling an HTTP service.

I added a simple job listener to hook into the job status. The problem is that when the Kafka sink operator throws an error, the job listener is not triggered. I expect that when my job fails, it should trigger my job listener and print the fail log.

How can I reliably tell whether the job finished successfully or not?

Any help will be appreciated.

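import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.core.execution.{JobClient, JobListener}
// Assumption: the Scala DataStream API is in use
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerJobListener(new JobListener {
  // Called when the job is submitted; throwable is null if submission succeeded
  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
    if (throwable == null) {
      log.info("SUCCESS")
    } else {
      log.info("FAIL")
    }
  }

  // Called when execution ends; throwable is null if the job finished without error
  override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
    if (throwable == null) {
      log.info("SUCCESS")
    } else {
      log.info("FAIL")
    }
  }
})

env.createInput(input)
  .filter(r => Option(r.token).getOrElse("").nonEmpty)
  .addSink(kafkaProducer)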

1 Answer

If you run the job on a cluster, you can view your logger messages and stdout output in the web console under your job ID. Please refer to the attached screenshot.

The default URL is typically http://localhost:8081 if you run on a local cluster.
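
The same address also serves Flink's monitoring REST API, so the job state shown in the web console can be read programmatically through `GET /jobs/<jobId>`. The following is only a minimal sketch, assuming the cluster is reachable at the URL above and that the job ID is already known. `JobStateCheck`, `extractState`, and `fetchJobState` are hypothetical names introduced for illustration, and the regular expression is a crude stand-in for a proper JSON parser.

```scala
import scala.io.Source

object JobStateCheck {
  // Matches the first "state" field in the JSON body, e.g. "state":"FAILED".
  // Assumption: the first match is the top-level job state.
  private val StatePattern = "\"state\"\\s*:\\s*\"([A-Z_]+)\"".r

  // Extracts the job state from a JSON response body, if one is present.
  def extractState(json: String): Option[String] =
    StatePattern.findFirstMatchIn(json).map(_.group(1))

  // Fetches the job details from the REST endpoint and returns the state.
  // baseUrl is e.g. "http://localhost:8081"; jobId is the ID Flink assigned.
  def fetchJobState(baseUrl: String, jobId: String): Option[String] = {
    val source = Source.fromURL(s"$baseUrl/jobs/$jobId", "UTF-8")
    try extractState(source.mkString)
    finally source.close()
  }
}
```

A caller could poll `fetchJobState` until it returns a terminal state such as `FINISHED` or `FAILED` before triggering any follow-up action.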

Again, the check below is not the correct approach for determining whether your job succeeded or not.

if (throwable == null) {
  log.info("SUCCESS")
} else {
  log.info("FAIL")
}
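
One possible alternative is to act on the outcome of the blocking `execute()` call itself. When a job is submitted in attached mode, `env.execute(...)` returns only after the job has finished and throws if the job failed. The sketch below assumes that setup. `JobOutcome` and `runAndReport` are hypothetical names, and the helper is written without Flink types so that it stands on its own.

```scala
import scala.util.{Failure, Success, Try}

object JobOutcome {
  // Runs a blocking job and invokes exactly one callback based on the outcome.
  // runJob is a stand-in for a call such as env.execute("my-job").
  // Note: Try captures non-fatal exceptions only; fatal errors still propagate.
  def runAndReport[T](runJob: => T)(onSuccess: T => Unit, onFailure: Throwable => Unit): Try[T] = {
    val result = Try(runJob)
    result match {
      case Success(value) => onSuccess(value)
      case Failure(error) => onFailure(error)
    }
    result
  }
}
```

With the job from the question, this might be used as `JobOutcome.runAndReport(env.execute("batch-job"))(_ => callHttpService(), e => log.error("FAIL", e))`, where `callHttpService` is a hypothetical function that performs the follow-up HTTP request.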
