This question seems to describe nearly exactly what we want to do: Parallel step execution of ItemStreamReader in SpringBatch

We're using Spring Batch 3.0.10, running under a Java EE server (WebSphere, Java 8, Java EE 6). I'm also not the original author of this code.

Using XML configuration, we have a batch step that reads a line from a file, processes the line (the part we want to multi-thread), and writes a record to a database:

    <batch:step id="processRenewalsStep" next="saveResponseFileStep">
        <batch:tasklet task-executor="taskExecutor" throttle-limit="4">
            <batch:chunk
                reader="batchRenewalCsvFileItemReader"
                writer="asyncBatchRenewalDb2ItemWriter"
                processor="asyncBatchRenewalProcessor"
                commit-interval="1"
                skip-limit="10">
                <batch:skippable-exception-classes>
                    <batch:include class="java.lang.Exception"/>
                </batch:skippable-exception-classes>
                <batch:listeners>
                    <batch:listener ref="batchSkipListener"/>
                </batch:listeners>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>

Even after following the asynchronous wrapping suggested in the accepted answer there:

    <bean id="asyncBatchRenewalProcessor" class="org.springframework.batch.integration.async.AsyncItemProcessor"
          p:taskExecutor-ref="taskExecutor"
          p:delegate-ref="batchRenewalProcessor"/>

    <bean id="asyncBatchRenewalDb2ItemWriter" class="org.springframework.batch.integration.async.AsyncItemWriter"
          p:delegate-ref="batchRenewalDb2ItemWriter"/>
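For context, the idea behind the AsyncItemProcessor/AsyncItemWriter pair can be modeled without Spring: the processor hands each item to an executor and immediately returns a Future, and the writer waits on those futures before passing the results to the real writer. This is a simplified sketch, not Spring Batch's implementation; the names `AsyncPipelineSketch`, `processAsync`, `writeAll` and `run` are hypothetical. In this model, items are read on a single thread and only the processing is moved to the executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical, simplified model of the AsyncItemProcessor/AsyncItemWriter idea.
class AsyncPipelineSketch {

    // "Processor" side: submit the real processing to the executor and
    // return a Future right away instead of the processed item.
    static <I, O> Future<O> processAsync(ExecutorService executor, Function<I, O> delegate, I item) {
        return executor.submit(() -> delegate.apply(item));
    }

    // "Writer" side: wait for every Future, then hand the unwrapped results
    // to the real writer (here they are simply collected into a list).
    static <O> List<O> writeAll(List<Future<O>> futures) throws InterruptedException, ExecutionException {
        List<O> written = new ArrayList<>();
        for (Future<O> future : futures) {
            written.add(future.get()); // blocks until this item's processing is done
        }
        return written;
    }

    // Reads on the calling thread, processes on the pool, writes in input order.
    static List<String> run(List<String> lines, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        Function<String, String> process = String::toUpperCase; // stand-in for the real processor
        try {
            List<Future<String>> chunk = new ArrayList<>();
            for (String line : lines) { // single-threaded "read"
                chunk.add(processAsync(executor, process, line));
            }
            return writeAll(chunk);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the writer unwraps the futures in submission order, output order matches input order even though processing runs concurrently.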

Our startup log still contains:

    22:25:25,252 (Default : 3) WARN org.springframework.batch.core.step.builder.FaultTolerantStepBuilder:Asynchronous TaskExecutor detected with ItemStream reader. This is probably an error, and may lead to incorrect restart data being stored.

And:

    22:34:03,755 (WorkManager.DefaultWorkManager : 0) WARN org.springframework.batch.core.step.item.ChunkMonitor:No ItemReader set (must be concurrent step), so ignoring offset data.

And several instances of:

    22:34:03,880 (WorkManager.DefaultWorkManager : 2) WARN org.springframework.batch.core.step.item.ChunkMonitor:ItemStream was opened in a different thread. Restart data could be compromised.

I suppose this makes some sense, because we wrapped the processor and the writer, but there is no equivalent class to wrap the reader?

When I run the job, the logging does show that different threads from the container's executor are being used. However, I didn't try this before adding the asynchronous wrappers, so maybe it would have "worked" even without them?

So is there any way to prevent these warnings, and the condition they warn about?

Update: Based on this passage in the documentation:

If a reader is not thread safe, it may still be efficient to use it in your own synchronizing delegator. You can synchronize the call to read() and as long as the processing and writing is the most expensive part of the chunk your step may still complete much faster than in a single threaded configuration.

I've tried creating a FlatFileItemReader subclass whose read() method is synchronized and simply calls the inherited read(), but I still get the WARN logging. I don't know whether I misunderstood that advice, or whether what I did is now safe but can't be detected as safe, so the warnings are still logged.
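A minimal sketch of the "synchronizing delegator" the documentation describes, written against a hypothetical `LineReader` interface (a stand-in for Spring Batch's `ItemReader`, returning null when exhausted) so it runs without Spring. The class names `IteratorLineReader`, `SynchronizedLineReader` and `SynchronizedReaderDemo` are also hypothetical. Only `read()` is serialized; processing and writing still run in parallel. Newer Spring Batch versions also provide a `SynchronizedItemStreamReader` decorator following the same pattern; check whether your version includes it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ItemReader: returns null when there is no more input.
interface LineReader {
    String read();
}

// Not thread-safe on its own: the iterator and the counter are shared mutable state.
class IteratorLineReader implements LineReader {
    private final Iterator<String> lines;
    private int readCount; // analogous to the item count a reader saves for restart

    IteratorLineReader(List<String> lines) { this.lines = lines.iterator(); }

    public String read() {
        if (!lines.hasNext()) return null;
        readCount++;
        return lines.next();
    }

    int getReadCount() { return readCount; }
}

// The synchronizing delegator: only one thread at a time may call the delegate's read().
class SynchronizedLineReader implements LineReader {
    private final LineReader delegate;

    SynchronizedLineReader(LineReader delegate) { this.delegate = delegate; }

    public synchronized String read() { return delegate.read(); }
}

class SynchronizedReaderDemo {
    // Worker threads pull lines through the shared reader until it returns null.
    static List<String> readAll(LineReader reader, int threads) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                String line;
                while ((line = reader.read()) != null) {
                    seen.add(line); // "process and write" would happen here, in parallel
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

This makes the reads themselves safe, but the count is still advanced by whichever thread reads next, so in a multi-threaded step the saved restart position need not match the items that were actually processed and written. That is why restart data can remain unreliable even with a synchronized read().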


Answer


The FlatFileItemReader is not thread-safe because it extends AbstractItemCountingItemStreamItemReader, which is not thread-safe. So using it in a multi-threaded step is not correct: the execution context data it saves for restart (such as its item count) might be corrupted by concurrent threads, hence the warnings.

To avoid any concurrency issues, you can partition the file (either physically or logically) and have each thread work on a different partition.
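Logical partitioning can be sketched as splitting the file's line numbers into contiguous, non-overlapping ranges, one per worker, so that each worker uses its own reader and no reader state is shared. In Spring Batch this is the role of a `Partitioner`, whose `partition(int gridSize)` method returns one `ExecutionContext` per partition; each partition's reader would then be configured, for example through step-scoped late binding, to read only its own range. The plain-Java `LineRange` and `LinePartitioner.partitionLines` below are hypothetical and model only the range arithmetic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type: a half-open range of 0-based line numbers [start, end).
final class LineRange {
    final int start;
    final int end;

    LineRange(int start, int end) { this.start = start; this.end = end; }

    int size() { return end - start; }

    @Override
    public String toString() { return "[" + start + ", " + end + ")"; }
}

final class LinePartitioner {
    // Splits totalLines into at most gridSize contiguous, non-overlapping ranges
    // whose sizes differ by at most one line; empty ranges are never produced.
    static List<LineRange> partitionLines(int totalLines, int gridSize) {
        if (totalLines < 0 || gridSize <= 0) {
            throw new IllegalArgumentException("need totalLines >= 0 and gridSize > 0");
        }
        List<LineRange> ranges = new ArrayList<>();
        int base = totalLines / gridSize;
        int remainder = totalLines % gridSize;
        int start = 0;
        for (int i = 0; i < gridSize && start < totalLines; i++) {
            int size = base + (i < remainder ? 1 : 0); // the first 'remainder' ranges get one extra line
            ranges.add(new LineRange(start, start + size));
            start += size;
        }
        return ranges;
    }
}
```

For example, `partitionLines(10, 4)` yields `[0, 3)`, `[3, 6)`, `[6, 8)` and `[8, 10)`. Each worker still has to skip past the lines before its start, which costs an extra pass over that part of the file; splitting the file physically avoids this.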