
I have a job running, and I want to allow only one restart attempt. Until Flink triggers that restart, a separate thread of mine tries to fix the underlying problem, so that when the restart happens it succeeds. Sometimes, however, the thread takes longer than usual to fix the issue: the restart strategy is triggered while the problem still exists, the restart fails, and the job stops. My thread may then start another iteration, and because I'm running this as a jar application, the process never terminates. So, my question:

  • Is there any way to get the status of the job from Java code? Something like (JobStatus.CANCELED == true).

Thanks in advance! Kind regards

There is an open issue to create a JobListener for Flink, and I guess it is what you want: issues.apache.org/jira/browse/FLINK-12214. The issue is still open, but some code is already implemented (github.com/apache/flink/blob/master/flink-core/src/main/java/…). Maybe you can try using it via env.registerJobListener(). – Felipe
Exactly, that is what I want. I will try your ideas, thanks a lot! – Alejandro Deulofeu
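The listener idea above boils down to two callbacks: one when the job is submitted and one when it has finished. Here is a minimal sketch of one way to turn those callbacks into something another thread (such as the fixer thread from the question) can check or wait on. It is plain Java so it runs without Flink: JobCallbacks is a hypothetical stand-in that only mirrors the shape of Flink's JobListener (Strings replace JobClient and JobExecutionResult), and JobEndSignal, jobEnded and awaitExitCode are illustrative names, not Flink API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JobEndSignal {

    // Hypothetical stand-in for the two JobListener callbacks. Flink's real
    // interface passes a JobClient / JobExecutionResult; plain Strings are used
    // here so the sketch compiles and runs without Flink on the classpath.
    interface JobCallbacks {
        void onJobSubmitted(String jobId, Throwable error);
        void onJobExecuted(String result, Throwable error);
    }

    // Completes once the job has ended, normally or exceptionally.
    private final CompletableFuture<String> finished = new CompletableFuture<>();

    // The listener only records the outcome; it does no blocking work itself.
    final JobCallbacks listener = new JobCallbacks() {
        @Override
        public void onJobSubmitted(String jobId, Throwable error) {
            if (error != null) {
                finished.completeExceptionally(error); // submission itself failed
            }
        }

        @Override
        public void onJobExecuted(String result, Throwable error) {
            if (error != null) {
                finished.completeExceptionally(error);
            } else {
                finished.complete(result);
            }
        }
    };

    // Non-blocking check, e.g. as the loop condition of a background fixer thread.
    boolean jobEnded() {
        return finished.isDone();
    }

    // Blocks until the job ends and maps the outcome to a process exit code.
    int awaitExitCode(long timeoutSeconds) {
        try {
            finished.get(timeoutSeconds, TimeUnit.SECONDS);
            return 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return 1;
        } catch (ExecutionException | TimeoutException e) {
            return 1; // the job failed, or did not end within the timeout
        }
    }

    public static void main(String[] args) {
        JobEndSignal signal = new JobEndSignal();
        // Simulate the framework invoking the callbacks from another thread.
        new Thread(() -> {
            signal.listener.onJobSubmitted("job-1", null);
            signal.listener.onJobExecuted(null, new RuntimeException("job failed"));
        }).start();
        System.out.println("exit code: " + signal.awaitExitCode(5));
    }
}
```

Completing a future inside the callback, rather than doing work there, keeps the listener cheap; whoever needs the outcome decides whether to poll jobEnded() or block on awaitExitCode().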

1 Answer


Thanks a lot, Felipe. This is what I needed, and thanks to you it is done. I'm sharing the code here in case someone else needs it.

  1. Prepare and register the listener:

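    import java.util.concurrent.atomic.AtomicReference;
    import javax.annotation.Nullable;
    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.core.execution.JobListener;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(...);
    final AtomicReference<JobID> jobIdReference = new AtomicReference<>();
    // Environment configurations
    env.registerJobListener(new JobListener() {
        @Override
        public void onJobSubmitted(@Nullable JobClient client, @Nullable Throwable throwable) {
            if (throwable != null || client == null) {
                return; // submission failed, so there is no client to keep
            }
            jobIdReference.set(client.getJobID());
            jobClient = client; // jobClient: public static field in the main class
        }
        @Override
        public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
            // Called once the job has finished; throwable is non-null if it failed.
            if (throwable != null) {
                System.err.println("Job failed: " + throwable.getMessage());
            }
        }
    });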
    
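  2. Check the job status wherever you need it:

    // Needs org.apache.flink.api.common.JobStatus and org.apache.flink.util.Preconditions.
    Preconditions.checkNotNull(jobClient);
    // getJobStatus() returns a CompletableFuture<JobStatus>; get() blocks and may throw
    // InterruptedException / ExecutionException, which the caller must handle or declare.
    final JobStatus status = jobClient.getJobStatus().get();
    if (status == JobStatus.FAILED) System.exit(1);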
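To deal with the original problem (a helper thread that keeps the jar application alive after the job has stopped), the status check from step 2 can be run in a loop that stops the helper thread once the job reaches a terminal state. Below is a minimal stdlib-only sketch of that idea under stated assumptions: Status is a local stand-in for Flink's JobStatus (only a few of its constants), the Supplier mimics the shape of jobClient.getJobStatus() (which returns a CompletableFuture), and StatusWatchdog, watch and the simulated fixer thread are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class StatusWatchdog {

    // Local stand-in for Flink's JobStatus (a subset of its constants)
    // so the sketch runs without Flink on the classpath.
    enum Status {
        RUNNING, RESTARTING, FAILED, CANCELED, FINISHED;

        boolean isTerminal() {
            return this == FAILED || this == CANCELED || this == FINISHED;
        }
    }

    // Polls the status source until it reports a terminal state, then interrupts
    // the fixer thread and returns an exit code (0 only if the job FINISHED).
    static int watch(Supplier<CompletableFuture<Status>> statusSource,
                     Thread fixer, long pollMillis) {
        int exitCode;
        try {
            Status status = statusSource.get().get(10, TimeUnit.SECONDS);
            while (!status.isTerminal()) {
                Thread.sleep(pollMillis);
                status = statusSource.get().get(10, TimeUnit.SECONDS);
            }
            exitCode = status == Status.FINISHED ? 0 : 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            exitCode = 1;
        } catch (ExecutionException | TimeoutException e) {
            exitCode = 1; // status could not be queried: treat it as a failure
        }
        fixer.interrupt(); // ask the fixer thread to stop instead of iterating again
        try {
            fixer.join(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return exitCode;
    }

    public static void main(String[] args) {
        // Simulated job: reports RUNNING twice, then FAILED.
        Status[] script = {Status.RUNNING, Status.RUNNING, Status.FAILED};
        int[] call = {0};
        Supplier<CompletableFuture<Status>> source = () ->
                CompletableFuture.completedFuture(script[Math.min(call[0]++, script.length - 1)]);

        // Hypothetical fixer thread: keeps working until it is interrupted.
        Thread fixer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50); // placeholder for one "try to fix the issue" iteration
                } catch (InterruptedException e) {
                    return; // interrupted while sleeping: stop
                }
            }
        });
        fixer.start();

        int exitCode = watch(source, fixer, 20);
        System.out.println("exit code: " + exitCode + ", fixer alive: " + fixer.isAlive());
        // In the real application this value would be passed to System.exit(exitCode).
    }
}
```

With Flink's own JobStatus in place of the stand-in enum, the status source would presumably be jobClient::getJobStatus, and the terminal check could use the enum's isGloballyTerminalState() instead of the hand-written isTerminal().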