8
votes

I am new to Spring Batch and couldn't figure out how to do this.

Basically, I have a Spring file poller that runs every N minutes and looks for files with specific names (e.g. A.txt and B.txt) in a certain directory. At any moment there can be at most 2 files in this directory (A and B). A Spring Batch job processes these two files and persists them to 2 different DB tables.

These files are somewhat similar, so the same processor/writer is used.

The way it is set up right now, each polling cycle picks up 1 file and runs the job for it.

Let's say there are 2 files in the directory (A.txt and B.txt). Is there a way to create 2 jobs so that both can run in parallel?

2 Answers

13
votes

Spring offers good ways to run jobs asynchronously; it is just a matter of how the JobLauncher is configured. The JobLauncher has a taskExecutor property, and whether execution is asynchronous depends on the implementation assigned to that property.

Spring provides several TaskExecutor implementations; pick the one that best fits your asynchronous batch jobs. See "TaskExecutor Types" in the Spring Framework documentation.

For example, SimpleAsyncTaskExecutor creates a new thread on every invocation, which can become a performance problem if jobs are launched at high frequency. On the other hand, there are TaskExecutor types that provide pooling, so that threads are reused and the system runs more efficiently.
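To make the difference concrete, here is a minimal plain-JDK sketch (no Spring types; the class and method names are made up for illustration). It counts how many threads are created when the same number of tasks goes through a thread-per-task executor versus a fixed pool, which is roughly the trade-off between SimpleAsyncTaskExecutor and a pooling TaskExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Spring or Spring Batch.
final class ExecutorStrategyDemo {

    /** Runs `tasks` no-op tasks and returns how many threads had to be created. */
    static int threadsCreated(boolean pooled, int tasks, int poolSize) {
        AtomicInteger created = new AtomicInteger();
        // Every thread, pooled or not, is created through this counting factory.
        ThreadFactory countingFactory = r -> {
            created.incrementAndGet();
            return new Thread(r);
        };
        CountDownLatch done = new CountDownLatch(tasks);
        ExecutorService pool = null;
        Executor executor;
        if (pooled) {
            pool = Executors.newFixedThreadPool(poolSize, countingFactory);
            executor = pool;
        } else {
            // Thread-per-task: a fresh thread for each submitted task.
            executor = r -> countingFactory.newThread(r).start();
        }
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(done::countDown);
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (pool != null) {
                pool.shutdown();
            }
        }
        return created.get();
    }
}
```

With 10 tasks, the thread-per-task variant creates 10 threads, while a pool of size 2 creates only 2 and reuses them.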

Here is a small example of how to configure a ThreadPoolTaskExecutor:

A) Configure ThreadPoolTaskExecutor Bean

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(15);
    taskExecutor.setMaxPoolSize(20);
    taskExecutor.setQueueCapacity(30);
    return taskExecutor;
}
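
How the three settings interact is easy to get wrong. ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, where threads beyond the core size are only created once the queue is full. So with the values above, the first 15 tasks start immediately, the next 30 wait in the queue, and threads 16 to 20 appear only after that. The sketch below shows the same rule with deliberately tiny numbers (core 1, max 2, queue 1) using the plain JDK class; the demo class name is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class illustrating core size, max size and queue capacity.
final class PoolSizingDemo {

    /**
     * Submits blocking tasks to a pool with core=1, max=2, queue=1.
     * Slots 0..2 hold the pool size after each of the first three submissions;
     * slot 3 is 1 if the fourth submission was rejected, otherwise 0.
     */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so no thread becomes free during the demo.
        Runnable blockingTask = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] result = new int[4];
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(blockingTask);
                result[i] = pool.getPoolSize();
            }
            try {
                pool.execute(blockingTask); // both threads busy and queue full
            } catch (RejectedExecutionException e) {
                result[3] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return result;
    }
}
```

The first task starts the core thread, the second is queued without a new thread, the third forces the second (non-core) thread because the queue is full, and the fourth is rejected.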

B) Configure JobLauncher Bean

@Bean
public JobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, JobRepository jobRepository) {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setTaskExecutor(taskExecutor);
    jobLauncher.setJobRepository(jobRepository);
    return jobLauncher;
}

C) Inject your JobLauncher and your Jobs configuration

@Autowired
private JobLauncher jobLauncher;

@Autowired
@Qualifier("job1-file-A")
private Job job1;

@Autowired
@Qualifier("job2-file-B")
private Job job2;

D) Schedule the jobs

// Spring cron expressions have six fields (seconds first), so this fires every second.
@Scheduled(cron = "*/1 * * * * *")
public void run1() {
    // A unique "time" parameter makes every launch a new job instance.
    Map<String, JobParameter> confMap = new HashMap<>();
    confMap.put("time", new JobParameter(System.currentTimeMillis()));
    JobParameters jobParameters = new JobParameters(confMap);
    try {
        // With the asynchronous launcher from step B, run() returns without waiting for the job to finish.
        jobLauncher.run(job1, jobParameters);
    } catch (Exception ex) {
        logger.error(ex.getMessage());
    }
}

@Scheduled(cron = "*/1 * * * * *")
public void run2() {
    Map<String, JobParameter> confMap = new HashMap<>();
    confMap.put("time", new JobParameter(System.currentTimeMillis()));
    JobParameters jobParameters = new JobParameters(confMap);
    try {
        jobLauncher.run(job2, jobParameters);
    } catch (Exception ex) {
        logger.error(ex.getMessage());
    }
}

E) Finally, add @EnableBatchProcessing and @EnableScheduling to your Spring Boot application class

@EnableBatchProcessing
@EnableScheduling
@SpringBootApplication
public class MyBatchApp {
    // ... (class body not shown in the original answer)
}

12
votes

I believe that you can. Since you are new to Spring Batch (just like me), I would recommend going through "The Domain Language of Batch" in the reference documentation if you haven't done so already.

Then you may start by configuring your own asynchronous JobLauncher. For example:

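  @Bean
  public JobLauncher jobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();

    // jobRepository is assumed to be an @Autowired field of this configuration class.
    jobLauncher.setJobRepository(jobRepository);
    // The asynchronous executor is what lets run() return before the job has finished.
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();

    return jobLauncher;
  }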

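Pay special attention to SimpleAsyncTaskExecutor (the job repository can be autowired). This configuration allows asynchronous execution, in which the launcher returns to the caller before the job has finished:

[Figure: asynchronous job launch sequence]

Compare it with the synchronous execution flow, in which the caller waits until the job has finished:

[Figure: synchronous job launch sequence]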

It may also help to quote the SimpleJobLauncher Javadoc:

Simple implementation of the JobLauncher interface. The Spring Core TaskExecutor interface is used to launch a Job. This means that the type of executor set is very important. If a SyncTaskExecutor is used, then the job will be processed within the same thread that called the launcher. Care should be taken to ensure any users of this class understand fully whether or not the implementation of TaskExecutor used will start tasks synchronously or asynchronously. The default setting uses a synchronous task executor.
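
The point of the quote can be sketched with the plain JDK Executor interface, which Spring's TaskExecutor extends. This is only an analogy, not Spring's implementation: the class, the two executor constants, and the thread name "job-thread" are made up for illustration. A synchronous executor runs the task on the calling thread, while an asynchronous one hands it to another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; SYNC and ASYNC stand in for SyncTaskExecutor and
// SimpleAsyncTaskExecutor but are not the Spring classes themselves.
final class LaunchModeDemo {

    /** Runs the task directly on whichever thread calls execute(). */
    static final Executor SYNC = Runnable::run;

    /** Starts a new thread named "job-thread" for each task. */
    static final Executor ASYNC = task -> new Thread(task, "job-thread").start();

    /** Returns the name of the thread that actually ran a task given to the executor. */
    static String threadThatRan(Executor executor) {
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        executor.execute(() -> ranOn.complete(Thread.currentThread().getName()));
        try {
            return ranOn.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling threadThatRan(LaunchModeDemo.SYNC) reports the caller's own thread, whereas threadThatRan(LaunchModeDemo.ASYNC) reports "job-thread". With a job launcher, this is the difference between a poller that blocks until the file is processed and one that can immediately launch the next job.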

More details and configuration options are in the Spring Batch reference documentation, in the section on configuring a JobLauncher.

Finally, just create the jobs with different names and/or launch them with different parameter sets. A naive example would be:

  @Autowired
  public JobBuilderFactory jobBuilderFactory;

  // step1() and step2() are assumed to be Step definitions declared elsewhere in this class.
  public Job createJobA() {
    return jobBuilderFactory.get("A.txt")   // the job name distinguishes the two jobs
                            .incrementer(new RunIdIncrementer())
                            .flow(step1())
                            .next(step2())
                            .end()
                            .build();
  }

  public Job createJobB() {
    return jobBuilderFactory.get("B.txt")
                            .incrementer(new RunIdIncrementer())
                            .flow(step1())
                            .next(step2())
                            .end()
                            .build();
  }

Executing these jobs with your asynchronous job launcher will create two job instances that execute in parallel. This is just one option, which may or may not be suitable for your context.
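
As a rough plain-JDK illustration of what "in parallel" means here (again an analogy, with hypothetical names, and no Spring Batch involved): one simulated job per file is handed to an executor, and each job checks whether the other one is running at the same time. With two worker threads the jobs overlap; with a single worker they run one after the other, which resembles the one-file-per-cycle behaviour described in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the "jobs" are simulated, not Spring Batch jobs.
final class ParallelFilesDemo {

    /** Returns true only if the simulated jobs for A.txt and B.txt were running at the same time. */
    static boolean ranInParallel(int workerThreads) {
        List<String> files = Arrays.asList("A.txt", "B.txt");
        ExecutorService pool = Executors.newFixedThreadPool(workerThreads);
        CountDownLatch allStarted = new CountDownLatch(files.size());
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (String file : files) {
                // A real job would read and persist `file` here.
                results.add(pool.submit(() -> {
                    allStarted.countDown();
                    // True only if the other job also starts while this one is still running.
                    return allStarted.await(2, TimeUnit.SECONDS);
                }));
            }
            boolean overlapped = true;
            for (Future<Boolean> result : results) {
                overlapped &= result.get();
            }
            return overlapped;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

ranInParallel(2) returns true, while ranInParallel(1) returns false after the first simulated job times out waiting for the second.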