I have a Spring Batch job that takes a list of ids as input. I would like to pass that list to a step that can process all of the ids in parallel. So far, what I have managed is to run multiple job instances in a ThreadPoolExecutor, which executes the job x times. This means that every job instance runs its own separate queries, and we are talking about more than 50 million records. Each record is a point in a time series: the consumption on a specific day. For each id and batchId, I need to aggregate by month and send the result to a broker. The step is set up as follows:
- Reader -> reads from the database according to an id and a timestamp representing a time series.
- Processor -> PassThroughItemProcessor
- Writer -> sends to AMQP (aggregates the list of items)
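To make the aggregation requirement concrete, here is a minimal sketch in plain Java of summing daily values per id and per month. The `Consumption` class, its fields, and `MonthlyAggregator` are hypothetical names used only for illustration; they are not part of my actual job, and the sketch leaves out the AMQP part entirely.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical item type: one consumption value for one installation on one day.
final class Consumption {
    final int installationId;
    final LocalDate day;
    final double value;

    Consumption(int installationId, LocalDate day, double value) {
        this.installationId = installationId;
        this.day = day;
        this.value = value;
    }
}

final class MonthlyAggregator {
    // Sums the daily values per installation id and per calendar month.
    static Map<Integer, Map<YearMonth, Double>> sumByIdAndMonth(List<Consumption> items) {
        Map<Integer, Map<YearMonth, Double>> totals = new TreeMap<>();
        for (Consumption c : items) {
            totals.computeIfAbsent(c.installationId, id -> new TreeMap<>())
                  .merge(YearMonth.from(c.day), c.value, Double::sum);
        }
        return totals;
    }
}
```

With this shape, the writer would receive a chunk of items, call something like `MonthlyAggregator.sumByIdAndMonth(items)`, and publish one message per id and month.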
Are there any best practices you could recommend?
Following the suggestions, this is what my partitioner looks like:
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Note: gridSize is not used; the number of partitions depends on the number of distinct ids.
        log.debug("START: Partition");
        Map<String, ExecutionContext> partitionMap = new HashMap<>();
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger partitionerCounter = new AtomicInteger(0);
        Page<Integer> result = null;
        do {
            // Fetch the next page of distinct ids (100000 per page), starting with page 0.
            result = repository.findDistinctByBatchId(
                    LocalDateTime.parse(batchId, AipForecastService.DEFAULT_DATE_TIME_FORMATTER),
                    Optional.ofNullable(result)
                            .map(Page::nextPageable)
                            .orElse(PageRequest.of(0, 100000)));
            result
                    .stream()
                    // Group the ids into chunks of 100; the counter keeps running across pages.
                    .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / 100))
                    .values()
                    .forEach(listOfInstallation -> {
                        // One partition (execution context) per chunk of ids.
                        ExecutionContext context = new ExecutionContext();
                        context.put("listOfInstallation", listOfInstallation);
                        partitionMap.put("partition" + partitionerCounter.incrementAndGet(), context);
                        log.debug("Adding to the partition map {}, listOfInstallation {}", partitionerCounter.get(), listOfInstallation);
                    });
        } while (result.hasNext());
        log.debug("END: Created Partitions for installation job of size:{}", partitionMap.size());
        return partitionMap;
    }
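The chunking part of the partitioner can also be sketched on its own, without Spring and without the `AtomicInteger` side effect inside `groupingBy`. This is only an illustration under assumptions: `IdChunker` is a hypothetical helper, and the returned `Map<String, List<Integer>>` stands in for the `Map<String, ExecutionContext>` that the real partitioner returns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: splits a list of ids into named, fixed-size chunks.
final class IdChunker {
    static Map<String, List<Integer>> chunk(List<Integer> ids, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        // LinkedHashMap keeps the partitions in creation order.
        Map<String, List<Integer>> partitions = new LinkedHashMap<>();
        for (int start = 0; start < ids.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, ids.size());
            // Copy the subList view so every chunk is an independent list.
            partitions.put("partition" + (partitions.size() + 1),
                    new ArrayList<>(ids.subList(start, end)));
        }
        return partitions;
    }
}
```

For example, `IdChunker.chunk(List.of(1, 2, 3, 4, 5), 2)` yields three entries, `partition1` to `partition3`, with the last one holding only the id 5. In the partitioner, each chunk would then be put into its own `ExecutionContext` under the `listOfInstallation` key.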