I wrote a Flink batch job in Flink 1.11.1. After the job finishes successfully, I want to do something such as calling an HTTP service.
I added a simple job listener to hook into the job status. The problem is that when the Kafka sink operator throws an error, the job listener is not triggered. I expect that when my job fails, the listener is triggered and logs the failure.
How can I reliably tell whether the job finished successfully or not?
Any help will be appreciated.
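import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.registerJobListener(new JobListener {
  // Called after the job is submitted; throwable is non-null if submission failed.
  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
    if (throwable == null) {
      log.info("SUCCESS")
    } else {
      log.info("FAIL")
    }
  }

  // Called after the job has executed; throwable is non-null if execution failed.
  override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
    if (throwable == null) {
      log.info("SUCCESS")
    } else {
      log.info("FAIL")
    }
  }
})

// Read the input, keep only records with a non-empty token, and write them to Kafka.
env.createInput(input)
  .filter(r => Option(r.token).getOrElse("").nonEmpty)
  .addSink(kafkaProducer)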