I tried to use Multithreaded for my step like below, but got an exception below:
My step: Code:
<step id="generateRecordFile" >
<tasklet>
<chunk reader="inputFileReader" writer="outputFileWriter"
commit-interval="100" task-executor="asyncTaskExecutor">
<streams>
<stream ref="inputFileReader" />
</streams>
</chunk>
</tasklet>
</step>
<beans:bean id="asyncTaskExecutor"
class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
Reader:
<bean id="inputFileReader"
class="org.springframework.batch.item.file.MultiResourceItemReader"
scope="step">
<property name="resources" value="#{jobParameters['fileLocation']}" />
<property name="delegate" ref="fileInputFileReader" />
</bean>
<bean id="fileProductFeeReader" class="<package>.SynchronizedItemStreamReader">
<property name="delegate">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<!-- split it -->
<property name="lineTokenizer">
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names"
value="name,age,address,mobileno" />
<property name="delimiter">
<util:constant
static-field="org.springframework.batch.item.file.transform.DelimitedLineTokenizer.DELIMITER_TAB" />
</property>
</bean>
</property>
<property name="fieldSetMapper">
<bean
class="com.services.extractor.Mapper">
</bean>
</property>
</bean>
</property>
</bean>
<bean id="outputFileWriter"
class="org.springframework.batch.item.support.CompositeItemWriter"
scope="step">
<property name="delegates">
<list>
<ref bean="routeWriter" />
</list>
</property>
</bean>
<bean id="routeWriter"
class="com.services.extractor.processor.IngestionWriter"
scope="step">
</bean>
public class SynchronizedItemStreamReader implements ResourceAwareItemReaderItemStream {
private FlatFileItemReader<T> delegate;
private Resource resource;
public void setDelegate(FlatFileItemReader<T> delegate) {
this.delegate = delegate;
}
/**
* This delegates to the read method of the <code>delegate</code>
*/
public synchronized T read()
throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
return this.delegate.read();
}
public void close() {
this.delegate.close();
}
public void open(ExecutionContext executionContext) {
this.delegate.open(executionContext);
}
public void update(ExecutionContext executionContext) {
this.delegate.update(executionContext);
}
@Override
public void setResource(Resource resource) {
this.resource = resource;
}
writer class:
public class IngestionWriter implements ItemWriter<Person> {
@Override
public void write(List<? extends Person> items) throws IOException {
//logic to set into db
}
Here is the exception:
org.springframework.batch.item.ReaderNotOpenException:Reader must be open before it can be read.:org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read.
at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195)
at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:83)
at sun.reflect.GeneratedMethodAccessor73.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
After implementing SynchronizedItemStreamReader exception:
org.springframework.batch.item.ReaderNotOpenException:Reader must be open before it can be read.:org.springframework.batch.item.ReaderNotOpenException: Reader must be open before it can be read. at org.springframework.batch.item.file.FlatFileItemReader.readLine(FlatFileItemReader.java:195) at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:173) at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:83) at .SynchronizedItemStreamReader.read(SynchronizedItemStreamReader.java:35) at org.springframework.batch.item.file.MultiResourceItemReader.readFromDelegate(MultiResourceItemReader.java:140) at org.springframework.batch.item.file.MultiResourceItemReader.readNextItem(MultiResourceItemReader.java:119) at org.springframework.batch.item.file.MultiResourceItemReader.read(MultiResourceItemReader.java:108) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132) at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) at com.sun.proxy.$Proxy47.read(Unknown Source)
Added reader and writer steps.
Am I missing something here?
Any help is greatly appreciated!!
Thanks!!!