I have to implement the following use case in a Spring Batch job:
- Read a list of providers via StoredProcedureItemReader.
- Iterate through the list and call another StoredProcedureItemReader for each provider found in step 1 (passing the provider as an input parameter).
- Write the output of the second SP to a CSV.
I have come up with the following strategy:
- Step 1 starts
- SP ItemReader returns a list of providers.
- In the ItemWriter, save the providers to the ExecutionContext
- Step 1 ends
- Step 2 starts
- Another SP ItemReader accesses the providers from the ExecutionContext
- Another ItemWriter (a FlatFileItemWriter) writes the response to a CSV
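As a rough illustration of the hand-off in this strategy, here is a plain-Java sketch. It does not use Spring Batch: the `Map` is only a stand-in for the job-level ExecutionContext, and `ProviderHandOffSketch`, `PROVIDERS_KEY`, `stepOne` and `stepTwo` are hypothetical names. In a real job the values would, as far as I understand, have to be put into the step's ExecutionContext and promoted to the job's context (for example with `ExecutionContextPromotionListener`) before a later step can see them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two-step hand-off; the Map is a stand-in for the
// job-level ExecutionContext, not the Spring Batch class itself.
class ProviderHandOffSketch {

    // Hypothetical key under which step 1 stores the provider prefixes.
    static final String PROVIDERS_KEY = "providers";

    // Step 1: store the providers returned by the first procedure in the shared context.
    static void stepOne(List<String> providersFromFirstProcedure, Map<String, Object> jobContext) {
        jobContext.put(PROVIDERS_KEY, new ArrayList<>(providersFromFirstProcedure));
    }

    // Step 2: look the providers up again and produce one CSV line per provider.
    @SuppressWarnings("unchecked")
    static List<String> stepTwo(Map<String, Object> jobContext, String date) {
        List<String> providers =
                (List<String>) jobContext.getOrDefault(PROVIDERS_KEY, List.of());
        List<String> csvLines = new ArrayList<>();
        for (String prefix : providers) {
            // A real job would call the second stored procedure here;
            // this sketch only echoes the inputs.
            csvLines.add(prefix + "," + date);
        }
        return csvLines;
    }

    public static void main(String[] args) {
        Map<String, Object> jobContext = new HashMap<>();
        stepOne(List.of("AAA", "BBB"), jobContext);
        stepTwo(jobContext, "2014-01-31").forEach(System.out::println);
    }
}
```

The sketch keeps the whole provider list in memory, which is only reasonable when the list is small; anything stored in a real ExecutionContext is persisted with the job metadata, so it should also be serializable.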
I am having trouble understanding how the second SP ItemReader will access the list of providers returned by the first SP ItemReader. Will a partition help here? Also, is there a better strategy to accomplish this?
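To make the partition idea concrete, the following plain-Java sketch models one partition per provider. It is not the Spring Batch `Partitioner` API: the nested `Map` stands in for the per-partition step ExecutionContext, and `ProviderPartitionSketch`, `partition` and `reportFileName` are hypothetical names. The key `provider` is borrowed from the `stepExecutionContext['provider']` expression used for the writer's resource in the XML configuration of this post.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "one partition per provider"; each inner Map is a
// stand-in for the step ExecutionContext of one partition.
class ProviderPartitionSketch {

    // Builds one context per provider, keyed by a generated partition name.
    static Map<String, Map<String, Object>> partition(List<String> providers) {
        Map<String, Map<String, Object>> partitions = new LinkedHashMap<>();
        int index = 0;
        for (String prefix : providers) {
            Map<String, Object> context = new LinkedHashMap<>();
            context.put("provider", prefix); // read later by the reader and the writer
            partitions.put("partition" + index++, context);
        }
        return partitions;
    }

    // Mirrors the file name pattern report-<provider>.csv from the writer definition.
    static String reportFileName(Map<String, Object> context) {
        return "report-" + context.get("provider") + ".csv";
    }

    public static void main(String[] args) {
        partition(List.of("AAA", "BBB"))
                .forEach((name, ctx) -> System.out.println(name + " -> " + reportFileName(ctx)));
    }
}
```

With this shape, each partition would run the second stored procedure for exactly one provider and write its own file, so no provider list would need to be shared between a reader and a writer.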
EDIT 1:
Here is the original implementation of ItemProcessor (as enrichment):
@Scope("step")
public class FetchReportFromProviderProcessor implements
        ItemProcessor<RiscProvider, List<SogReportRecord>>, ItemStream {

    StoredProcedureItemReader<SogReportRecord> reader = new StoredProcedureItemReader<SogReportRecord>();

    private DataSource dataSource;

    @Value("#{jobParameters['date']}")
    private String date;

    @Override
    public List<SogReportRecord> process(final RiscProvider item) throws Exception {
        SogReportRecord record = null;
        List<SogReportRecord> records = new ArrayList<SogReportRecord>();
        SqlParameter[] sqlParameters = new SqlParameter[] {new SqlParameter(OracleTypes.CURSOR)};
        reader.setParameters(sqlParameters);
        reader.setPreparedStatementSetter(new PreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps) throws SQLException {
                ps.setString(0, item.getPrefix());
                ps.setString(1, date);
            }
        });
        while ((record = reader.read()) != null) {
            records.add(record);
        }
        return records;
    }

    public DataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public void open(ExecutionContext executionContext)
            throws ItemStreamException {
        reader.setDataSource(dataSource);
        reader.setProcedureName("RISC_GET_DAYMOVEINOUT");
        reader.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext)
            throws ItemStreamException {
        reader.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        reader.close();
    }
}
And the XML part:
<batch:job id="SOG_MOVEINOUT_REPORT_GENERATOR">
    <batch:step id="GET_REPORTS">
        <batch:tasklet>
            <batch:chunk reader="getProviders"
                         processor="fetchRecordsFromProvider"
                         writer="sogReportWriter" commit-interval="500" />
        </batch:tasklet>
    </batch:step>
</batch:job>

<!-- Reader to fetch list of providers -->
<bean id="getProviders" class="org.springframework.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="procedureName" value="RISC_GET_PROVIDER" />
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="providers" />
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR" />
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1" />
    <property name="rowMapper">
        <bean class="com.kpn.risc.ProviderRowMapper" />
    </property>
</bean>

<bean id="fetchRecordsFromProvider" class="com.kpn.risc.FetchReportFromProviderProcessor">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="sogReportWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
    <property name="resource" value="file:///${batch.job.report.dir}/report-#{stepExecutionContext['provider']}.csv" />
    <property name="lineAggregator">
        <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.springframework.batch.item.file.transform.PassThroughFieldExtractor" />
            </property>
        </bean>
    </property>
</bean>
In the code above, the processor delegates its work to an SP ItemReader, but the reader is unable to initialize properly before the read() method gets called. Is it possible, or recommended, to call an ItemReader inside an ItemProcessor?
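The lifecycle in question can be pictured with a small plain-Java sketch in which a fresh cursor-style reader is opened for each provider, only after that provider's parameters are known, and closed again before moving on. This is an assumption about what the enrichment needs, not a statement about how StoredProcedureItemReader behaves; `PerProviderReadSketch`, `CursorReader`, `fetchAll` and `inMemory` are hypothetical names, and the in-memory reader merely imitates a cursor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of "open, drain and close one reader per provider".
class PerProviderReadSketch {

    // Hypothetical stand-in for a cursor-backed reader: read() returns null when exhausted.
    interface CursorReader<T> extends AutoCloseable {
        T read();

        @Override
        void close();
    }

    // Opens a new reader for every provider, so each one starts with its own parameters.
    static <T> List<T> fetchAll(List<String> providers,
                                Function<String, CursorReader<T>> openFor) {
        List<T> all = new ArrayList<>();
        for (String provider : providers) {
            try (CursorReader<T> reader = openFor.apply(provider)) {
                T record;
                while ((record = reader.read()) != null) {
                    all.add(record);
                }
            }
        }
        return all;
    }

    // In-memory reader used only for demonstration; yields `rows` fake records.
    static CursorReader<String> inMemory(String provider, int rows) {
        return new CursorReader<String>() {
            private int next = 1;

            @Override
            public String read() {
                return next <= rows ? provider + "-row" + next++ : null;
            }

            @Override
            public void close() {
                // nothing to release in this sketch
            }
        };
    }

    public static void main(String[] args) {
        fetchAll(List.of("AAA", "BBB"), p -> inMemory(p, 2)).forEach(System.out::println);
    }
}
```

In this model a reader opened once and then reused would return null for every provider after the first, because it has already been drained. That is the reason the sketch creates one reader per provider.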