Just to recap what was actually done based on the advice provided by incomplete-co.de.
I created a recovery flow similar to the one below. The recovery flow wraps my actual batch job and is responsible only for serving the correct job parameters to the inner job: initial parameters on the first execution, new parameters on a normal execution, or the old parameters if the last execution failed.
<batch:job id="recoveryWrapper"
           incrementer="wrapperRunIdIncrementer"
           restartable="true">
    <batch:decision id="recoveryFlowDecision" decider="recoveryFlowDecider">
        <batch:next on="FIRST_RUN" to="defineParametersOnFirstRun" />
        <batch:next on="RECOVER" to="recover.batchJob" />
        <batch:next on="CURRENT" to="current.batchJob" />
    </batch:decision>
    <batch:step id="defineParametersOnFirstRun" next="current.batchJob">
        <batch:tasklet ref="defineParametersOnFirstRunTasklet"/>
    </batch:step>
    <batch:step id="recover.batchJob" next="current.batchJob">
        <batch:job ref="batchJob" job-launcher="jobLauncher"
                   job-parameters-extractor="jobParametersExtractor" />
    </batch:step>
    <batch:step id="current.batchJob">
        <batch:job ref="batchJob" job-launcher="jobLauncher"
                   job-parameters-extractor="jobParametersExtractor" />
    </batch:step>
</batch:job>
The heart of the solution is the RecoveryFlowDecider and the JobParametersExtractor, working together with the Spring Batch restart mechanism.
The RecoveryFlowDecider queries the JobExplorer and JobRepository to find out whether the last run failed. It places the last execution on the wrapper's execution context so that the JobParametersExtractor can use it later.
Note the use of the run-id incrementer (wrapperRunIdIncrementer in the XML above) to allow re-execution of the wrapper job.
import java.util.List;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

// EtlConstants is a project-specific constants class (not shown here)
@Component
public class RecoveryFlowDecider implements JobExecutionDecider {

    private static final String FIRST_RUN = "FIRST_RUN";
    private static final String CURRENT = "CURRENT";
    private static final String RECOVER = "RECOVER";

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private JobRepository jobRepository;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution,
                                      StepExecution stepExecution) {
        // The wrapper is named as the wrapped job + WRAPPER
        String wrapperJobName = jobExecution.getJobInstance().getJobName();
        String jobName = wrapperJobName.substring(0, wrapperJobName.indexOf(EtlConstants.WRAPPER));

        // Fetch the most recent instance of the wrapped job, if there is one
        List<JobInstance> instances = jobExplorer.getJobInstances(jobName, 0, 1);
        JobInstance internalJobInstance = instances.isEmpty() ? null : instances.get(0);
        if (null == internalJobInstance) {
            return new FlowExecutionStatus(FIRST_RUN);
        }

        JobExecution lastExecution = jobRepository.getLastJobExecution(
                internalJobInstance.getJobName(),
                internalJobInstance.getJobParameters());

        // Place the last execution on the wrapper context to use later
        jobExecution.getExecutionContext().put(EtlConstants.LAST_EXECUTION, lastExecution);

        ExitStatus exitStatus = lastExecution.getExitStatus();
        if (ExitStatus.FAILED.equals(exitStatus) || ExitStatus.UNKNOWN.equals(exitStatus)) {
            return new FlowExecutionStatus(RECOVER);
        } else if (ExitStatus.COMPLETED.equals(exitStatus)) {
            return new FlowExecutionStatus(CURRENT);
        }

        // We should never get here unless we have a defect
        throw new RuntimeException("Unexpected exit status: " + exitStatus + " in decider!");
    }
}
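On the note about the run-id incrementer: Spring Batch identifies a job instance by its job name plus its parameters, so the wrapper needs a fresh parameter set each time it is launched. An incrementer does that by bumping a counter parameter on every launch. The sketch below is a dependency-free model of that idea, not the Spring class itself (Spring Batch ships `RunIdIncrementer` for this purpose). The class name `RunIdModel` and the plain `Map` standing in for `JobParameters` are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a run-id incrementer; a Map replaces JobParameters.
public class RunIdModel {

    static final String RUN_ID_KEY = "run.id";

    // Returns a copy of the previous parameters with the run id increased by one.
    // A missing run id is treated as 0, so the first launch gets run.id = 1.
    static Map<String, Long> next(Map<String, Long> previous) {
        Map<String, Long> params = new HashMap<>();
        if (previous != null) {
            params.putAll(previous);
        }
        long id = params.getOrDefault(RUN_ID_KEY, 0L) + 1;
        params.put(RUN_ID_KEY, id);
        return params;
    }

    public static void main(String[] args) {
        Map<String, Long> first = next(null);
        Map<String, Long> second = next(first);
        // Each launch carries a different run.id, hence a different job instance
        System.out.println(first + " -> " + second);
    }
}
```

Because every launch of the wrapper gets a different `run.id`, each wrapper run is a new instance, while the inner job's parameters are still controlled separately by the extractor.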
The JobParametersExtractor then checks the outcome of the last execution again. If the job failed, it serves the original parameters used to launch the failed job, which triggers the Spring Batch restart mechanism. Otherwise it creates a new set of parameters and the job runs its normal course.
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.step.job.JobParametersExtractor;
import org.springframework.stereotype.Component;

@Component
public class JobExecutionWindowParametersExtractor implements JobParametersExtractor {

    @Override
    public JobParameters getJobParameters(Job job, StepExecution stepExecution) {
        // Read the last execution from the wrapping job
        // in order to build the next execution window
        JobExecution lastExecution = (JobExecution) stepExecution.getJobExecution()
                .getExecutionContext().get(EtlConstants.LAST_EXECUTION);

        if (null != lastExecution && ExitStatus.FAILED.equals(lastExecution.getExitStatus())) {
            // Reuse the parameters of the failed run so that Spring Batch restarts it
            return lastExecution.getJobInstance().getJobParameters();
        }

        // There is no failed execution, or no execution at all:
        // create a new execution window
        return buildJobParamaters(lastExecution, stepExecution);
    }
    ...
}
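The restart works because handing the inner job the exact parameters of the failed run resolves to the same job instance, so Spring Batch resumes it instead of starting a new one. Since `buildJobParamaters` is elided above, the following is only a dependency-free sketch of how the extractor's choice might look. The `Window` class, the `"FAILED"` string, and the rule that a new window starts where the previous one ended are all assumptions, not the original implementation.

```java
import java.time.Instant;

// Hypothetical model of the extractor's choice between restart and a new window.
public class WindowSelectionSketch {

    // Assumed shape of an execution window: [from, to)
    static final class Window {
        final Instant from;
        final Instant to;

        Window(Instant from, Instant to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    // lastExitCode and lastWindow are null when the inner job has never run.
    static Window select(String lastExitCode, Window lastWindow, Instant now) {
        if ("FAILED".equals(lastExitCode) && lastWindow != null) {
            // Same parameters => same job instance => restart of the failed run
            return lastWindow;
        }
        // Assumption: a new window starts where the previous one ended,
        // or at the epoch when there is no previous run.
        Instant from = (lastWindow == null) ? Instant.EPOCH : lastWindow.to;
        return new Window(from, now);
    }

    public static void main(String[] args) {
        Instant t1 = Instant.parse("2024-01-01T00:00:00Z");
        Instant t2 = Instant.parse("2024-01-02T00:00:00Z");
        Window first = select(null, null, t1);        // first run
        Window retry = select("FAILED", first, t2);    // reuses the failed window
        Window next = select("COMPLETED", first, t2);  // advances the window
        System.out.println(first + "\n" + retry + "\n" + next);
    }
}
```

Returning the very same window for a failed run mirrors what the extractor does with the original `JobParameters`: nothing about the parameters changes, which is what lets the restart pick up the failed instance.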