I am using Spring Batch to read records from a PostgreSQL database with a RepositoryItemReader and write them to a topic. There are around 1 million records that should be processed, but the job completes without processing all of them. The reader's pageSize is set to 10,000, the same value as the commit interval (chunk size). Here is the step configuration:
@Bean
public TaskletStep broadcastProductsStep() {
    return stepBuilderFactory.get("broadcastProducts")
            .<Product, Product>chunk(10000)
            .reader(productsReader.repositoryItemReader())
            .processor(productsProcessor)
            .writer(compositeItemWriter)
            .faultTolerant()
            .skip(Exception.class)
            .skipLimit(100000)
            .processorNonTransactional()
            .listener(new SkipListenerProducts())
            .listener(productsChunkListener)
            .build();
}
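For context, compositeItemWriter is a CompositeItemWriter whose first delegate publishes each Product to the topic. A simplified sketch of the wiring (topicWriter and otherDelegateWriter are placeholder names, not my actual beans):

@Bean
public CompositeItemWriter<Product> compositeItemWriter() {
    CompositeItemWriter<Product> writer = new CompositeItemWriter<>();
    // first delegate publishes each Product to the topic; the second stands
    // in for the remaining delegate(s)
    writer.setDelegates(Arrays.<ItemWriter<? super Product>>asList(topicWriter, otherDelegateWriter));
    return writer;
}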
@Bean
public RepositoryItemReader<Product> repositoryItemReader() {
    RepositoryItemReader<Product> repositoryReader = new RepositoryItemReader<>();
    try {
        repositoryReader.setRepository(skuRepository);
        repositoryReader.setMethodName("findByIsUpdatedAndStatusCodeIn");
        repositoryReader.setPageSize(10000);
        repositoryReader.setSaveState(false);
        // single argument: the list of status codes passed to the repository method
        List<List<String>> arguments = new ArrayList<>();
        arguments.add(Stream.of(SkuStatus.RELEASED.getValue().toString(),
                        SkuStatus.BLOCKED.getValue().toString(),
                        SkuStatus.DISCONTINUED.getValue().toString())
                .collect(Collectors.toList()));
        repositoryReader.setArguments(arguments);
        Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("catalog_number", Sort.Direction.ASC);
        repositoryReader.setSort(sorts);
        repositoryReader.afterPropertiesSet();
    } catch (Exception exception) {
        exception.printStackTrace();
    }
    return repositoryReader;
}
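As I understand it, RepositoryItemReader pages through the table by building a PageRequest for each successive page index and invoking the configured repository method with it, so every page read is roughly equivalent to this (an illustration of the framework's behavior, not code in my project; pageIndex and statusCodes are placeholders):

// what the reader effectively runs for each page (pageIndex starts at 0),
// i.e. offset-based pagination ordered by catalog_number
Pageable pageRequest = PageRequest.of(pageIndex, 10000, Sort.by(Sort.Direction.ASC, "catalog_number"));
Page<Product> page = skuRepository.findByIsUpdatedAndStatusCodeIn(statusCodes, pageRequest);

The repository method itself: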
@Query(value = "SELECT * FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       countQuery = "SELECT COUNT(*) FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       nativeQuery = true)
public Page<Product> findByIsUpdatedAndStatusCodeIn(@Param(value = "statusCode") List<String> statusCode,
                                                    Pageable pageable);
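To quantify the shortfall, I compare the step's counters after the run against the expected ~1 million rows; a minimal sketch of such a listener (registered on the step like the other listeners):

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;

public class CountLoggingListener extends StepExecutionListenerSupport {
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // read vs. written vs. skipped shows where records went missing
        System.out.printf("read=%d written=%d skipped=%d filtered=%d%n",
                stepExecution.getReadCount(),
                stepExecution.getWriteCount(),
                stepExecution.getSkipCount(),
                stepExecution.getFilterCount());
        return stepExecution.getExitStatus();
    }
}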