I have a Spring Batch (3.0.7) application, started via Spring Boot, which reads a number of XML files in parallel, processes them, and writes INSERT or UPDATE statements against an Oracle DB.

To process the files in parallel I'm using a Partitioner. The job works fine, except for the JdbcItemWriter, which seems to be bound to a single thread. Since I'm using a ThreadPoolTaskExecutor, I expected the step to run the reader, processor and writer in parallel, but the writer always runs on Thread-1. I can see this in the logs, and also by analyzing the database connections: only one connection is ever active, even though my DataSource is configured with a pool of 20 connections.

I have annotated the reader, processor and writer as @StepScope. How can I effectively use all the threads configured in the taskExecutor to read AND write in parallel?
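For clarity, this is the behaviour I expected, expressed with plain java.util.concurrent primitives (an illustrative sketch only, not Spring Batch code; the class and method names are mine): one worker thread per partition, all in flight at the same time.

```java
import java.util.Set;
import java.util.concurrent.*;

public class ExpectedParallelism {

    // Runs one "partition" per task on a fixed-size pool and counts how many
    // distinct worker threads actually executed them.
    static int distinctThreadCount(int partitions) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(partitions);
        // The barrier forces all partitions to be running concurrently before
        // any of them can finish, so each one must occupy its own thread.
        CyclicBarrier allStarted = new CyclicBarrier(partitions);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(partitions);
        for (int i = 0; i < partitions; i++) {
            pool.submit(() -> {
                try {
                    names.add(Thread.currentThread().getName());
                    allStarted.await();
                } catch (Exception ignored) {
                } finally {
                    done.countDown();
                }
            });
        }
        done.await();
        pool.shutdown();
        return names.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("threads used = " + distinctThreadCount(4));
    }
}
```

In the real job, each "partition" corresponds to one XML file being read, processed and written independently of the others.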
This is an extract from my configuration:
@Bean
public Job parallelJob() throws Exception {
    return jobBuilderFactory.get("parallelJob")
            .start(splitFileStep())
            .next(recordPartitionStep())
            .build();
}
@Bean
public Step recordPartitionStep() {
    return stepBuilderFactory.get("factiva-recordPartitionStep")
            .partitioner(recordStep())
            // this is used to inject some data from the job context
            .partitioner("recordStep", recordPartitioner(null))
            .taskExecutor(taskExecutor())
            .build();
}
@Bean
public Step recordStep() {
    return stepBuilderFactory.get("recordStep")
            .<Object, StatementHolderMap>chunk(1000)
            // this is used to inject some data from the job context
            .reader(recordReader(null))
            // this is @Autowired, and the bean is marked as @StepScope
            .processor(recordProcessor)
            .writer(jdbcItemWriter())
            .build();
}
@Bean
@StepScope
public ItemStreamReader recordReader(@Value("#{stepExecutionContext['record-file']}") Resource resource) {
    // THIS IS A StaxEventItemReader
}

@Bean
@StepScope
public JdbcItemWriter jdbcItemWriter() {
    JdbcItemWriter jdbcItemWriter = new JdbcItemWriter();
    jdbcItemWriter.setDataSource(dataSource);
    ...
    return jdbcItemWriter;
}
@Value("${etl.factiva.partition.cores}")
private int threadPoolSize;

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    if (threadPoolSize == 0) {
        threadPoolSize = Runtime.getRuntime().availableProcessors();
    }
    taskExecutor.setMaxPoolSize(threadPoolSize);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}
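As a side note on the executor setup: to check the pool behaviour outside Spring, here is a minimal reproduction using the plain java.util.concurrent.ThreadPoolExecutor that ThreadPoolTaskExecutor delegates to (the class name PoolSizeDemo is mine). It builds a pool with corePoolSize 1, a larger maxPoolSize, and an unbounded work queue — which, as far as I understand, is what ThreadPoolTaskExecutor produces when only setMaxPoolSize() is called, since corePoolSize defaults to 1 and queueCapacity defaults to unbounded.

```java
import java.util.Set;
import java.util.concurrent.*;

public class PoolSizeDemo {

    // Submits 10 short tasks and reports how many distinct threads ran them.
    static int distinctThreadCount() throws InterruptedException {
        // corePoolSize = 1, maxPoolSize = 20, unbounded LinkedBlockingQueue
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            pool.submit(() -> {
                names.add(Thread.currentThread().getName());
                try { Thread.sleep(50); } catch (InterruptedException ignored) {}
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        return names.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("distinct worker threads = " + distinctThreadCount());
    }
}
```

ThreadPoolExecutor only grows beyond corePoolSize when the work queue is full, and an unbounded queue never fills, so this pool never creates a second worker thread no matter what maxPoolSize says.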
How have you verified that the TaskExecutor is being configured with more than one thread? Also, how is your DataSource configured? Finally, how have you proven that you're only getting a single thread of execution? - Michael Minella