1
votes

I've a working spring boot/batch projet containing 2 jobs.

I'm now trying to add Integration to poll files from a remote SFTP using only java configuration / java DSL, and then launch a job.

The file polling is working but I've no idea on how to launch a Job in my flow, despite reading these links :

Spring Batch Integration config using Java DSL

and

Spring Batch Integration job-launching-gateway

some code snippets:

@Bean
public SessionFactory SftpSessionFactory()
    {
        DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
        sftpSessionFactory.setHost("myip");
        sftpSessionFactory.setPort(22);
        sftpSessionFactory.setUser("user");
        sftpSessionFactory.setPrivateKey(new FileSystemResource("path to my key"));

    return sftpSessionFactory;
}

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
        .from(Sftp.inboundAdapter(SftpSessionFactory())
            .deleteRemoteFiles(Boolean.FALSE)
            .preserveTimestamp(Boolean.TRUE)
            .autoCreateLocalDirectory(Boolean.TRUE)
            .remoteDirectory("remote dir")
            .regexFilter(".*\\.txt$")
            .localDirectory(new File("C:/sftp/")),
                e -> e.id("sftpInboundAdapter").poller(Pollers.fixedRate(600000)))
        .handle("FileMessageToJobRequest","toRequest")
        // what to put next to process the jobRequest ?

For .handle("FileMessageToJobRequest","toRequest") I use the one described here http://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html

I would appreciate any help on that, many thanks.

EDIT after Gary comment I've added, it doesn't compile -of course- because I don't understand how the request is propagated :

.handle("FileMessageToJobRequest","toRequest")
.handle(jobLaunchingGw())
.get();
}

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher());
}

@Autowired
private JobLauncher jobLauncher;

@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
    JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
    return execution;
}

I've found a way to launch a job using a @ServiceActivator and adding this to my flow but I'm not sure it's good practice :

 .handle("lauchBatchService", "launch")

@Component("lauchBatchService")
public class LaunchBatchService {
    private static Logger log = LoggerFactory.getLogger(LaunchBatchService.class);

@Autowired
private JobLauncher jobLauncher;

@ServiceActivator
public JobExecution launch(JobLaunchRequest req) throws JobExecutionException {

    JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());

    return execution;
}

}
1

1 Answers

1
votes
    .handle(jobLaunchingGw())
    // handle result

...

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher());
}

where jobLauncher() is the JobLauncher bean.

EDIT

Your service activator is doing about the same as the JLG; it uses this code.

Your jobLauncher @Bean is wrong.

@Beans are definitions; they don't do runtime stuff like this

@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
    JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
    return execution;
}

Since you are already autowiring a JobLauncher, just use that.

@Autowired
private JobLauncher jobLauncher;

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher);   
}