I am using Spring Batch to read records from a PostgreSQL DB with RepositoryItemReader and then write them to a topic. There were around 1 million records to process, but not all of them were processed. I have set the reader's pageSize to 10,000, the same as the commit interval (chunk size).

@Bean
public TaskletStep broadcastProductsStep(){
    return stepBuilderFactory.get("broadcastProducts")
            .<Product, Product> chunk(10000)
            .reader(productsReader.repositoryItemReader())
            .processor(productsProcessor)
            .writer(compositeItemWriter)                    
            .faultTolerant()
            .skip(Exception.class)                              
            .skipLimit(100000)
            .processorNonTransactional()                        
            .listener(new SkipListenerProducts())               
            .listener(productsChunkListener)
            .build();
}


@Bean
public RepositoryItemReader<Product> repositoryItemReader() {

    RepositoryItemReader<Product> repositoryReader = new RepositoryItemReader<>();

    try {
        repositoryReader.setRepository(skuRepository);
        repositoryReader.setMethodName("findByIsUpdatedAndStatusCodeIn");
        repositoryReader.setPageSize(10000);
        repositoryReader.setSaveState(false);

        List<List<String>> arguments = new ArrayList<>();
        arguments.add(Stream.of(SkuStatus.RELEASED.getValue().toString(), SkuStatus.BLOCKED.getValue().toString(),
                SkuStatus.DISCONTINUED.getValue().toString())
                  .collect(Collectors.toList()));
        repositoryReader.setArguments(arguments);

        Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("catalog_number", Sort.Direction.ASC);

        repositoryReader.setSort(sorts);
        repositoryReader.afterPropertiesSet();

    } catch (Exception exception){
        exception.printStackTrace();
    }

    return repositoryReader;
}

@Query(value = "SELECT * FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       countQuery = "SELECT COUNT(*) FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       nativeQuery = true)
public Page<Product> findByIsUpdatedAndStatusCodeIn(@Param(value = "statusCode") List<String> statusCode, 
        Pageable pageable);
Comment (benbenw): Do you change the IS_UPDATED column after writing to the topic?
Reply (Goni_code_love): Yes, I do modify it later in the writer, setting it to false.

Answer

The problem is probably that you're mixing pagination with updates to a column that is part of the reader query's criteria (IS_UPDATED).

Example with a page size of 2 and 6 rows in the DB:

  • A IS_UPDATED=true
  • B IS_UPDATED=true
  • C IS_UPDATED=true
  • D IS_UPDATED=true
  • E IS_UPDATED=true
  • F IS_UPDATED=true

The first read (first page) returns rows A and B.

After the writer runs (setting IS_UPDATED to false for A and B), the rows still matching the query are:

  • C IS_UPDATED=true
  • D IS_UPDATED=true
  • E IS_UPDATED=true
  • F IS_UPDATED=true

The second read moves on to the second page, so it returns rows E and F, not C and D. C and D are never read, which is why not all records get processed.
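The skipping can be reproduced with a small in-memory simulation of the example above. This is a sketch, not Spring Batch code: the class and method names (`OffsetPagingDemo`, `readPage`, `processAll`, `sampleTable`) are hypothetical, a `LinkedHashMap` stands in for the table (its insertion order plays the role of the ORDER BY), and `readPage` mimics an offset-based page request (offset = page * size) over the rows whose flag is still true.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OffsetPagingDemo {

    // Stand-in for the paged query: rows whose IS_UPDATED flag is still true,
    // in sort order, sliced with offset = page * size.
    static List<String> readPage(Map<String, Boolean> table, int page, int size) {
        List<String> matching = new ArrayList<>();
        for (Map.Entry<String, Boolean> row : table.entrySet()) {
            if (row.getValue()) {
                matching.add(row.getKey());
            }
        }
        int from = Math.min(page * size, matching.size());
        int to = Math.min(from + size, matching.size());
        return new ArrayList<>(matching.subList(from, to));
    }

    // The reader moves to the next page after every read, while the "writer"
    // sets IS_UPDATED to false for each row it writes.
    static List<String> processAll(Map<String, Boolean> table, int size) {
        List<String> written = new ArrayList<>();
        for (int page = 0; ; page++) {
            List<String> chunk = readPage(table, page, size);
            if (chunk.isEmpty()) {
                return written;
            }
            for (String id : chunk) {
                written.add(id);
                table.put(id, false); // writer marks the row as processed
            }
        }
    }

    // Six rows A..F, all with IS_UPDATED = true; insertion order acts as ORDER BY.
    static Map<String, Boolean> sampleTable() {
        Map<String, Boolean> table = new LinkedHashMap<>();
        for (String id : new String[] {"A", "B", "C", "D", "E", "F"}) {
            table.put(id, true);
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(processAll(sampleTable(), 2)); // prints [A, B, E, F]
    }
}
```

After the run, C and D still have IS_UPDATED = true: the third read asks for offset 4, but only two matching rows are left, so it comes back empty and the loop stops.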

Either:

  1. Don't update the IS_UPDATED column in the writer.
  2. Or create a subclass of RepositoryItemReader that overrides getPage so that it always returns 0; every read then fetches the first page of rows that still match the query:
    @Override
    public int getPage() {
        return 0;
    }
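Both options can be checked with an in-memory sketch that does not involve Spring Batch. The names (`PagingFixesDemo`, `readPage`, `option1`, `option2`, `sampleTable`) are hypothetical; a `LinkedHashMap` stands in for the table, and `readPage` mimics an offset-based page request over rows whose flag is still true. Option 1 keeps the result set stable, so advancing the page is safe; option 2 lets the result set shrink but always asks for page 0.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PagingFixesDemo {

    // Stand-in for the paged query (offset = page * size) over rows whose
    // IS_UPDATED flag is true, in sort order.
    static List<String> readPage(Map<String, Boolean> table, int page, int size) {
        List<String> matching = new ArrayList<>();
        for (Map.Entry<String, Boolean> row : table.entrySet()) {
            if (row.getValue()) {
                matching.add(row.getKey());
            }
        }
        int from = Math.min(page * size, matching.size());
        int to = Math.min(from + size, matching.size());
        return new ArrayList<>(matching.subList(from, to));
    }

    // Option 1: the writer never touches IS_UPDATED, so the matching rows do
    // not shift between reads and advancing the page number is safe.
    static List<String> option1(Map<String, Boolean> table, int size) {
        List<String> written = new ArrayList<>();
        for (int page = 0; ; page++) {
            List<String> chunk = readPage(table, page, size);
            if (chunk.isEmpty()) {
                return written;
            }
            written.addAll(chunk);
        }
    }

    // Option 2: the writer clears IS_UPDATED and the reader always requests
    // page 0, i.e. the first `size` rows that are still unprocessed.
    static List<String> option2(Map<String, Boolean> table, int size) {
        List<String> written = new ArrayList<>();
        while (true) {
            List<String> chunk = readPage(table, 0, size);
            if (chunk.isEmpty()) {
                return written;
            }
            for (String id : chunk) {
                written.add(id);
                table.put(id, false); // writer marks the row as processed
            }
        }
    }

    // Six rows A..F, all with IS_UPDATED = true; insertion order acts as ORDER BY.
    static Map<String, Boolean> sampleTable() {
        Map<String, Boolean> table = new LinkedHashMap<>();
        for (String id : new String[] {"A", "B", "C", "D", "E", "F"}) {
            table.put(id, true);
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(option1(sampleTable(), 2)); // prints [A, B, C, D, E, F]
        System.out.println(option2(sampleTable(), 2)); // prints [A, B, C, D, E, F]
    }
}
```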

Option 2 is more resilient to a batch crash or error, but you have to make sure the writer always sets IS_UPDATED to false; otherwise the reader will loop indefinitely on the same first page.
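The looping risk can be illustrated with another in-memory sketch. All names here (`FirstPageLoopDemo`, `readFirstPage`, `run`, `sampleTable`) are hypothetical, and the `maxReads` cap exists only so the sketch terminates; a reader that always requests page 0 has no such cap. The `notCleared` set models rows whose flag the writer fails to reset.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FirstPageLoopDemo {

    // Page 0 of the query: the first `size` rows whose IS_UPDATED flag is still true.
    static List<String> readFirstPage(Map<String, Boolean> table, int size) {
        List<String> page = new ArrayList<>();
        for (Map.Entry<String, Boolean> row : table.entrySet()) {
            if (row.getValue() && page.size() < size) {
                page.add(row.getKey());
            }
        }
        return page;
    }

    // Runs the "always page 0" reader. Returns how many reads it took to get an
    // empty page, or -1 if maxReads is reached first (the loop would never end).
    static int run(Map<String, Boolean> table, int size, Set<String> notCleared, int maxReads) {
        for (int reads = 1; reads <= maxReads; reads++) {
            List<String> chunk = readFirstPage(table, size);
            if (chunk.isEmpty()) {
                return reads;
            }
            for (String id : chunk) {
                if (!notCleared.contains(id)) {
                    table.put(id, false); // writer resets the flag
                }
            }
        }
        return -1;
    }

    // Six rows A..F, all with IS_UPDATED = true; insertion order acts as ORDER BY.
    static Map<String, Boolean> sampleTable() {
        Map<String, Boolean> table = new LinkedHashMap<>();
        for (String id : new String[] {"A", "B", "C", "D", "E", "F"}) {
            table.put(id, true);
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(run(sampleTable(), 2, Collections.emptySet(), 100)); // prints 4
        System.out.println(run(sampleTable(), 2, Collections.singleton("C"), 100)); // prints -1
    }
}
```

With every flag cleared, the fourth read is empty and the job ends. If C is never reset, C keeps coming back at the top of page 0, and once the other rows are done, every read returns only C.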

Option 2 will also need more tuning if you're using a multithreaded step.