0
votes

I am trying to upload multiple files to SFTP server using Spring batch with integration. Multiple files are uploaded parallelly using Future with threadPoolExecutorService. But I want to execute the tasklet in parallel with spring batch config [Not the way I did now with future tasks] as well as suppose if file upload fails, I want to retry the file uploading process for certain interval.

@Autowired
UploadGateway gateway;

@Bean
public Job importDataJob() {
    return jobBuilderFactory.get(FILE_UPLOAD_JOB_NAME).listener(jobExecutionListener(threadPoolTaskExecutor()))
            .incrementer(new RunIdIncrementer()).flow(uploadFiles())
            .end().build();
}


@Bean
public Step uploadFiles() {
    return stepBuilderFactory.get(UPLOAD_FILE_STEP_NAME).tasklet(new Tasklet() {

        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
            log.info("Upload tasklet start executing..");

                resources = resourcePatternResolver.getResources(quantumRuntimeProperties.getInputFilePaths());

                for (Resource anInputResource : resources) {
                    log.info("Incoming file <{}> to upload....", anInputResource.getFilename());

                    Future<?> submit = threadPoolTaskExecutor().submit(new Callable<Boolean>() {

                        @Override
                        public Boolean call() throws Exception {
                            try {

                                gateway.upload(anInputResource.getFilename());

                            } catch (Exception e) {

                            }
                            return true;
                        }
                    });
                    resultList.add(submit);
                }

            return RepeatStatus.FINISHED;
        }
    }).build();
}

@Bean
public JobExecutionListener jobExecutionListener(ThreadPoolTaskExecutor executor) {
    return new JobExecutionListener() {
        private ThreadPoolTaskExecutor taskExecutor = executor;

        @Override
        public void beforeJob(JobExecution jobExecution) {
            // DO-NOTHING
        }

        @Override
        public void afterJob(JobExecution jobExecution) {

            for (Future<?> future : resultList) {
                try {
                    // Wait for all the file uploads to complete
                    future.get();
                } catch (InterruptedException | ExecutionException e) {
                    log.error("Error occured while waiting for all files to get uploaded...");
                }

            }
            taskExecutor.shutdown();
        }
    };
}
1

1 Answers

0
votes

I would suggest you to take a look into the ChunkMessageChannelItemWriter and RemoteChunkHandlerFactoryBean - the integration of Spring Batch with Spring Integration. See Reference Manual for more information.