I have the following doc, which says:
1.1. Multi-threaded Step

The simplest way to start parallel processing is to add a TaskExecutor to your Step configuration.
When using java configuration, a TaskExecutor can be added to the step as shown in the following example:
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
    return this.stepBuilderFactory.get("sampleStep")
            .<String, String>chunk(10)
            .reader(itemReader())
            .writer(itemWriter())
            .taskExecutor(taskExecutor)
            .build();
}
The result of the above configuration is that the Step executes by reading, processing, and writing each chunk of items (each commit interval) in a separate thread of execution. Note that this means there is no fixed order for the items to be processed, and a chunk might contain items that are non-consecutive compared to the single-threaded case. In addition to any limits placed by the task executor (such as whether it is backed by a thread pool), there is a throttle limit in the tasklet configuration which defaults to 4. You may need to increase this to ensure that a thread pool is fully utilized.
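If I read that last part correctly, the throttle limit can be raised on the same step builder. A minimal sketch based on the snippet above (the value 20 is arbitrary, just for illustration):

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
    return this.stepBuilderFactory.get("sampleStep")
            .<String, String>chunk(10)
            .reader(itemReader())
            .writer(itemWriter())
            .taskExecutor(taskExecutor)
            .throttleLimit(20) // default is 4; raise it so a larger thread pool can be fully used
            .build();
}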
But until now I thought this kind of parallelism had to be achieved with local partitioning, where I provide a Partitioner that says how to divide the data into pieces, whereas a multi-threaded Step apparently does this automatically (see the sketch below for what I mean by a partitioner).
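For comparison, this is roughly what I mean: an implementation of org.springframework.batch.core.partition.support.Partitioner that hands each worker step its own slice of the data through the ExecutionContext. A minimal sketch (the keys minValue/maxValue and the total of 1000 items are made up for illustration; a step-scoped reader would read these keys back):

import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class RangePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        int range = 1000 / gridSize; // pretend there are 1000 items in total
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("minValue", i * range);           // hypothetical keys that a
            context.putInt("maxValue", (i + 1) * range - 1); // step-scoped reader would use
            partitions.put("partition" + i, context);
        }
        return partitions;
    }
}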
Question
Could you explain how this works? How can I manage it besides the thread count? Will it work with a flat file?
P.S.
I created this example:
@Configuration
public class MultithreadedStepConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ToLowerCasePersonProcessor toLowerCasePersonProcessor;

    @Autowired
    private DbPersonWriter dbPersonWriter;

    @Value("${app.single-file}")
    Resource resources;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("myMultiThreadedJob")
                .incrementer(new RunIdIncrementer())
                .flow(csvToDataBaseSlaveStep())
                .end()
                .build();
    }

    private Step csvToDataBaseSlaveStep() {
        return stepBuilderFactory.get("csvToDatabaseStep")
                .<Person, Person>chunk(50)
                .reader(csvPersonReaderMulti())
                .processor(toLowerCasePersonProcessor)
                .writer(dbPersonWriter)
                .taskExecutor(jobTaskExecutorMultiThreaded())
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Person> csvPersonReaderMulti() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("csvPersonReaderSplitted")
                .resource(resources)
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                // the reader is shared across threads, so its state cannot be saved reliably
                .saveState(false)
                .build();
    }

    @Bean
    public TaskExecutor jobTaskExecutorMultiThreaded() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // sized for the 21 sites we currently process, with some headroom
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setCorePoolSize(25);
        taskExecutor.setThreadGroupName("multi-");
        taskExecutor.setThreadNamePrefix("multi-");
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
}
And it really works according to the logs, but I want to know the details. Is it better than a self-written partitioner?
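One thing I found regarding the flat file question: as far as I can tell, FlatFileItemReader is not thread-safe, which is why I set saveState(false) above. Another approach I have seen is to wrap the reader in a SynchronizedItemStreamReader (from org.springframework.batch.item.support) so that reads are serialized across threads. A minimal sketch (the bean name is mine):

@Bean
@StepScope
public SynchronizedItemStreamReader<Person> synchronizedCsvPersonReader() {
    // serializes calls to read() so multiple chunk-processing threads
    // can safely share the underlying FlatFileItemReader
    SynchronizedItemStreamReader<Person> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(csvPersonReaderMulti());
    return reader;
}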