I have a spring boot/integration/batch which will run and poll files on SFTP.
I'd like to be able to eventually relaunch a job (could be because application has been restarted or because for some reasons we received the same file again) with the same parameters (same file basically) using RunIdIncrementer defined on Job's Configuration.
Unfortunately run.id=1 doesn't increment and I get a JobInstanceAlreadyCompleteException
JOB configuration
@Autowired
private JobBuilderFactory jobBuilders;
@Bean
public Job importOffersJob() {
Job job = jobBuilders.get("importOffersJob")
.start(importOffersStep)
.listener(traceJobExecutionListener())
.incrementer(new RunIdIncrementer())
.build();
return job;
}
Integration
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(SftpSessionFactory())
.regexFilter(".*\\.xml.mini$")
.deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles())
.preserveTimestamp(Boolean.TRUE)
.autoCreateLocalDirectory(Boolean.TRUE)
.remoteDirectory(intCfg.getSftpRemoteDirectory())
.localDirectory(new File(intCfg.getSftpLocalDirectory())
),
e -> e.id("sftpInboundAdapter")
.poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1)))
.transform(fileToJobLaunchRequestTransformer())
.handle(jobLaunchingGw())
.handle(logger())
.get();
}
public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> {
private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class);
@Autowired
@Qualifier("importOffersJob")
private Job job;
@Override
public JobLaunchRequest transform(Message<File> source) {
log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload()
.getAbsolutePath());
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath());
//jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());
JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters());
return new JobLaunchRequest(job, jobParams);
}
}
If I uncomment jobParametersBuilder.addString("UUID", UUID.randomUUID().toString()); It's is working but I think the point is to be able to reuse the incrementer defined in my job configuration isn't it ?
(When I was running the same batch as a simple spring boot without integration the incrementer was working)
many thanks