TL;DR

I used this example to build a simple application that uses Spring Batch (remote partitioning) and Spring Cloud Data Flow to deploy worker pods on Kubernetes.

Looking at the logs for the "partitionedJob" pod created on Kubernetes, I see that the worker steps (pods) are launched sequentially. Launching one worker pod takes roughly 10-15 seconds (sometimes as long as 2 minutes, as in the logs below), so the worker pods come up one by one with a gap between each launch.


Logs:

[info 2021/06/26 14:30:29.089 UTC <main> tid=0x1] Job: [SimpleJob: [name=job]] launched with the following parameters: [{maxWorkers=40, chunkSize=5000, run.id=13, batch.worker-app=docker://docker-myhost.artifactrepository.net/my-project/myjob:0.1, grideSize=40}]

[info 2021/06/26 14:30:29.155 UTC <main> tid=0x1] The job execution id 26 was run within the task execution 235

[info 2021/06/26 14:30:29.184 UTC <main> tid=0x1] Executing step: [masterStep]

2021-06-26 14:30:29 INFO  AuditRecordPartitioner:51 - Creating partitions. [gridSize=40]

[info 2021/06/26 14:32:41.128 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker entry point style: exec
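To put numbers on the gaps, here is a minimal sketch that computes the time between consecutive "Using Docker entry point style" lines from the log above. The class name `LaunchGaps` and its method are made up for illustration; only the timestamps come from the log.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Spring: measures gaps between log timestamps. */
final class LaunchGaps {

    // Matches the timestamp layout used in the log lines, e.g. 2021/06/26 14:32:41.128
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS");

    private LaunchGaps() {
    }

    /** Returns the gap in milliseconds between each timestamp and the one before it. */
    static List<Long> gapsInMillis(List<String> timestamps) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime previous = LocalDateTime.parse(timestamps.get(i - 1), FORMAT);
            LocalDateTime current = LocalDateTime.parse(timestamps.get(i), FORMAT);
            gaps.add(Duration.between(previous, current).toMillis());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Timestamps copied from the five worker launches in the log above.
        List<String> launches = List.of(
                "2021/06/26 14:32:41.128",
                "2021/06/26 14:34:51.560",
                "2021/06/26 14:36:39.464",
                "2021/06/26 14:38:34.203",
                "2021/06/26 14:40:44.544");
        System.out.println(gapsInMillis(launches)); // [130432, 107904, 114739, 130341]
    }
}
```

For this particular run, every gap is close to two minutes, which is the slow case rather than the typical 10-15 seconds.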

It takes roughly 7-8 minutes for 40 pods to be created on Kubernetes (sometimes as long as 20 minutes). Ideally, all the partitioned steps (worker pods) would be launched asynchronously in one go.

Question: How can we configure Spring Cloud Data Flow / Spring Batch to launch worker pods (partitioned steps) asynchronously, in parallel, instead of sequentially? If SCDF is indeed creating 40 partitions in one go, why does the master job actually launch them one by one at such a slow rate (as seen in the logs)? I don't believe it is an infrastructure issue, because I am able to launch tasks rapidly using the Task DSL.

Relevant code:

@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}


/**
 * Main job controller.
 */
@Profile("master")
@Configuration
public class MasterConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(MasterConfiguration.class);

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory) {
        LOGGER.info("Creating job...");
        SimpleJobBuilder jobBuilder = jobBuilderFactory.get("job").start(masterStep(null, null, null));

        jobBuilder.incrementer(new RunIdIncrementer());

        return jobBuilder.build();
    }

    @Bean
    public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner,
            PartitionHandler partitionHandler) {
        LOGGER.info("Creating masterStep");
        return stepBuilderFactory.get("masterStep").partitioner("workerStep", partitioner)
                .partitionHandler(partitionHandler).build();
    }

    @Bean
    public DeployerPartitionHandler partitionHandler(@Value("${spring.profiles.active}") String activeProfile,
            @Value("${batch.worker-app}") String resourceLocation,
            @Value("${spring.application.name}") String applicationName, ApplicationContext context,
            TaskLauncher taskLauncher, JobExplorer jobExplorer, ResourceLoaderResolver resolver) {
        ResourceLoader resourceLoader = resolver.get(resourceLocation);
        Resource resource = resourceLoader.getResource(resourceLocation);
        DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep");

        List<String> commandLineArgs = new ArrayList<>();
        commandLineArgs.add("--spring.profiles.active=" + activeProfile.replace("master", "worker"));
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");

        commandLineArgs.addAll(Arrays.stream(applicationArguments.getSourceArgs()).filter(
                x -> !x.startsWith("--spring.profiles.active=") && !x.startsWith("--spring.cloud.task.executionid="))
                .collect(Collectors.toList()));
        commandLineArgs.addAll(applicationArguments.getNonOptionArgs());

        partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());

        List<String> nonOptionArgs = applicationArguments.getNonOptionArgs();

        partitionHandler.setMaxWorkers(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 3)));
        partitionHandler.setGridSize(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 4)));
        partitionHandler.setApplicationName(applicationName);

        return partitionHandler;
    }

    @Bean("auditRecordPartitioner")
    public Partitioner auditRecordPartitioner() {
        
        return new AuditRecordPartitioner<>();
    }
    
    private String getNonOptionArgValue(List<String> nonOptionArgs, int index)  {
        return nonOptionArgs.get(index).split("=")[1];
    }
}


@Profile("worker")
@Configuration
public class WorkerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public DeployerStepExecutionHandler stepExecutionHandler(ApplicationContext context, JobExplorer jobExplorer,
            JobRepository jobRepository) {
        LOGGER.info("stepExecutionHandler...");
        return new DeployerStepExecutionHandler(context, jobExplorer, jobRepository);
    }

    @Bean
    public Step workerStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("workerStep").tasklet(workerTasklet(null)).build();
    }

    @Bean
    @StepScope
    public WorkerTasklet workerTasklet(@Value("#{stepExecutionContext['key']}") String key) {
        return new WorkerTasklet(key);
    }

    
}

Note that I am passing gridSize and maxWorkers as input arguments to the master step (from the SCDF UI while launching the task).
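As a side note, the configuration above reads these two values by position (indexes 3 and 4 of the non-option arguments), which breaks if the argument order changes. A minimal sketch of looking them up by name instead is shown below. `NamedArgs`, its methods, and the fallback value are hypothetical and not part of Spring; the only assumption is that the arguments arrive as `name=value` strings, as the existing `split("=")` implies.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: finds "name=value" arguments by name instead of by position. */
final class NamedArgs {

    private NamedArgs() {
    }

    /** Returns the value of the first argument of the form "name=value", if present. */
    static Optional<String> find(List<String> args, String name) {
        String prefix = name + "=";
        return args.stream()
                .filter(arg -> arg.startsWith(prefix))
                .map(arg -> arg.substring(prefix.length()))
                .findFirst();
    }

    /** Returns the argument parsed as an int, or the fallback when it is absent. */
    static int findInt(List<String> args, String name, int fallback) {
        return find(args, name).map(Integer::parseInt).orElse(fallback);
    }

    public static void main(String[] args) {
        // Sample arguments in an arbitrary order; the lookup does not depend on it.
        List<String> sample = List.of("chunkSize=5000", "gridSize=40", "maxWorkers=40");
        System.out.println(findInt(sample, "maxWorkers", 2)); // 40
        System.out.println(findInt(sample, "gridSize", 2));   // 40
    }
}
```

In the partition handler this would be used along the lines of `partitionHandler.setMaxWorkers(NamedArgs.findInt(nonOptionArgs, "maxWorkers", 2))`, where the fallback of 2 is an arbitrary choice for the sketch.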

2 Answers

0
votes

The sample, for demonstration purposes, sets the maximum number of workers to 2 here. So for your 40 partitions, only two workers will run in parallel, which makes you think your partitions are being processed in sequence.

You need to update the sample (or make it configurable) and increase the number of concurrent workers as needed.

0
votes

As mentioned by Mahmoud Ben Hassine in the comments, the workers are launched sequentially:

private void launchWorkers(Set<StepExecution> candidates,
        Set<StepExecution> executed) {
    for (StepExecution execution : candidates) {
        if (this.currentWorkers < this.maxWorkers || this.maxWorkers < 0) {
            launchWorker(execution);
            this.currentWorkers++;

            executed.add(execution);
        }
    }
}

As Glenn Renfro mentioned in the comments, an issue has been created for this. This answer will be updated if a solution for launching workers asynchronously becomes available.
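To illustrate what launching asynchronously could look like in general, here is a plain-Java sketch that submits one launch per partition to a thread pool instead of calling them one after another in a loop. This is not how DeployerPartitionHandler works today and it is not its API: `ParallelLaunchSketch`, `launchAll`, and the `launcher` function are hypothetical stand-ins, and the sleep only simulates a slow launch call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical sketch: runs a launch function for many partitions concurrently. */
final class ParallelLaunchSketch {

    private ParallelLaunchSketch() {
    }

    /**
     * Submits one launch per partition to a fixed-size pool and waits for all of them.
     * Results are returned in the same order as the input partitions.
     */
    static <P, R> List<R> launchAll(List<P> partitions, Function<P, R> launcher, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            // Submit everything first so launches overlap instead of running back to back.
            List<CompletableFuture<R>> futures = new ArrayList<>();
            for (P partition : partitions) {
                futures.add(CompletableFuture.supplyAsync(() -> launcher.apply(partition), pool));
            }
            List<R> results = new ArrayList<>();
            for (CompletableFuture<R> future : futures) {
                results.add(future.join()); // rethrows a launch failure, wrapped in CompletionException
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> partitions = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            partitions.add(i);
        }
        // Stand-in for a slow launch call; a real launch would talk to the platform.
        Function<Integer, String> slowLauncher = partition -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "worker-" + partition;
        };
        List<String> launched = launchAll(partitions, slowLauncher, 10);
        System.out.println(launched.size() + " workers launched");
    }
}
```

With a pool of 10 threads, the 40 simulated launches overlap in groups rather than each one waiting for the previous call to return. Whether the underlying task launcher tolerates concurrent calls is a separate question that this sketch does not answer.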