3
votes

I have a Spring Batch application (3.0.7), started via Spring Boot which reads a number of XML files in parallel, process them, and "spit outs" INSERT or UPDATE statements against an Oracle DB.

In order to process the files in parallel, I'm using a Partitioner. The job works fine, except for the JdbcWriter which seems to be bound only to one thread. Since I'm using a ThreadPoolTaskExecutor, I expected that the Step could run in parallel for the reader, processor and writer. But, it seems that JdbcWriter is always bound to Thread-1 (I can see that in the logs but also analyzing the database connections, only one connection is active - note that my Datasource is configured to use a pool with 20 connections).

I have annotated the reader, processor and writer as @StepScope. How can I effectively use all the configured threads from the taskExecutor to read AND WRITE in parallel?

This is an extract from my configuration:

@Bean
public Job parallelJob() throws Exception {
    return jobBuilderFactory.get("parallelJob")
            .start(splitFileStep())
            .next(recordPartitionStep())
            .build();
}

@Bean
public Step recordPartitionStep() {
    return stepBuilderFactory.get("factiva-recordPartitionStep")
            .partitioner(recordStep())
            .partitioner("recordStep", recordPartitioner(null)) <!-- this is used to inject some data from the job context
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step recordStep() {
    return stepBuilderFactory.get("recordStep")
            .<Object, StatementHolderMap>chunk(1000)
            .reader(recordReader(null)) <!-- this is used to inject some data from the job context
            .processor(recordProcessor) <!-- this is @Autowired, and the bean is marked as @StepScope
            .writer(jdbcItemWriter())
            .build();
}

@Bean
@StepScope
public ItemStreamReader recordReader(@Value("#{stepExecutionContext['record-file']}") Resource resource) {
    // THIS IS A StaxEventItemReader
}

@Bean
@StepScope
public JdbcItemWriter jdbcItemWriter() {

    JdbcItemWriter jdbcItemWriter = new JdbcItemWriter();
    jdbcItemWriter.setDataSource(dataSource);
    ...
    return jdbcItemWriter;
}

@Value("${etl.factiva.partition.cores}")
private int threadPoolSize;

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    if (threadPoolSize == 0) {
        threadPoolSize = Runtime.getRuntime().availableProcessors();
    }
    taskExecutor.setMaxPoolSize(threadPoolSize);
    taskExecutor.afterPropertiesSet();

    return taskExecutor;
}
1
Check out example github.com/spring-projects/spring-batch/blob/master/… this example reads all the file from directory and processes the file parallely - Karthik Prasad
Have you confirmed that the TaskExecutor is being configured with more than one thread? Also, how is your DataSource configured? Finally, how have you proven that you're only getting a single thread of execution? - Michael Minella
Hi Michael, the process is running single-threaded. The log statements from the writer or reader are always bound to [thread-1]. I can also see that only one connection (out of 20) from the connection pool is used to execute SQL statements. - Luciano

1 Answers

5
votes

I figured out why Spring Batch wasn't using all the configured threads.

First, the Spring configuration for the Partitioner was wrong. The original configuration did not set the gridSize value and was incorrectly referencing the step to run in partitions.

Second, the ThreadPoolTaskExecutor used in the original configuration doesn't seem to work. Switching to SimpleAsyncTaskExecutor did the trick.

I'm still not sure why the ThreadPoolTaskExecutor didn't work. The javadoc for SimpleAsyncTaskExecutor actually reccommends using a pool to reuse threads.

I'm also not 100% sure I fully understand the implications of setting the gridSize value. Currently, I'm setting the gridSize to a value which is equal to the numbers of threads used in the partitioned step. It would be great, if someone could comment on this approach @Michael Minella? :)

This is the correct configuration, for reference.

@Bean
public Job parallelJob() throws Exception {
    return jobBuilderFactory.get("parallelJob")
            .start(splitFileStep())
            .next(recordPartitionStep())
            .build();
}

@Bean
public Step recordPartitionStep() {
    return stepBuilderFactory.get("factiva-recordPartitionStep")
            .partitioner(recordStep().getName(), recordPartitioner(null)) <!-- the value for the recordPartitioner constructor is injected at runtime
            .step(recordStep())
            .gridSize(determineWorkerThreads()) <!-- GRID SIZE VALUE MUST BE EQUAL TO THE NUMBER OF THREAD CONFIGURED FOR THE THREAD POOL
            .taskExecutor(taskExecutor())
            .build();


}

@Bean
public Step recordStep() {
    return stepBuilderFactory.get("recordStep")
            .<Object, StatementHolderMap>chunk(1000)
            .reader(recordReader(null)) <!-- this is used to inject some data from the job context
            .processor(recordProcessor) <!-- this is @Autowired, and the bean is marked as @StepScope
            .writer(jdbcItemWriter())
            .build();
}

@Bean
@StepScope
public ItemStreamReader recordReader(@Value("#{stepExecutionContext['record-file']}") Resource resource) {
    // THIS IS A StaxEventItemReader
}

@Bean
@StepScope
public JdbcItemWriter jdbcItemWriter() {

    JdbcItemWriter jdbcItemWriter = new JdbcItemWriter();
    jdbcItemWriter.setDataSource(dataSource);
    ...
    return jdbcItemWriter;
}

@Value("${etl.factiva.partition.cores}")
private int threadPoolSize;

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");

    taskExecutor.setConcurrencyLimit(determineWorkerThreads());
    return taskExecutor;
}

// threadPoolSize is a configuration parameter for the job
private int determineWorkerThreads() {
    if (threadPoolSize == 0) {
        threadPoolSize = Runtime.getRuntime().availableProcessors();
    }
    return threadPoolSize;

}