0
votes

I am trying to build a Spring Batch application where the batch job is built dynamically (not from Spring-managed beans) and launched using JobLauncher. The job is built based on the source file and a few other details such as the target store. Based on these details I have to build a Job with the corresponding reader/writer.

I am able to build and launch synchronous as well as multi-threaded jobs successfully. I am trying to scale up the application to handle large files using the Partition SPI. But I am not able to find a way to pass the correct partition to the step.

Because in normal application StepScope annotation is used so spring creates a separate reader for each Step. And late binding (@Value) helps to pass the StepExecution (filePath) information to reader.

Is there any way to achieve my use case without using Step scope?

class CustomJobBuilder {
    //JobInfo contains table name, source file etc...
    // NOTE(review): fragment — fields jobBuilderFactory, stepBuilderFactory and
    // jobInfo are used below but not declared here; presumably injected via the
    // setters called from CustomJobLauncher. This class is intentionally NOT a
    // Spring-managed bean, which is why @StepScope/late binding is unavailable.

    // Builds the whole partitioned Job for the given JobInfo.
    Job build(JobInfo jobInfo) throws Exception {
      return jobBuilderFactory
          .get(jobInfo.getName())
          .start(masterStep())
          .build();
    }


  // Master step: fans the worker step out over the partitions produced by
  // partitioner(), executing them concurrently on unmanaged async threads
  // (SimpleAsyncTaskExecutor spawns a new thread per task; gridSize caps
  // the number of concurrent worker step executions at 6).
  private Step masterStep() throws Exception {
    Step importFileStep = importFileStep();
    return stepBuilderFactory
        .get("masterStep")
        .partitioner(importFileStep.getName(), partitioner())
        .step(importFileStep)
        .gridSize(6)
        .taskExecutor(new SimpleAsyncTaskExecutor())
        .build();
  }

  // Creates one partition (ExecutionContext entry keyed by PARTITION_KEY_NAME)
  // per resource matching the file-path pattern, e.g. one per *.csv file.
  private MultiResourcePartitioner partitioner() throws IOException {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    partitioner.setKeyName(PARTITION_KEY_NAME);
    ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();
    partitioner.setResources(patternResolver.getResources(jobInfo.getFilePath())); //*.csv
    return partitioner;
  }

  // Worker step executed once per partition. Routes good rows to the JDBC
  // writer and failed rows to the flat-file error writer; the error writer is
  // registered as a stream so its open/update/close lifecycle is managed.
  private Step importFileStep() throws Exception {
    JdbcBatchItemWriter<Row> successRecordsWriter = dbWriter();
    FlatFileItemWriter<Row> failedRecordsWriter = errorWriter();
    return stepBuilderFactory
        .get("importFile")
        .<Row, Row>chunk(CHUNK_SIZE)
        .reader(csvReader(null))
        .processor(processor())
        .writer(writer(successRecordsWriter, failedRecordsWriter))
        .stream(failedRecordsWriter)
        .build();
  }

  //Problem here. Passing filePath to CSV Reader dynamically
  // NOTE(review): @Value has no effect on a plain method called directly —
  // SpEL late binding only happens for step-scoped Spring beans. As invoked
  // above (csvReader(null)), filePath is literally null, so a single reader
  // instance is built with no resource and shared by every partition.
  private ItemReader<Row> csvReader(@Value("#{stepExecutionContext['" + PARTITION_KEY_NAME + "']}") String filePath) {
    DefaultLineMapper<Row> lineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames(jobInfo.getColumns());
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new CustomFieldSetMapper(jobInfo.getColumns()));
    lineMapper.afterPropertiesSet();

    FlatFileItemReader<Row> reader = new FlatFileItemReader<>();
    reader.setLinesToSkip(1);     // skip the CSV header row
    reader.setResource(new FileSystemResource(filePath));
    reader.setLineMapper(lineMapper);
    return reader;
  }
}

class CustomJobLauncher {

    // NOTE(review): fragment — these statements appear directly in the class
    // body; presumably they belong inside a launch(...) method. jobLauncher,
    // jobBuilderFactory, stepBuilderFactory, objectDto and jobInfo are not
    // declared in this snippet.

    // Fresh random "id" parameter on every run so each launch creates a new
    // JobInstance instead of restarting a previous one.
    JobParameters jobParameters = new JobParametersBuilder()
        .addString("id", UUID.randomUUID().toString())
        .toJobParameters();
    JobExecution jobExecution;
    try {
      // Wire the (non-Spring-managed) builder by hand, then build and run.
      CustomJobBuilder jobBuilder = new CustomJobBuilder();
      jobBuilder.setJobBuilderFactory(jobBuilderFactory);
      jobBuilder.setDataSource(getDataSource(objectDto.getDataStore()));
      jobBuilder.setStepBuilderFactory(stepBuilderFactory);

      jobExecution = jobLauncher.run(jobBuilder.build(jobInfo), jobParameters);
      jobExecution.getAllFailureExceptions().forEach(Throwable::printStackTrace);
    } catch (Exception e) {
      LOGGER.error("Failed", e);
    }
}
1

1 Answer

0
votes

I have solved the problem by mimicking MessageChannelRemotePartitionHandler and StepExecutionRequestHandler.

Instead of relying on BeanFactoryStepLocator to get the step from the beanFactory, I have re-constructed the step on the slave and executed it.

You have to be cautious about constructing the new Step, because it has to be exactly the same on all slaves; otherwise it would lead to processing/writing inconsistencies.

// PartitionHandler - partition method
// Splits the master step execution into partitions, dispatches one
// PartitionExecutionRequest message per partition through the messaging
// gateway, and then collects the worker results — either from the reply
// channel or by polling the job repository.
public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter,
                                          final StepExecution masterStepExecution) throws Exception {

    final Set<StepExecution> partitions = stepExecutionSplitter.split(masterStepExecution, gridSize);
    if (CollectionUtils.isEmpty(partitions)) {
      return null;
    }

    final int total = partitions.size();
    int sequence = 0;
    for (StepExecution partition : partitions) {
      // Each request carries the ids needed to reload the StepExecution on the
      // slave, plus the request context and job metadata to rebuild the step.
      PartitionExecutionRequest payload = new PartitionExecutionRequest(
          partition.getJobExecutionId(), partition.getId(),
          RequestContextProvider.getRequestInfo(), jobInfo, object);
      Message<PartitionExecutionRequest> message =
          createMessage(sequence++, total, payload, replyChannel);
      if (logger.isDebugEnabled()) {
        logger.debug("Sending request: " + message);
      }
      messagingGateway.send(message);
    }

    return pollRepositoryForResults
        ? pollReplies(masterStepExecution, partitions)
        : receiveReplies(replyChannel);
  }

//On the slave
@MessageEndpoint
public class PartitionExecutionRequestHandler {

  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionExecutionRequestHandler.class);
  private BatchBeanProvider batchBeanProvider;

  public void setBatchBeanProvider(BatchBeanProvider batchBeanProvider) {
    this.batchBeanProvider = batchBeanProvider;
  }


  @ServiceActivator
  public StepExecution handle(PartitionExecutionRequest request) {
    StepExecution stepExecution = null;
    try {
      before(request);
      Long jobExecutionId = request.getJobExecutionId();
      Long stepExecutionId = request.getStepExecutionId();
      stepExecution = batchBeanProvider.getJobExplorer().getStepExecution(jobExecutionId, stepExecutionId);
      if (stepExecution == null) {
        throw new NoSuchStepException("No StepExecution could be located for this request: " + request);
      }
      try {
        CustomJobCreator jobCreator = new CustomJobCreator(batchBeanProvider, request.getJobInfo(), request.getObject());
        jobCreator.afterPropertiesSet();
        ResourcePatternResolver patternResolver = new PathMatchingResourcePatternResolver();
        Resource resource = patternResolver.getResource(stepExecution.getExecutionContext().getString(CustomJobCreator.PARTITION_KEY_NAME));
        Step step = jobCreator.partitionStep(resource.getFile().getAbsolutePath());
        step.execute(stepExecution);
      } catch (JobInterruptedException e) {
        stepExecution.setStatus(BatchStatus.STOPPED);
        // The receiver should update the stepExecution in repository
      } catch (Throwable e) {
        stepExecution.addFailureException(e);
        stepExecution.setStatus(BatchStatus.FAILED);
        // The receiver should update the stepExecution in repository
      }
    }
    return stepExecution;
  }
}