2
votes

I tried to use Multithreaded for my step like below, but got an exception below:

My step: Code:

<step id="generateRecordFile" >
          <tasklet>
             <chunk reader="inputFileReader" writer="outputFileWriter"
            commit-interval="100" task-executor="asyncTaskExecutor">
                  <streams>
            <stream ref="inputFileReader" />
                  </streams>
             </chunk>
          </tasklet>
        </step>

        <beans:bean id="asyncTaskExecutor"
            class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
Reader:    
        <bean id="inputFileReader"
                class="org.springframework.batch.item.file.MultiResourceItemReader"
                scope="step">
                <property name="resources" value="#{jobParameters['fileLocation']}" />
                <property name="delegate" ref="fileInputFileReader" />
            </bean>
            <bean id="fileProductFeeReader" class="<package>.SynchronizedItemStreamReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
                <property name="lineMapper">
                    <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                        <!-- split it -->
                        <property name="lineTokenizer">
                            <bean
                                class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                                <property name="names"
                                    value="name,age,address,mobileno" />
                                <property name="delimiter">
                                    <util:constant
                                        static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
                                </property>
                            </bean>
                        </property>
                        <property name="fieldSetMapper">
                            <bean
                                class="com.services.extractor.Mapper">
                            </bean>
                        </property>
                    </bean>
                </property>
        </bean>

        <bean id="outputFileWriter"
                class="org.springframework.batch.item.support.CompositeItemWriter"
                scope="step">
                <property name="delegates">
                    <list>
                        <ref bean="routeWriter" />
                    </list>
                </property>
        </bean>

        <bean id="routeWriter"
                class="com.services.extractor.processor.IngestionWriter"
                scope="step">
            </bean> 

public class SynchronizedItemStreamReader implements ResourceAwareItemReaderItemStream {

private FlatFileItemReader<T> delegate;
private Resource resource;


public void setDelegate(FlatFileItemReader<T> delegate) {
    this.delegate = delegate;
}

/**
 * This delegates to the read method of the <code>delegate</code>
 */
public synchronized T read()
        throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    return this.delegate.read();
}

public void close() {
    this.delegate.close();
}

public void open(ExecutionContext executionContext) {
    this.delegate.open(executionContext);
}

public void update(ExecutionContext executionContext) {
    this.delegate.update(executionContext);
}

@Override
public void setResource(Resource resource) {
    this.resource = resource;
}

    writer class:       
        public class IngestionWriter implements ItemWriter<Person> {
            @Override
            public void write(List<? extends Person> items) throws IOException {
                //logic to set into db
                }

Here is the exception:

org.springframework.batch.item.ReaderNotOpenException:Reader must be open before it can be read.:org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
    at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195)
    at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:83)
    at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)

After implementing SynchronizedItemStreamReader exception:

org.springframework.batch.item.ReaderNotOpenException:Reader must be open before it can be read.:org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read. at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195) at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:83) at .SynchronizedItemStreamReader.read(SynchronizedItemStreamReader.java:35) at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119) at org.springframework.batch.item.file.MultiResourceItemReader.read(MultiResourceItemReader.java:108) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132) at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) at com.sun.proxy.$Proxy47.read(Unknown Source)

Added reader and writer steps.

Am I missing something here?

Any help is greatly appreciated!!

Thanks!!!

1
Show your reader and writer beans too.Sabir Khan
Added reader and writer beans.Saranya

1 Answers

0
votes

AbstractItemCountingItemStreamItemReader implementations aren't thread-safe. You can try to solve it implementing your own reader and synchronizing methods that mutate shared state like close(), open() and update().