
We are currently evaluating a shift from Spring Batch + Spring Batch Admin to a Spring Cloud based infrastructure.

Our main challenges / questions:

1. As part of the monolithic design of our Spring Batch jobs, we fetch some general MD and aggregate it into a common data structure that many jobs use to run in a more optimized way. Is the nature of SCDF Tasks going to be a problem in our case? Should we reconsider shifting to Streams instead, and how could that be done?

2. One of the major reasons to use SCDF is its support for scaling for better performance. For a first POC it will be hard for us to create a real cloud infrastructure, so I was looking for a standalone SCDF setup that uses the remote partitioning design as a scaling solution. We are looking for a demo/intro GitHub project/guide - I didn't manage to find anything relevant. Does it still require, as in past years' solutions, communication between nodes via a JMS infrastructure (Spring Integration)?

3. The main challenge for us is to refactor one of our batch jobs so that it supports both remote partitioning and multiple threads on each node. Is it possible to create a Spring Batch job with both of these aspects?

4. Breaking up our monolithic JAR with 20 jobs into separate Spring Boot über JARs isn't a simple task to achieve - any thoughts / ideas / best practices?

Best, Elad


1 Answer


I had the same problem as Elad's point 3 and eventually solved it by using the basic framework demonstrated here, but with modified versions of DeployerPartitionHandler and DeployerStepExecutionHandler.

I first tried the naive approach of a two-level partitioning, where the step that each worker executes is itself partitioned into sub-partitions. But the framework doesn't seem to support that; it got confused about the step's state.

So I went back to a flat set of partitions, but passing multiple step execution ids to each worker. For this to work, I created DeployerMultiPartitionHandler, which launches the configured number of workers and passes each one a list of step execution ids. Note that there are now two degrees of freedom: the number of workers and the gridSize, which is the total number of partitions that get distributed as evenly as possible across the workers. Unfortunately, I had to duplicate a lot of DeployerPartitionHandler's code here.

@Slf4j
@Getter
@Setter
public class DeployerMultiPartitionHandler implements PartitionHandler, EnvironmentAware, InitializingBean {

    public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_IDS =
            "spring.cloud.task.step-execution-ids";

    public static final String SPRING_CLOUD_TASK_JOB_EXECUTION_ID =
            "spring.cloud.task.job-execution-id";

    public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_ID =
            "spring.cloud.task.step-execution-id";

    public static final String SPRING_CLOUD_TASK_STEP_NAME =
            "spring.cloud.task.step-name";

    public static final String SPRING_CLOUD_TASK_PARENT_EXECUTION_ID =
            "spring.cloud.task.parentExecutionId";

    public static final String SPRING_CLOUD_TASK_NAME = "spring.cloud.task.name";

    private int maxWorkers = -1;

    private int gridSize = 1;

    private int currentWorkers = 0;

    private TaskLauncher taskLauncher;

    private JobExplorer jobExplorer;

    private TaskExecution taskExecution;

    private Resource resource;

    private String stepName;

    private long pollInterval = 10000;

    private long timeout = -1;

    private Environment environment;

    private Map<String, String> deploymentProperties;

    private EnvironmentVariablesProvider environmentVariablesProvider;

    private String applicationName;

    private CommandLineArgsProvider commandLineArgsProvider;

    private boolean defaultArgsAsEnvironmentVars = false;

    public DeployerMultiPartitionHandler(TaskLauncher taskLauncher,
                                    JobExplorer jobExplorer,
                                    Resource resource,
                                    String stepName) {
            Assert.notNull(taskLauncher, "A taskLauncher is required");
            Assert.notNull(jobExplorer, "A jobExplorer is required");
            Assert.notNull(resource, "A resource is required");
            Assert.hasText(stepName, "A step name is required");

            this.taskLauncher = taskLauncher;
            this.jobExplorer = jobExplorer;
            this.resource = resource;
            this.stepName = stepName;
    }

    @Override
    public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
                                            StepExecution stepExecution) throws Exception {


        final Set<StepExecution> tempCandidates =
                stepSplitter.split(stepExecution, this.gridSize);

        // Following two lines due to https://jira.spring.io/browse/BATCH-2490
        final List<StepExecution> candidates = new ArrayList<>(tempCandidates.size());
        candidates.addAll(tempCandidates);

        int partitions = candidates.size();

        log.debug(String.format("%s partitions were returned", partitions));

        final Set<StepExecution> executed = new HashSet<>(candidates.size());

        if (CollectionUtils.isEmpty(candidates)) {
            return null;
        }

        launchWorkers(candidates, executed);

        candidates.removeAll(executed);

        return pollReplies(stepExecution, executed, partitions);
    }

    private void launchWorkers(List<StepExecution> candidates, Set<StepExecution> executed) {
        int partitions = candidates.size();
        int numWorkers = this.maxWorkers != -1 ? Math.min(this.maxWorkers, partitions) : partitions;
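        // Slice the candidate partitions into numWorkers contiguous chunks of
        // near-equal size and hand each chunk to one worker.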
        IntStream.range(0, numWorkers).boxed()
                .map(i -> candidates.subList(partitionOffset(partitions, numWorkers, i), partitionOffset(partitions, numWorkers, i + 1)))
                .filter(not(List::isEmpty))
                .forEach(stepExecutions -> processStepExecutions(stepExecutions, executed));
    }

    private void processStepExecutions(List<StepExecution> stepExecutions, Set<StepExecution> executed) {
        launchWorker(stepExecutions);
        this.currentWorkers++;
        executed.addAll(stepExecutions);
    }

    private void launchWorker(List<StepExecution> workerStepExecutions) {
        List<String> arguments = new ArrayList<>();

        StepExecution firstWorkerStepExecution = workerStepExecutions.get(0);
        ExecutionContext copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());

        arguments.addAll(
                this.commandLineArgsProvider
                        .getCommandLineArgs(copyContext));

        String jobExecutionId = String.valueOf(firstWorkerStepExecution.getJobExecution().getId());
        String stepExecutionIds = workerStepExecutions.stream().map(workerStepExecution -> String.valueOf(workerStepExecution.getId())).collect(joining(","));
        String taskName = String.format("%s_%s_%s",
                taskExecution.getTaskName(),
                firstWorkerStepExecution.getJobExecution().getJobInstance().getJobName(),
                firstWorkerStepExecution.getStepName());
        String parentExecutionId = String.valueOf(taskExecution.getExecutionId());

        if(!this.defaultArgsAsEnvironmentVars) {
            arguments.add(formatArgument(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
                    jobExecutionId));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS,
                    stepExecutionIds));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_NAME, this.stepName));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_NAME, taskName));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
                    parentExecutionId));
        }

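        // Take a fresh copy of the context in case the args provider mutated it,
        // mirroring what DeployerPartitionHandler does.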
        copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());

        log.info("launchWorker context={}", copyContext);

        Map<String, String> environmentVariables = this.environmentVariablesProvider.getEnvironmentVariables(copyContext);

        if(this.defaultArgsAsEnvironmentVars) {
            environmentVariables.put(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
                    jobExecutionId);
            environmentVariables.put(SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
                    String.valueOf(firstWorkerStepExecution.getId()));
            environmentVariables.put(SPRING_CLOUD_TASK_STEP_NAME, this.stepName);
            environmentVariables.put(SPRING_CLOUD_TASK_NAME, taskName);
            environmentVariables.put(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
                    parentExecutionId);
        }

        AppDefinition definition =
                new AppDefinition(resolveApplicationName(),
                        environmentVariables);

        AppDeploymentRequest request =
                new AppDeploymentRequest(definition,
                        this.resource,
                        this.deploymentProperties,
                        arguments);

        taskLauncher.launch(request);
    }

    private String resolveApplicationName() {
        if(StringUtils.hasText(this.applicationName)) {
            return this.applicationName;
        }
        else {
            return this.taskExecution.getTaskName();
        }
    }

    private String formatArgument(String key, String value) {
        return String.format("--%s=%s", key, value);
    }

    private Collection<StepExecution> pollReplies(final StepExecution masterStepExecution,
                                                  final Set<StepExecution> executed,
                                                  final int size) throws Exception {

        final Collection<StepExecution> result = new ArrayList<>(executed.size());

        Callable<Collection<StepExecution>> callback = new Callable<Collection<StepExecution>>() {
            @Override
            public Collection<StepExecution> call() {
                for (StepExecution curStepExecution : executed) {
                    if (!result.contains(curStepExecution)) {
                        StepExecution partitionStepExecution =
                                jobExplorer.getStepExecution(masterStepExecution.getJobExecutionId(), curStepExecution.getId());

                        if (isComplete(partitionStepExecution.getStatus())) {
                            result.add(partitionStepExecution);
                            currentWorkers--;
                        }
                    }
                }

                if (result.size() == size) {
                    return result;
                }
                else {
                    return null;
                }
            }
        };

        Poller<Collection<StepExecution>> poller = new DirectPoller<>(this.pollInterval);
        Future<Collection<StepExecution>> resultsFuture = poller.poll(callback);

        if (timeout >= 0) {
            return resultsFuture.get(timeout, TimeUnit.MILLISECONDS);
        }
        else {
            return resultsFuture.get();
        }
    }

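    // A partition counts as finished when it is COMPLETED or has moved past
    // STARTED (STOPPING, STOPPED, FAILED, ABANDONED or UNKNOWN).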
    private boolean isComplete(BatchStatus status) {
        return status.equals(BatchStatus.COMPLETED) || status.isGreaterThan(BatchStatus.STARTED);
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    @Override
    public void afterPropertiesSet() {
        Assert.notNull(taskExecution, "A taskExecution is required");

        if(this.environmentVariablesProvider == null) {
            this.environmentVariablesProvider =
                    new CloudEnvironmentVariablesProvider(this.environment);
        }

        if(this.commandLineArgsProvider == null) {
            SimpleCommandLineArgsProvider simpleCommandLineArgsProvider = new SimpleCommandLineArgsProvider();
            simpleCommandLineArgsProvider.onTaskStartup(taskExecution);
            this.commandLineArgsProvider = simpleCommandLineArgsProvider;
        }
    }

}

The partitions are distributed to the workers with the help of the static function partitionOffset, which ensures that the numbers of partitions the workers receive differ by at most one:

static int partitionOffset(int length, int numberOfPartitions, int partitionIndex) {
    return partitionIndex * (length / numberOfPartitions) + Math.min(partitionIndex, length % numberOfPartitions);
}
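
For example, with length = 10 partitions and numberOfPartitions = 3 workers, the offsets come out as 0, 4, 7 and 10, so the three workers receive 4, 3 and 3 partitions respectively.

For completeness, here is a minimal sketch of how the handler could be wired into the master step. This is not my actual configuration: the bean names, the "workerStep" step name, the Partitioner bean, the Maven coordinates of the worker über jar, and the use of spring-cloud-deployer's DelegatingResourceLoader to resolve maven:// URIs are all placeholders. How you obtain the current TaskExecution also depends on your spring-cloud-task setup (the original DeployerPartitionHandler gets it via an @BeforeTask hook):

@Bean
public DeployerMultiPartitionHandler partitionHandler(TaskLauncher taskLauncher,
                                                      JobExplorer jobExplorer,
                                                      DelegatingResourceLoader resourceLoader,
                                                      TaskExecution taskExecution) {
    // The worker über jar that the TaskLauncher deploys once per worker.
    Resource workerResource =
            resourceLoader.getResource("maven://com.example:worker-app:1.0.0");

    DeployerMultiPartitionHandler handler =
            new DeployerMultiPartitionHandler(taskLauncher, jobExplorer,
                    workerResource, "workerStep");
    handler.setMaxWorkers(3); // worker JVMs to launch
    handler.setGridSize(10);  // total partitions, spread as evenly as possible
    // afterPropertiesSet asserts a TaskExecution, so it must be set before
    // bean initialization completes.
    handler.setTaskExecution(taskExecution);
    return handler;
}

@Bean
public Step masterStep(StepBuilderFactory stepBuilderFactory,
                       Partitioner partitioner,
                       DeployerMultiPartitionHandler partitionHandler) {
    return stepBuilderFactory.get("masterStep")
            .partitioner("workerStep", partitioner)
            .partitionHandler(partitionHandler)
            .build();
}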

On the receiving end I created DeployerMultiStepExecutionHandler, which inherits the parallel execution of partitions from TaskExecutorPartitionHandler and in addition implements the command line interface matching DeployerMultiPartitionHandler:

@Slf4j
public class DeployerMultiStepExecutionHandler extends TaskExecutorPartitionHandler implements CommandLineRunner {

    private JobExplorer jobExplorer;

    private JobRepository jobRepository;

    @Autowired
    private Environment environment;

    private StepLocator stepLocator;

    public DeployerMultiStepExecutionHandler(BeanFactory beanFactory, JobExplorer jobExplorer, JobRepository jobRepository) {
        Assert.notNull(beanFactory, "A beanFactory is required");
        Assert.notNull(jobExplorer, "A jobExplorer is required");
        Assert.notNull(jobRepository, "A jobRepository is required");

        this.stepLocator = new BeanFactoryStepLocator();
        ((BeanFactoryStepLocator) this.stepLocator).setBeanFactory(beanFactory);

        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
    }

    @Override
    public void run(String... args) throws Exception {

        validateRequest();

        Long jobExecutionId = Long.parseLong(environment.getProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID));
        String stepExecutionIdCsv = environment.getProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS);
        Set<StepExecution> stepExecutions = Stream.of(stepExecutionIdCsv.split(","))
                .map(Long::parseLong)
                .map(stepExecutionId -> jobExplorer.getStepExecution(jobExecutionId, stepExecutionId))
                .collect(Collectors.toSet());

        log.info("found stepExecutions:\n{}", stepExecutions.stream().map(stepExecution -> stepExecution.getId() + ":" + stepExecution.getExecutionContext()).collect(joining("\n")));

        if (stepExecutions.isEmpty()) {
            // Report the raw id list rather than an already-consumed Stream.
            throw new NoSuchStepException(String.format("No StepExecution could be located for step execution ids %s within job execution %s", stepExecutionIdCsv, jobExecutionId));
        }

        String stepName = environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME);
        setStep(stepLocator.getStep(stepName));

        doHandle(null, stepExecutions);
    }

    private void validateRequest() {
        Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID), "A job execution id is required");
        Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS), "A step execution id is required");
        Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_NAME), "A step name is required");

        Assert.isTrue(this.stepLocator.getStepNames().contains(environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME)), "The step requested cannot be found in the provided BeanFactory");
    }
}
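
On the worker side, the handler just needs to be exposed as a bean: it implements CommandLineRunner, so it fires on startup and executes its share of the partitions. Below is a minimal sketch, not my actual configuration; the "worker" profile, the workerStep bean and the pool size of 4 are placeholders. The executor is where the multi-threading per node from point 3 of the question comes in:

@Configuration
@Profile("worker")
public class WorkerConfiguration {

    @Bean
    public TaskExecutor partitionTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4); // partitions executed concurrently on this node
        executor.setMaxPoolSize(4);
        executor.setThreadNamePrefix("partition-");
        return executor;
    }

    @Bean
    public DeployerMultiStepExecutionHandler stepExecutionHandler(BeanFactory beanFactory,
                                                                  JobExplorer jobExplorer,
                                                                  JobRepository jobRepository,
                                                                  Step workerStep,
                                                                  TaskExecutor partitionTaskExecutor) {
        DeployerMultiStepExecutionHandler handler =
                new DeployerMultiStepExecutionHandler(beanFactory, jobExplorer, jobRepository);
        // TaskExecutorPartitionHandler#afterPropertiesSet insists on a Step at
        // startup; run() replaces it with the one named in SPRING_CLOUD_TASK_STEP_NAME.
        handler.setStep(workerStep);
        // The inherited TaskExecutorPartitionHandler runs this worker's share of
        // the partitions in parallel on the given executor.
        handler.setTaskExecutor(partitionTaskExecutor);
        return handler;
    }
}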