0
votes

My this question is an extension to my another SO Question. Since that doesn't look possible, I am trying to execute chunks in parallel for parallel / partitioned slave steps.

Article says that by just specifying SimpleAsyncTaskExecutor as task executor for a step would start executing chunks in parallel.

@Bean
public Step masterLuceneIndexerStep() throws Exception{
        return stepBuilderFactory.get("masterLuceneIndexerStep")
                .partitioner(slaveLuceneIndexerStep())
                .partitioner("slaveLuceneIndexerStep", partitioner())
                .gridSize(Constants.PARTITIONER_GRID_SIZE)
                .taskExecutor(simpleAsyntaskExecutor)
                .build();
    }

    @Bean
    public Step slaveLuceneIndexerStep()throws Exception{
        return stepBuilderFactory.get("slaveLuceneIndexerStep")
                .<IndexerInputVO,IndexerOutputVO> chunk(Constants.INDEXER_STEP_CHUNK_SIZE)
                .reader(luceneIndexReader(null))
                .processor(luceneIndexProcessor())
                .writer(luceneIndexWriter(null))
                .listener(luceneIndexerStepListener)
                .listener(lichunkListener)
                .throttleLimit(Constants.THROTTLE_LIMIT)
                .build();
    }

If I specify, .taskExecutor(simpleAsyntaskExecutor) for slave step then job fails. Line .taskExecutor(simpleAsyntaskExecutor) in master step works OK but chunks work in serial and partitioned steps in parallel.

Is it possible to parallelize chunks of slaveLuceneIndexerStep()?

Basically, each chunk is writing Lucene indices to a single directory in sequential fashion and I want to further parallelize index writing process within each directory since Lucene IndexWriter is thread-safe.

1
What do you mean by the job failed? It should be possible to parallelize chunks. But I guess, you have to make the step "faulttolerant", so the calls of the builder should be something like "chunk().reader().writer().faultTolerant().taskExecutor().throttleLimit().build()". However, I have never used this in slave-steps of a partitioner.Hansjoerg Wingeier
What does faulttolerant do? My job remains stuck for quite some time then fails due to different errors all the time. I am guessing that somewhere there is a thread-safety issue in my slave step's chunk components ( reader, processor & writer ). Am I correct to assume that reader, processor & writer should strictly be thread-safe to parallelize chunks?Sabir Khan
FaultTolerant is used to configure the skip and retry policies. I observed, that executing a step in parallel only worked in my environment, when I used faulttolerant. However, the reason for that could be, that I use special methods to configure some basic listeners. So this may be due do this and has probably nothing to do with your problem. I have to admit, I haven't really investigated it deeper.Hansjoerg Wingeier
Your second point: you are right - when parallelizing chunks, you have to ensure that your reader, processor and writer are threadsafe. Some of this components are by nature threadsafe (for instance the FlatFileItemReader which uses the BufferedReader readLine method which is threadsafe), for others, you have to use a wrapper around it (SynchronizedItemReader). You also have to be aware, that in this case restarting is not working, hence you should configure the readers and writers with "saveState=false"Hansjoerg Wingeier

1 Answers

0
votes

I am able to launch parallel chunks from within a partitioned slave step by following,

1.I first took care of my reader, processor and writer to be thread safe so that those components can participate in parallel chunks without concurrency issues.

2.I kept task executor as for master step as SimpleAsyntaskExecutor since slave steps are long running and I wish to start exactly N-threads at a point in time. I control N by setting concurrencyLimit of task executor.

3.Then I set a ThreadPoolTaskExecutor as task executor for slave step. This pool gets used by all slave steps as a common pool so I set its core pool size as a minimum of N so that each slave step gets at least one thread and starvation doesn't happen. You can increase this thread pool size as per system capacity and I used a thread pool since chunks are smaller running processes.

Using a thread pool also handles a specific case for my application that my partitioning is by client_id so when smaller clients are done same threads get automatically reused by bigger clients and asymmetry created by client_id partitioning gets handled since data to be processed for each client varies a lot.

Master step task executor simply starts all slave step threads and goes to WAITINGstate while slave step chunks get processed by thread pool specified in slave step.