I have the following configuration for a Spring Batch job, but the parallel (multi-threaded) step hangs after some time, with no trace on the console. I originally used JdbcCursorItemReader and then switched to JdbcPagingItemReader to get a thread-safe reader. The reader reads entries from a Postgres DB; the processor (which calls another application and returns the response to the writer) and the writer (which creates a file and updates data in the DB) can then run in parallel.
```java
@Bean
public Job job(JobBuilderFactory jobBuilderFactory,
               StepBuilderFactory stepBuilderFactory,
               ItemReader<OrderRequest> itemReader,
               ItemProcessor<OrderRequest, OrderResponse> itemProcessor,
               ItemWriter<OrderResponse> itemWriter,
               JobExecutionListener jobListener,
               ItemReadListener<OrderRequest> stepItemReadListener,
               SkipListener<OrderRequest, OrderResponse> stepSkipListener,
               TaskExecutor taskExecutor) {
    Step step1 = stepBuilderFactory.get("Process-Data")
            .<OrderRequest, OrderResponse>chunk(10)
            .listener(stepItemReadListener)
            .reader(itemReader)
            .processor(itemProcessor)
            .writer(itemWriter)
            .faultTolerant()
            .processorNonTransactional()
            .skipLimit(5)
            .skip(SkipBatchException.class)
            .skip(VuosioteException.class)
            .listener(stepSkipListener)
            .taskExecutor(taskExecutor)
            .throttleLimit(5)
            .build();
    return jobBuilderFactory.get("Batch-Job")
            .incrementer(new RunIdIncrementer())
            .listener(jobListener)
            .start(step1)
            .build();
}

@StepScope
@Bean
public JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader(
        @Qualifier("postgresDataSource") DataSource dataSource,
        @Value("#{jobParameters[batchId]}") String batchId,
        OrderRequestMapper rowMapper) {
    // reading database records using JDBC in a paging fashion
    JdbcPagingItemReader<OrderRequest> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(rowMapper);
    // Sort keys
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("OrderRequestID", Order.ASCENDING);
    // Postgres implementation of a PagingQueryProvider using database-specific features.
    PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
    queryProvider.setSelectClause("*");
    queryProvider.setFromClause("FROM OrderRequest");
    queryProvider.setWhereClause("CUSTOMER = '" + batchId + "'");
    queryProvider.setSortKeys(sortKeys);
    reader.setQueryProvider(queryProvider);
    return reader;
}

@StepScope
@Bean
public SynchronizedItemStreamReader<OrderRequest> itemReader(
        JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader) {
    return new SynchronizedItemStreamReaderBuilder<OrderRequest>()
            .delegate(jdbcPagingItemReader)
            .build();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(5);
    taskExecutor.setQueueCapacity(0);
    return taskExecutor;
}

@StepScope
@Bean
ItemProcessor<OrderRequest, OrderResponse> dataProcessor() {
    return new BatchDataFileProcessor();
}

@StepScope
@Bean
ItemWriter<OrderResponse> batchFileWriter() {
    return new BatchFileWriter();
}

@StepScope
@Bean
public ItemReadListener<OrderRequest> stepItemReadListener() {
    return new StepItemReadListener();
}

@Bean
public JobExecutionListener jobListener() {
    return new JobListener();
}

@StepScope
@Bean
public SkipListener<OrderRequest, OrderResponse> stepSkipListener() {
    return new StepSkipListener();
}
```
What is the problem with this configuration?
The batch works fine when it processes a single record at a time.
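
Since nothing shows up on the console when the step hangs, a thread dump taken at that moment would show where the worker threads are blocked. The following is only a minimal diagnostic sketch, not part of the configuration above: the class name `ThreadDumpSketch` and its methods are hypothetical, and it assumes the helper can be called from inside the running application (for example from a scheduled task or a debug endpoint). It uses only the standard `java.lang.management` API.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper for diagnosing a hung multi-threaded step.
final class ThreadDumpSketch {

    // Returns a textual dump of all live threads. Lock details are included
    // only where the JVM supports them. ThreadInfo.toString() prints a
    // limited number of stack frames per thread.
    static String dump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        ThreadInfo[] infos = threads.dumpAllThreads(
                threads.isObjectMonitorUsageSupported(),
                threads.isSynchronizerUsageSupported());
        for (ThreadInfo info : infos) {
            out.append(info);
        }
        return out.toString();
    }

    // Returns how many threads the JVM reports as deadlocked (0 if none).
    static int deadlockedThreadCount() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long[] ids = threads.isSynchronizerUsageSupported()
                ? threads.findDeadlockedThreads()
                : threads.findMonitorDeadlockedThreads();
        return ids == null ? 0 : ids.length;
    }

    public static void main(String[] args) {
        System.out.println(dump());
        System.out.println("Deadlocked threads: " + deadlockedThreadCount());
    }
}
```

Threads that are merely waiting (for example on a database connection or on the remote application called by the processor) are not reported as deadlocked, so the stack traces in the dump are usually the more useful part. Running `jstack <pid>` from the JDK produces a comparable dump from outside the process.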