I am trying to build a Spring Batch application where the batch job is built dynamically (not as Spring-managed beans) and launched using JobLauncher. The job is built from the source file and some other information, such as the target store. Based on these details, I have to build a Job with the corresponding reader/writer.
I am able to build and launch synchronous as well as multi-threaded jobs successfully. I am now trying to scale up the application to handle large files using the Partition SPI, but I cannot find a way to pass the correct partition to the step.
In a normal application the @StepScope annotation is used, so Spring creates a separate reader for each step execution, and late binding (@Value) passes the StepExecution information (filePath) to the reader.
Is there any way to achieve my use case without using Step scope?
class CustomJobBuilder {
//JobInfo contains table name, source file etc...
Job build(JobInfo jobInfo) throws Exception {
return jobBuilderFactory
.get(jobInfo.getName())
.start(masterStep())
.build();
}
private Step masterStep() throws Exception {
Step importFileStep = importFileStep();
return stepBuilderFactory
.get("masterStep")
.partitioner(importFileStep.getName(), partitioner())
.step(importFileStep)
.gridSize(6)
.taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}
private MultiResourcePartitioner partitioner() throws IOException {
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
partitioner.setKeyName(PARTITION_KEY_NAME);
ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();
partitioner.setResources(patternResolver.getResources(jobInfo.getFilePath())); //*.csv
return partitioner;
}
private Step importFileStep() throws Exception {
JdbcBatchItemWriter<Row> successRecordsWriter = dbWriter();
FlatFileItemWriter<Row> failedRecordsWriter = errorWriter();
return stepBuilderFactory
.get("importFile")
.<Row, Row>chunk(CHUNK_SIZE)
.reader(csvReader(null))
.processor(processor())
.writer(writer(successRecordsWriter, failedRecordsWriter))
.stream(failedRecordsWriter)
.build();
}
//Problem here. Passing filePath to CSV Reader dynamically
private ItemReader<Row> csvReader(@Value("#{stepExecutionContext['" + PARTITION_KEY_NAME + "']}") String filePath) {
DefaultLineMapper<Row> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(jobInfo.getColumns());
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new CustomFieldSetMapper(jobInfo.getColumns()));
lineMapper.afterPropertiesSet();
FlatFileItemReader<Row> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new FileSystemResource(filePath));
reader.setLineMapper(lineMapper);
return reader;
}
}
/**
 * Builds a job for {@code jobInfo} with {@link CustomJobBuilder} and runs it
 * through {@code jobLauncher}, logging any failures.
 */
class CustomJobLauncher {
    // NOTE(review): the statements below appear directly in the class body in this
    // excerpt; presumably they live inside a launch method - confirm against the full file.

    // A random "id" parameter is added to every set of job parameters.
    JobParameters jobParameters = new JobParametersBuilder()
            .addString("id", UUID.randomUUID().toString())
            .toJobParameters();
    JobExecution jobExecution;
    try {
        CustomJobBuilder jobBuilder = new CustomJobBuilder();
        jobBuilder.setJobBuilderFactory(jobBuilderFactory);
        jobBuilder.setDataSource(getDataSource(objectDto.getDataStore()));
        jobBuilder.setStepBuilderFactory(stepBuilderFactory);
        jobExecution = jobLauncher.run(jobBuilder.build(jobInfo), jobParameters);
        // Report step/job failures through the logger (same sink as the catch
        // block below) instead of dumping stack traces to stderr.
        for (Throwable failure : jobExecution.getAllFailureExceptions()) {
            LOGGER.error("Job execution reported a failure", failure);
        }
    } catch (Exception e) {
        LOGGER.error("Failed", e);
    }
}