0
votes

I have this configuration to run Spring Batch, but parallel processing hangs after some time and leaves no trace on the console. Earlier I used JdbcCursorItemReader and then switched to JdbcPagingItemReader to get a thread-safe reader. The reader reads entries from a Postgres DB; the processor (which calls another app and returns its response to the writer) and the writer (which creates a file and updates data in the DB) can then execute in parallel.

    @Bean
  public Job job(JobBuilderFactory jobBuilderFactory,
      StepBuilderFactory stepBuilderFactory,
      ItemReader<OrderRequest> itemReader,
      ItemProcessor<OrderRequest, OrderResponse> itemProcessor,
      ItemWriter<OrderResponse> itemWriter, JobExecutionListener jobListener,
      ItemReadListener<OrderRequest> stepItemReadListener,
      SkipListener<OrderRequest, OrderResponse> stepSkipListener, TaskExecutor taskExecutor) {


    Step step1 = stepBuilderFactory.get("Process-Data")
        .<OrderRequest, OrderResponse>chunk(10)
        .listener(stepItemReadListener)
        .reader(itemReader)
        .processor(itemProcessor)
        .writer(itemWriter)
        .faultTolerant()
        .processorNonTransactional()
        .skipLimit(5)
        .skip(SkipBatchException.class)
        .skip(VuosioteException.class)
        .listener(stepSkipListener)
        .taskExecutor(taskExecutor)
        .throttleLimit(5)
        .build();

    return jobBuilderFactory.get("Batch-Job")
        .incrementer(new RunIdIncrementer())
        .listener(jobListener)
        .start(step1)
        .build();
  }
  
  @StepScope
  @Bean
  public JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader(@Qualifier("postgresDataSource") DataSource dataSource,
      @Value("#{jobParameters[batchId]}") String batchId, OrderRequestMapper rowMapper) {

    // reading database records using JDBC in a paging fashion

    JdbcPagingItemReader<OrderRequest> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setFetchSize(1000);
    reader.setRowMapper(rowMapper);

    // Sort Keys
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("OrderRequestID", Order.ASCENDING);

    // Postgres implementation of a PagingQueryProvider using database specific features.

    PostgresPagingQueryProvider queryProvider = new PostgresPagingQueryProvider();
    queryProvider.setSelectClause("*");
    queryProvider.setFromClause("FROM OrderRequest");
    queryProvider.setWhereClause("CUSTOMER = '" + batchId + "'");
    queryProvider.setSortKeys(sortKeys);
    reader.setQueryProvider(queryProvider);
    return reader;
  }
 
 @StepScope
  @Bean
  public SynchronizedItemStreamReader<OrderRequest> itemReader(JdbcPagingItemReader<OrderRequest> jdbcPagingItemReader) {
    return new SynchronizedItemStreamReaderBuilder<OrderRequest>().delegate(jdbcPagingItemReader).build();
  }
  
  @Bean
  public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(5);
    taskExecutor.setMaxPoolSize(5);
    taskExecutor.setQueueCapacity(0);
    return taskExecutor;
  }

  @StepScope
  @Bean
  ItemProcessor<OrderRequest, OrderResponse> dataProcessor() {
    return new BatchDataFileProcessor();
  }

  @StepScope
  @Bean
  ItemWriter<OrderResponse> batchFileWriter() {
    return new BatchFileWriter();
  }


  @StepScope
  @Bean
  public ItemReadListener<OrderRequest> stepItemReadListener() {
    return new StepItemReadListener();
  }


  @Bean
  public JobExecutionListener jobListener() {
    return new JobListener();
  }

  @StepScope
  @Bean
  public SkipListener<OrderRequest, OrderResponse> stepSkipListener() {
    return new StepSkipListener();
  }
```

What is the problem with the configuration here?
The batch works fine when it processes a single record at a time.