1
votes

How to trigger a spring batch job from a spring integration, using java dsl Integrationflows.

I have the below code which polls for a file in a directory , the moment a new file is added to the directory , a message is generated , i want to trigger a spring batch job at that instance. Please advice.

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .transform(Transformers.fileToString())
                     .channel(ApplicationConfiguration.INBOUND_CHANNEL)                 

             .get();
}
2

2 Answers

0
votes

There is clean sample in the Spring Batch Reference Manual on the matter:

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            handle(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}
0
votes

The below is my code:-

@Bean
public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                              @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                                              TaskExecutor taskExecutor,
                                              MessageSource<File> fileReadingMessageSource,
                                              JobLaunchingGateway jobLaunchingGateway) {

    return IntegrationFlows.from(fileReadingMessageSource,
            c -> c.poller(Pollers.fixedDelay(period)
                    .taskExecutor(taskExecutor)
                    .maxMessagesPerPoll(maxMessagesPerPoll)))
              .handle(fileMessageToJobRequest(),"toRequest")
                       .handle(jobLaunchingGateway)
                        .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                             .get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
  //  fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
  //  simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}