I created a Reader, a Processor and a Writer, with a chunk size of 5. The Processor performs one operation per item. The Writer performs two transactions: it updates the DB for all 5 items, and it confirms the transaction for all 5 items in another place. My items don't depend on each other, so if one of them fails, the others are unaffected and should still be processed.

Use Case 1:

If the Processor fails with any kind of exception (RESTful exception, any Java exception, DB exception, runtime exception) on, say, the 2nd item, I want to continue with the 3rd, 4th and 5th items. If it fails on the 4th item, I want to continue with the 5th. So with skip, as I understand it, when a chunk fails because of an item in the Processor, the chunk can be repeated without the 2nd and 4th items (the ones that failed), right? And if the Writer succeeds, both transactions are committed after the chunk and the job starts the next chunk with the next 5 items, right?

Use Case 2:

Whether the chunk is new or is a repeat from Use Case 1 without those 2 items, if the second transaction in the Writer fails, I want the first transaction to be rolled back without doing the rollback and commit manually. So if the Writer throws an exception, the first transaction is rolled back automatically, and that is good. But even when there was an exception and the transaction was rolled back (for that chunk), I want to continue with the next chunk in the same way, with the same behaviour, and so on until the last chunk.
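To make the intended behaviour of both use cases concrete, here is a minimal plain-Java sketch. It is only a model of the desired semantics, not Spring Batch code and not a description of how Spring Batch works internally. The class `ChunkSketch`, its `run` method and the `Result` holder are hypothetical names invented for this illustration, and only unchecked exceptions are modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical model of the desired behaviour; NOT Spring Batch internals.
class ChunkSketch {

    /** Collects what happened during one run. */
    static final class Result {
        final List<String> written = new ArrayList<>();
        final List<String> skippedInProcessor = new ArrayList<>();
        int failedChunks = 0;
    }

    static Result run(List<String> items, int chunkSize,
                      UnaryOperator<String> processor,
                      Consumer<List<String>> writer) {
        Result result = new Result();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            List<String> processed = new ArrayList<>();
            for (String item : items.subList(start, end)) {
                try {
                    processed.add(processor.apply(item));
                } catch (RuntimeException e) {
                    // Use Case 1: drop the failing item, keep the rest of the chunk.
                    result.skippedInProcessor.add(item);
                }
            }
            try {
                // The writer stands for both transactions; any failure
                // means the whole chunk counts as rolled back.
                writer.accept(processed);
                result.written.addAll(processed);
            } catch (RuntimeException e) {
                // Use Case 2: give up on this chunk and continue with the next one.
                result.failedChunks++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> items = List.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11");
        Result r = run(items, 5,
                item -> {
                    if (item.equals("2") || item.equals("4")) {
                        throw new IllegalStateException("processor failed on " + item);
                    }
                    return item;
                },
                chunk -> {
                    if (chunk.contains("7")) {
                        throw new IllegalStateException("second transaction failed");
                    }
                });
        // prints: written=[1, 3, 5, 11] skipped=[2, 4] failedChunks=1
        System.out.println("written=" + r.written
                + " skipped=" + r.skippedInProcessor
                + " failedChunks=" + r.failedChunks);
    }
}
```

In this model, items 2 and 4 are dropped from the first chunk, the second chunk (items 6 to 10) is abandoned as a whole because the writer fails, and the run still reaches item 11.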

To achieve Use Case 1, I guess I have to configure the step as:

@Configuration
@EnableBatchProcessing
@EnableScheduling
@Slf4j
public class BatchConfiguration {

private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final MyItemReader myItemReader;
private final MyItemProcessor myItemProcessor;
private final MyItemWriter myItemWriter;

private final SimpleJobExecutionListener simpleJobExecutionListener;
private final MyChunkListener myChunkListener;

private final ApplicationContext applicationContext;
private final DataSource dataSource;

public BatchConfiguration(
        JobBuilderFactory jobBuilderFactory,
        StepBuilderFactory stepBuilderFactory,
        MyItemReader myItemReader,
        MyItemProcessor myItemProcessor,
        MyItemWriter myItemWriter,
        SimpleJobExecutionListener simpleJobExecutionListener,
        MyChunkListener myChunkListener,
        DataSource dataSource,
        ApplicationContext applicationContext) {
    this.jobBuilderFactory = jobBuilderFactory;
    this.stepBuilderFactory = stepBuilderFactory;
    this.myItemReader = myItemReader;
    this.myItemProcessor = myItemProcessor;
    this.myItemWriter = myItemWriter;
    this.simpleJobExecutionListener = simpleJobExecutionListener;
    this.myChunkListener = myChunkListener;
    this.dataSource = dataSource;
    this.applicationContext = applicationContext;
}

@Bean
public Job registrationChunkJob() {
    return jobBuilderFactory.get("MyJob")
            .incrementer(new RunIdIncrementer())
            .listener(simpleJobExecutionListener)
            .flow(step()).end().build();
}

@Bean
TaskExecutor taskExecutorStepPush() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(2);
    taskExecutor.setMaxPoolSize(20);
    taskExecutor.setQueueCapacity(4);
    taskExecutor.setAllowCoreThreadTimeOut(true);
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setThreadNamePrefix(LoggingUtil.getWeblogicName() + "-");
    return taskExecutor;
}

@Bean
public Step step() {
    DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
    attribute.setPropagationBehavior(Propagation.REQUIRED.value());
    attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());

    return stepBuilderFactory.get("myStep").<MyObject, MyObject>chunk(5)
            .reader(myItemReader)
            .processor(myItemProcessor)
            .faultTolerant()
            .writer(myItemWriter)
            .listener(myChunkListener)
            .taskExecutor(taskExecutorStepPush())
            .throttleLimit(5)
            .transactionAttribute(attribute)
            .build();
}

}

My job is not scheduled; I start the next job manually once the current one has finished, successfully or not. As I said, I don't change the flag in the DB from the Writer, so if it fails and some data is skipped and not updated in the DB (Writer), then when the job has finished, a new job started after 1h will try again with the same (and maybe new) items from the DB (the Reader will select them because their flag was not updated to processed).

But somehow this doesn't work, and it's late and I can't see why. It took 5 items in a chunk and did not fail in the Processor, but it failed in the Writer while trying to commit the 2 transactions (the second one failed). It then repeated the chunk with only one item, the first one, tried it 2 times (with that one item), and then marked the job as Failed and stopped, which I don't want. There are many more items to be selected from the DB that could be good ones.

I don't want to repeat the same chunk if it failed in the Writer. I want to repeat a chunk only if it failed in the Processor (to keep only the good items). Also, if a chunk fails, I don't want the job to stop; I want it to continue with the next chunk, and so on. How can I achieve this?

1 Answer

How to skip any error in chunk and to continue with next items?

To do that, you need to configure which exceptions should cause the item to be skipped, as explained in the Configuring Skip Logic section.

According to your configuration, you did not specify any skippable exception. Your step definition should be something like:

@Bean
public Step step() {
   DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
   attribute.setPropagationBehavior(Propagation.REQUIRED.value());
   attribute.setIsolationLevel(Isolation.READ_COMMITTED.value());

   return stepBuilderFactory.get("myStep").<MyObject, MyObject>chunk(5)
        .reader(myItemReader)
        .processor(myItemProcessor)
        .faultTolerant()
        // add skip configuration
        .skipLimit(10)
        .skip(MySkippableException.class)
        .writer(myItemWriter)
        .listener(myChunkListener)
        .taskExecutor(taskExecutorStepPush())
        .throttleLimit(5)
        .transactionAttribute(attribute)
        .build();
}
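
Since the question asks to skip on any kind of exception, a variant of the step above could declare `Exception.class` as skippable with a very high skip limit. This is only a sketch under assumptions: the choice of `Exception.class` and `Integer.MAX_VALUE` is mine, not something the question's setup confirms as appropriate, and the listener, task executor and transaction attribute from the original step are left out for brevity. It is a fragment meant to sit inside the existing `BatchConfiguration` class, so it relies on that class's fields and imports.

```java
// Fragment for the existing BatchConfiguration class; relies on its fields
// (stepBuilderFactory, myItemReader, myItemProcessor, myItemWriter).
@Bean
public Step step() {
    return stepBuilderFactory.get("myStep").<MyObject, MyObject>chunk(5)
            .reader(myItemReader)
            .processor(myItemProcessor)
            .writer(myItemWriter)
            .faultTolerant()
            // Assumption: treat every Exception subtype as skippable.
            .skip(Exception.class)
            // Assumption: effectively no upper bound on the number of skipped items.
            .skipLimit(Integer.MAX_VALUE)
            .build();
}
```

Keep in mind that when a skippable exception is thrown from the writer, Spring Batch cannot tell which item caused it, so it rolls the chunk back and re-runs its items one at a time to find the faulty one. That is consistent with the single-item attempts described in the question. Whether skipping every exception type is acceptable depends on the job; a narrower exception list is usually safer.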