
I have the following doc, and it says:

1.1. Multi-threaded Step

The simplest way to start parallel processing is to add a TaskExecutor to your Step configuration.

When using java configuration, a TaskExecutor can be added to the step as shown in the following example:

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .build();
}

The result of the above configuration is that the Step executes by reading, processing, and writing each chunk of items (each commit interval) in a separate thread of execution. Note that this means there is no fixed order for the items to be processed, and a chunk might contain items that are non-consecutive compared to the single-threaded case. In addition to any limits placed by the task executor (such as whether it is backed by a thread pool), there is a throttle limit in the tasklet configuration which defaults to 4. You may need to increase this to ensure that a thread pool is fully utilized.
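For example, the throttle limit mentioned above can be raised directly on the step builder. This is a sketch based on the doc's own sample step (the `itemReader()`/`itemWriter()` beans are assumed from that sample, and the value 8 is illustrative):

```java
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
    return this.stepBuilderFactory.get("sampleStep")
            .<String, String>chunk(10)
            .reader(itemReader())
            .writer(itemWriter())
            .taskExecutor(taskExecutor)
            // allow up to 8 concurrent chunks instead of the default 4,
            // so a larger thread pool can actually be utilized
            .throttleLimit(8)
            .build();
}
```

Without raising the throttle limit, a pool with more than 4 threads would still only process 4 chunks concurrently.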

But I previously thought this had to be achieved with local partitioning, where I provide a Partitioner that says how to divide the data into pieces, whereas a multi-threaded Step apparently does it automatically.

Question

Could you explain how it works? How can I control it besides the thread count? Will it work for a flat file?

P.S.

I created the example:

@Configuration
public class MultithreadedStepConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ToLowerCasePersonProcessor toLowerCasePersonProcessor;

    @Autowired
    private DbPersonWriter dbPersonWriter;

    @Value("${app.single-file}")
    Resource resources;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("myMultiThreadedJob")
                .incrementer(new RunIdIncrementer())
                .flow(csvToDataBaseSlaveStep())
                .end()
                .build();
    }

    private Step csvToDataBaseSlaveStep() {
        return stepBuilderFactory.get("csvToDatabaseStep")
                .<Person, Person>chunk(50)
                .reader(csvPersonReaderMulti())
                .processor(toLowerCasePersonProcessor)
                .writer(dbPersonWriter)
                .taskExecutor(jobTaskExecutorMultiThreaded())
                .build();

    }

    @Bean
    @StepScope
    public FlatFileItemReader<Person> csvPersonReaderMulti() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("csvPersonReaderSplitted")
                .resource(resources)
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .saveState(false)
                .build();

    }

    @Bean
    public TaskExecutor jobTaskExecutorMultiThreaded() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // sized with headroom above the ~21 input sites we currently have
        taskExecutor.setCorePoolSize(25);
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setThreadGroupName("multi-");
        taskExecutor.setThreadNamePrefix("multi-");
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
}

And it really works according to the logs, but I want to know the details. Is it better than a self-written partitioner?


1 Answer


There are fundamental differences between a multi-threaded step and partitioning.

A multi-threaded step runs in a single process, so it is not a good idea to use it if your processor/writer keeps persisted state. However, if you are just generating a report without saving anything, it is a good choice. (This is also why readers in a multi-threaded step are usually configured with saveState(false): the execution context cannot track per-thread progress, so the step is not restartable from the point of failure.)
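One practical note on state: FlatFileItemReader itself is not thread-safe. A common workaround (a sketch, not something from the question's code) is to wrap it in Spring Batch's SynchronizedItemStreamReader, at the cost of serializing reads:

```java
@Bean
@StepScope
public SynchronizedItemStreamReader<Person> synchronizedCsvReader(
        FlatFileItemReader<Person> csvPersonReaderMulti) {
    // serializes calls to read() so multiple chunk threads
    // can safely share one underlying flat-file reader
    SynchronizedItemStreamReader<Person> reader = new SynchronizedItemStreamReader<>();
    reader.setDelegate(csvPersonReaderMulti);
    return reader;
}
```

Reading stays single-threaded behind the lock; the parallelism you gain is in processing and writing the chunks.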

As you mentioned, if you want to process a flat file and store the records in a DB, you can also use remote chunking, assuming your reader is not the heavy part.

Partitioning, by contrast, creates a separate step execution for each set of data you can logically divide; each partition gets its own reader, so state and restartability are preserved. (Local partitioning runs the partitions on separate threads; remote partitioning runs them in separate processes.)
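For comparison, a minimal local-partitioning sketch, reusing the task executor from the question; `workerStep()` is a hypothetical worker step bean that reads only the slice described by its ExecutionContext:

```java
@Bean
public Step partitionedMasterStep() {
    return stepBuilderFactory.get("partitionedMasterStep")
            .partitioner("workerStep", rangePartitioner())
            .step(workerStep())
            .gridSize(4)
            .taskExecutor(jobTaskExecutorMultiThreaded())
            .build();
}

@Bean
public Partitioner rangePartitioner() {
    // each partition gets its own ExecutionContext describing its slice;
    // a step-scoped reader in workerStep() can pull "partitionIndex" from it
    return gridSize -> {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putInt("partitionIndex", i);
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    };
}
```

Unlike the multi-threaded step, each partition here is a full step execution with its own reader state, which is why partitioning remains restartable.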

Hope this helps.