0
votes

I'm using spring batch to perform some calcul, in a reader I have to get a large data to be treated in a processor / writer, and this process takes a lot of (RAM). So I tried to split the step using the partitioner like below :

<batch:step id="MyStep.master" >
    <partition step="MyStep" partitioner="MyPartitioner">
        <handler grid-size="1" task-executor="TaskExecutor" />
    </partition>
</batch:step>

<batch:step id="MyStep" >
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="MyReader" processor="MyProcessor"
            writer="MyWriter" commit-interval="1000" skip-limit="1000">
            <batch:skippable-exception-classes>
                <batch:include class="...FunctionalException" />
            </batch:skippable-exception-classes>
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<bean id="MyPartitioner" class="...MyPartitioner" scope="step"/>

<bean id="TaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >

<bean name="MyReader"
    class="org.springframework.batch.item.database.JdbcCursorItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="sql">
        <value>
            <![CDATA[
                SELECT...                   
            ]]>
        </value>
    </property>
    <property name="rowMapper" ref="MyRowMapper" />
</bean>

<bean id="MyRowMapper" class="...MyRowMapper" />

<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
    <property name="driverClass" value="org.postgresql.Driver"/>
    <property name="jdbcUrl" value="jdbc:postgresql://${database.host}/${database.name}"/>
    <property name="user" value="${database.user}"/>
    <property name="password" value="${database.password}"/>        
    <property name="acquireIncrement" value="1" />
    <property name="autoCommitOnClose" value="true" />
    <property name="minPoolSize" value="${min.pool.size}" /> <!-- min.pool.size=5  -->
    <property name="maxPoolSize" value="${max.pool.size}" /> <!-- max.pool.size=15  -->
</bean>

But in vain the partitioning takes a lot of memory too, because the steps (slaves) are executed in parallel, what I want to do is to split the step and execute the thread successively (not in parallel) to reduce the memory usage (RAM), is that possible?

1
What is taking up the RAM? Adding partitioning to a chunk oriented step that is running out of RAM isn't going to help (it's the same thing). - Michael Minella
What I want to do, is to split the query result that used in the reader, to reduce the memory usage, how can I perform that ? What I have now : reader with large data R -> processor P -> writer W What I want : split the result R to r1, r2, r3... reader r1 -> processor P -> writer W reader r2 -> processor P -> writer W reader r3 -> processor P -> writer W .... - Mohammed Réda OUASSINI
What ItemReader are you using? If you're using one of the Spring Batch ones, you shouldn't be loading all of the results of a query in memory at once. Post the configuration for your reader and we can go from there. - Michael Minella
I added the reader in the code above. - Mohammed Réda OUASSINI
If your cursor is loading the full result set into memory, you have your connection configured incorrectly. What db are you going against? - Michael Minella

1 Answers

0
votes

The question it's a bit old, so I'm not sure if this will be helpful now, probably you solved it yourself.

If you don't have a problem with the row execution order the solution would be to query your db in your partitioner bean, then pass to each partitions all the information to split in portions your table/s (start_key, end_key) this will reduce the ram usage (A LOT).

Some warnings:

  1. Be aware that both the query used by partitioner bean and the reader must have the same "order by"
  2. partitioned readers must be scope="step"
  3. Don't use this method if you need to process the rows in a precise order
  4. To tune the RAM try different combination of gridSize and taskExecutor maxCoreSize (that's why my gridSize is a jobParams)

Here an example:

XML Configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">

    <!-- JOB -->
    <batch:job id="printPdf" job-repository="jobRepository"
        restartable="false">

        <batch:step id="MyStep">
            <batch:partition step="MyStep.template"
                partitioner="myPartitioner" handler="partitionHandler">
            </batch:partition>
        </batch:step>

    </batch:job>

    <!-- Partitioner -->
    <bean id="myPartitioner" class="foo.MyPartitioner"
        scope="step">
        <property name="jdbcTemplate" ref="myJdbcTemplate" />
        <property name="sql"
            value="Select ...." />
        <property name="rowMap">
            <bean
                class="foo.MyPartitionHandlerRowMapper" />
        </property>
        <property name="preparedStatementSetter">
            <bean
                class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
                <property name="parameters">
                    <list>
                        <value>#{jobParameters['param1']}</value>
                    </list>
                </property>
            </bean>
        </property>
    </bean>

    <bean id="partitionHandler" scope="step"
        class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
        <property name="taskExecutor" ref="customTaskExecutor" />
        <property name="gridSize" value="#{jobParameters['gridSize']}" />
        <property name="step" ref="MyStep.template" />
    </bean>

    <bean id="customTaskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="8" />
        <property name="maxPoolSize" value="8" />
        <property name="waitForTasksToCompleteOnShutdown" value="true" />
        <property name="awaitTerminationSeconds" value="120" />
    </bean>

    <batch:step id="MyStep.tempate">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk commit-interval="2500" reader="myReader"
                processor="myProcessor" writer="myWriter" skip-limit="2500">
            <batch:skippable-exception-classes>
                <batch:include class="...FunctionalException" />
            </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>

    <!-- Beans -->

    <!-- Processors -->
    <bean id="myProcessor" class="foo.MyProcessor"
        scope="step">
    </bean>

    <bean id="classLoaderVerifier"
        class="it.addvalue.pkjwd.services.genbean.GenericStockKeysForNoDuplicate" />

    <!-- Readers -->
    <bean id="myReader"
        class="org.springframework.batch.item.database.JdbcCursorItemReader"
        scope="step">
        <property name="dataSource" ref="myDataSouce" />
        <property name="sql"
            value="select ... from ... where ID >= ? and ID <= ?" />
        <property name="rowMapper">
            <bean class="foo.MyReaderPartitionedRowMapper" />
        </property>
        <property name="preparedStatementSetter">
            <bean
                class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
                <property name="parameters">
                    <list>
                        <value>#{stepExecutionContext['START_ID']}</value>
                        <value>#{stepExecutionContext['END_ID']}</value>
                    </list>
                </property>
            </bean>
        </property>
    </bean>

    <!-- Writers -->
    <bean id="myWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="assertUpdates" value="false" />
        <property name="itemPreparedStatementSetter">
            <bean class="foo.MyWriterStatementSetters" />
        </property>
        <property name="sql"
            value="insert ..." />
        <property name="dataSource" ref="myDataSouce" />
    </bean>

</beans>

Your Partitioner Bean will look like this:

package foo;

import foo.model.MyTable;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;

public class MyPartitioner implements Partitioner
{
    private JdbcTemplate                     jdbcTemplate;

    private RowMapper<foo.model.MyTable> rowMap;

    private String                           sql;

    private PreparedStatementSetter          preparedStatementSetter;

    public JdbcTemplate getJdbcTemplate()
    {
        return jdbcTemplate;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate)
    {
        this.jdbcTemplate = jdbcTemplate;
    }

    public RowMapper<foo.model.MyTable> getRowMap()
    {
        return rowMap;
    }

    public void setRowMap(RowMapper<PkjwdPolizzePartition> rowMap)
    {
        this.rowMap = rowMap;
    }

    public String getSql()
    {
        return sql;
    }

    public void setSql(String sql)
    {
        this.sql = sql;
    }

    public PreparedStatementSetter getPreparedStatementSetter()
    {
        return preparedStatementSetter;
    }

    public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter)
    {
        this.preparedStatementSetter = preparedStatementSetter;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize)
    {
        Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();

        try
        {
            List<PkjwdPolizzePartition> lstMyRows = jdbcTemplate.query(sql, preparedStatementSetter ,rowMap);

            if ( lstMyRows.size() > 0 )
            {
                int total = lstMyRows.size();
                int rowsPerPartition = total / gridSize;
                int leftovers = total % gridSize;
                total = lstMyRows.size() - 1;

                int startPos = 0;
                int endPos = rowsPerPartition - 1;

                int i = 0;

                while (endPos <= (total))
                {

                    ExecutionContext context = new ExecutionContext();

                    if ( endPos + leftovers == total )
                    {
                        endPos = total;
                    }
                    else if ( endPos >= (total) )
                    {
                        endPos = total;
                    }

                    context.put("START_ID", lstMyRows.get(startPos).getId());
                    context.put("END_ID", lstMyRows.get(endPos).getId());



                    map.put("PART_" + StringUtils.leftPad("" + i, ("" + gridSize).length(), '0'), context);

                    i++;
                    startPos = endPos + 1;
                    endPos = endPos + rowsPerPartition;
                }
            }

        }
        catch ( Exception e )
        {
            e.printStackTrace();
        }

        return map;
    }

}