3
votes

For a Spring Batch job, we have two different queries on the same table. The requirement is a single reader that executes both queries to read data from that table.

One way could be:

<batch:step id="firstStep" next="secondStep">
    <batch:tasklet>
        <batch:chunk reader="firstReader" writer="firstWriter" commit-interval="2">
        </batch:chunk>
    </batch:tasklet>
</batch:step>
<batch:step id="secondStep" next="thirdStep">
    <batch:tasklet>
        <batch:chunk reader="secondReader" writer="secondWriter" commit-interval="2">
        </batch:chunk>
    </batch:tasklet>
</batch:step>

But this requires defining an entirely separate step that is a copy of the first. Is there another way to achieve the same result? I am looking for something like MultiResourceItemReader, but for database readers, that aggregates the data from both queries.

2
Recently I had much the same requirement of executing a reader multiple times for different queries. Does your solution require an XML configuration? If you can use a Java configuration, I can show you an example of how to execute the same reader for different queries. - Sander_M
Yes, my application is largely based on XML config. However, can you share the Java config as well? - skpraveen
MultiResourceItemReader sounds good to me. You can refer to the mkyong site (though they have done this for reading multiple files, you can adapt it for different database queries): mkyong.com/spring-batch/… - surya
How do you want the files to be aggregated? Do you need them to just be appended? - Sander_M
@surya: Can you provide an example of how MultiResourceItemReader could be used for DB-based readers? - skpraveen

2 Answers

3
votes

You can create a database view that combines the different queries and read from it with a JdbcPagingItemReader. If that's not an option, there are several alternatives; one that I have used is given below. Spring offers other options as well, but from a developer's standpoint the following is a workable one.

Create two item readers. The first one is below:

<!-- Use org.springframework.batch.item.database.JdbcCursorItemReader for simple queries.
     Note: the sql property belongs to JdbcCursorItemReader; JdbcPagingItemReader is
     configured through a queryProvider instead. -->
<bean id="itemReader1"
    class="org.springframework.batch.item.database.JdbcPagingItemReader">
    <property name="sql"
        value=" FROM   table1" />
    .......
    <property name="rowMapper">
        <bean class="com.sjena.AccountApplicationMapper" />
    </property>
</bean>

Then define another reader for table2:

<bean id="itemReader2"
    class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="sql"
        value="FROM   table2" />
    .......
    <property name="rowMapper">
        <bean class="com.sjena.AccountApplicationMapper" />
    </property>
</bean>

then delegate to your custom reader

<bean id="customItemReader" class="com.sjena.spring.reader.MyCustomReader"
    scope="step">
    <property name="itemReader1" ref="itemReader1" />
    <property name="itemReader2" ref="itemReader2" />
    <property name="pageSize" value="5" />

</bean>

And eventually use this custom reader

<job id="testJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="step1">
        <tasklet>
            <chunk reader="customItemReader" writer="itemWriter"
                commit-interval="1" />
        </tasklet>
    </step>
</job>

Then your class is as given below

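public class MyCustomReader implements ItemReader<AccountApplicationSummary> {

    private int pageSize; // you may use a different page size for each delegate
    private ItemReader<AccountApplication> itemReader1;
    private ItemReader<AccountApplication> itemReader2;

    // Setters are required for the <property> injection in the XML configuration.
    public void setPageSize(int pageSize) { this.pageSize = pageSize; }
    public void setItemReader1(ItemReader<AccountApplication> itemReader1) { this.itemReader1 = itemReader1; }
    public void setItemReader2(ItemReader<AccountApplication> itemReader2) { this.itemReader2 = itemReader2; }

    @Override
    public AccountApplicationSummary read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        // To apply pageSize, call setPageSize(pageSize) on a delegate of type
        // JdbcPagingItemReader. Such initialization is better done once, for example by
        // implementing InitializingBean and doing it in afterPropertiesSet().
        // Any other property you need can be set the same way.
        // Delegates that implement ItemStream (for example JdbcCursorItemReader) must also
        // be opened before the first read, e.g. by registering them as streams on the chunk.

        AccountApplication application1 = itemReader1.read();
        AccountApplication application2 = itemReader2.read();

        // Returning null tells Spring Batch that the input is exhausted;
        // without this check the step would never finish.
        if (application1 == null && application2 == null) {
            return null;
        }

        // You now have a row from each query (either may be null if that reader
        // ran out first); populate the summary from them as needed.
        AccountApplicationSummary summary = new AccountApplicationSummary();

        return summary;
    }
}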
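
The reader above pairs one row from each query per call. If the rows should instead simply be appended (all rows of the first query, then all rows of the second), which is closer to what MultiResourceItemReader does for files, the delegation can be sketched as below. This is only a minimal plain-Java illustration: `SimpleReader`, `ChainedReader` and `ListReader` are hypothetical names, and `SimpleReader` is a stand-in for Spring Batch's `ItemReader` contract (return null when exhausted). A real implementation would implement `ItemReader` and would also have to open and close delegates that are `ItemStream`s, which the sketch leaves out.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Spring Batch's ItemReader: read() returns null when
// the input is exhausted. (The real interface also declares "throws Exception".)
interface SimpleReader<T> {
    T read();
}

// Reads every item from the first delegate, then every item from the next, and so on.
class ChainedReader<T> implements SimpleReader<T> {
    private final Iterator<? extends SimpleReader<T>> delegates;
    private SimpleReader<T> current;

    ChainedReader(List<? extends SimpleReader<T>> delegates) {
        this.delegates = delegates.iterator();
        this.current = this.delegates.hasNext() ? this.delegates.next() : null;
    }

    @Override
    public T read() {
        while (current != null) {
            T item = current.read();
            if (item != null) {
                return item;
            }
            // The current delegate is exhausted; move on to the next one, if any.
            current = delegates.hasNext() ? delegates.next() : null;
        }
        return null; // all delegates are exhausted, which ends the step
    }
}

// In-memory reader used only to demonstrate the chaining.
class ListReader<T> implements SimpleReader<T> {
    private final Iterator<T> items;

    ListReader(List<T> items) {
        this.items = items.iterator();
    }

    @Override
    public T read() {
        return items.hasNext() ? items.next() : null;
    }
}

class ChainedReaderDemo {
    public static void main(String[] args) {
        ChainedReader<String> reader = new ChainedReader<String>(Arrays.asList(
                new ListReader<String>(Arrays.asList("row from query 1")),
                new ListReader<String>(Arrays.asList("row from query 2"))));
        // Drain the reader the same way a chunk-oriented step would.
        for (String item = reader.read(); item != null; item = reader.read()) {
            System.out.println(item);
        }
    }
}
```

With this shape, a single step and a single writer see the rows of both queries as one continuous input.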

0
votes

This answer is an adaptation of the answer provided by Hansjoerg on my similar question about executing a step multiple times: Spring Batch - Looping a reader/processor/writer step

package hello;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.datasource.SimpleDriverDataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    List<String> queries = Arrays.asList("some query1", "some query2");

    @Bean
    public Job multiQueryJob() {

        List<Step> steps = queries.stream().map(query -> createStep(query)).collect(Collectors.toList());       

        return jobBuilderFactory.get("multiQueryJob")
                .start(createParallelFlow(steps))
                .end()
                .build();
    }

    private Step createStep(String query) {

        return stepBuilderFactory.get("convertStepFor" + query)
                .<Actor, Actor>chunk(10)
                .reader(createQueryReader(query))
                .writer(dummyWriter())
                .build();
    }

    private Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(1); // force sequential execution

        List<Flow> flows = steps.stream()

                .map(step -> new FlowBuilder<Flow>("flow_" + step.getName())
                        .start(step) 
                        .build()) 
                .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
                .split(taskExecutor)
                .add(flows.toArray(new Flow[flows.size()])).build();
    }

    public JdbcCursorItemReader<Actor> createQueryReader(String query) {
        JdbcCursorItemReader<Actor> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource());
        reader.setSql(query);
        reader.setRowMapper(mapper());
        return reader;
    }

    public BeanPropertyRowMapper<Actor> mapper(){
        BeanPropertyRowMapper<Actor> mapper = new BeanPropertyRowMapper<>();
        mapper.setMappedClass(Actor.class);
        return mapper;
    }

    public DummyItemWriter dummyWriter() {
        return new DummyItemWriter();
    }

    public DataSource dataSource() {
        final SimpleDriverDataSource dataSource = new SimpleDriverDataSource();
        try {
            dataSource.setDriver(new com.mysql.jdbc.Driver());
        } catch (SQLException e) {
            e.printStackTrace();
        }
        dataSource.setUrl("jdbc:mysql://localhost:3306/SAKILA");
        dataSource.setUsername("sa");
        dataSource.setPassword("password");
        return dataSource;
    }

}

I provided two dummy queries in the list of queries; you have to replace them with the actual queries. The job is constructed with one step per query, and in this example I used Spring Batch's JdbcCursorItemReader to read data from a database.

You can recreate this configuration by starting from the example provided by Spring at https://spring.io/guides/gs/batch-processing/, adding an Actor POJO, and removing the classes you don't need (you will only need the BatchConfiguration and Application classes).
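
One caveat about the configuration above: it builds each step name by appending the SQL text. Step names are persisted in the job repository, and as far as I know the default schema scripts declare the STEP_NAME column as VARCHAR(100), so a long query may not fit. A minimal sketch of deriving short, unique names from the position of each query instead is shown below; `StepNames` and `forQueries` are hypothetical helper names and not part of Spring Batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: derives one short, unique step name per query.
class StepNames {

    // Names are built from the prefix and the 1-based position of each query,
    // so they stay short regardless of how long the SQL text is.
    static List<String> forQueries(String prefix, List<String> queries) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < queries.size(); i++) {
            names.add(prefix + (i + 1));
        }
        return names;
    }
}
```

The generated name could then be passed to stepBuilderFactory.get(...) in createStep, with the query itself passed only to the reader.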