
I am trying to read data from Cassandra using Spring Batch, where I have implemented ItemReader, ItemProcessor, and ItemWriter. I am able to read the data, process it, and write it back to the same table. I am configuring the job in an XML file:

xml:

    <job id="LoadStatusIndicator" job-repository="jobRepository" restartable="false">
        <step id="LoadStatus" next="">
            <tasklet>
                <chunk reader="StatusReader" processor="ItemProcessor" writer="ItemWriter"
                    commit-interval="10" />
            </tasklet>
        </step>
    </job>

    <beans:bean id="ItemWriter" scope="step"
        class="com.batch.writer.ItemWriter">
    </beans:bean>

<beans:bean id="ItemProcessor" scope="step"
        class="com.batch.processor.ItemProcessor">
    </beans:bean>
    <beans:bean id="Reader" scope="step"
        class="com.reader.ItemReader">
        <beans:property name="dataSource" ref="CassandraSource" />

    </beans:bean>

applicationContext.xml:

<beans:bean id="CassandraSource" parent="DataSourceParent">
<beans:property name="url" value="jdbc:cassandra://${cassandra.hostName}:${cassandra.port}/${cassandra.keyspace}" />
<beans:property name="driverClassName" value="org.apache.cassandra.cql.jdbc.CassandraDriver" />
</beans:bean>

reader class:

    public static final String query = "SELECT * FROM test_1 allow filtering;";

    @Override
    public List<Item> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        List<Item> results = new ArrayList<Item>();
        try {
            results = cassandraTemplate.select(query, Item.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return results;
    }

writer class:

    @Override
    public void write(List<? extends Item> item) throws Exception {
        try {
            cassandraTemplate.insert(item);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

But the problem is that the whole job gets executed multiple times; in fact it does not stop at all, and I have to force-stop the execution. I have only 2 rows in the table. I thought it was because of the commit-interval defined in the XML, but even with commit-interval = 10 the job executes far more than 10 times.

According to my understanding, when I launch the XML configuration I am running the job only once: it calls the reader once, keeps the data in runtime memory (the job repository), calls the item processor once (I use a list), and the whole list is inserted at once.

Does this also happen if you remove all the jdbc/cassandra related code? – Mark Rotteveel
Yes, it does not stop even if I remove the Cassandra-related code. – Prabodh Hend
In my opinion the problem is the unneeded <tasklet> configuration. I think you can simply omit it. – Ilya Dyoshin
I would remove the next="" as there is no next step. Also, as you append to the existing table, aren't you creating new items that your reader can read, creating an infinite loop? – Philippe
@Ilya Dyoshin I think we need the tasklet in order to chunk the step, and I need chunking because at a later point in time my data will be huge. – Prabodh Hend

1 Answer


SOLVED

In the reader class I wrote:

    if (results.size() != 0) return results; else return null;
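
Returning null matters because of the ItemReader contract: Spring Batch keeps calling read() until it returns null, which signals that the input is exhausted, so a reader that always returns a non-null value makes the step loop forever. Below is a minimal sketch of how the fix could fit into the reader, assuming the question's Item class, that the cassandraTemplate field is a Spring Data Cassandra CassandraTemplate, and a queried flag (my addition, not in the original answer) so the table is queried only once and the next call returns null:

    import java.util.ArrayList;
    import java.util.List;

    import org.springframework.batch.item.ItemReader;
    import org.springframework.data.cassandra.core.CassandraTemplate;

    // Sketch only: each read() call returns the whole result list as one "item",
    // mirroring the reader shown in the question.
    public class StatusItemReader implements ItemReader<List<Item>> {

        private static final String QUERY = "SELECT * FROM test_1 allow filtering;";

        private CassandraTemplate cassandraTemplate;   // injected, as in the question
        private boolean queried = false;               // assumed guard, not in the original answer

        @Override
        public List<Item> read() throws Exception {
            if (queried) {
                return null;                           // null tells Spring Batch there is no more input
            }
            queried = true;
            List<Item> results = new ArrayList<Item>(cassandraTemplate.select(QUERY, Item.class));
            // the answer's check: return the data if there is any, otherwise signal "no input"
            return results.isEmpty() ? null : results;
        }

        public void setCassandraTemplate(CassandraTemplate cassandraTemplate) {
            this.cassandraTemplate = cassandraTemplate;
        }
    }

Without some guard like this, a reader that re-runs the same query on every call keeps finding rows (especially since the writer inserts into the same table, as Philippe pointed out) and the step never ends.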