The question is a bit old, so I'm not sure this will still be helpful; you have probably solved it yourself by now.
If you don't need the rows to be processed in a specific order, the solution is to query your database in your partitioner bean and then pass each partition the information it needs to read only its own portion of the table(s) (start_key, end_key). This reduces RAM usage a lot.
Some warnings:
- Be aware that the query used by the partitioner bean and the query used by the reader must both have the same "order by"
- partitioned readers must be scope="step"
- Don't use this method if you need to process the rows in a precise order
- To tune RAM usage, try different combinations of gridSize and the taskExecutor's corePoolSize/maxPoolSize (that's why my gridSize is a job parameter)
Here is an example:
XML Configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">
<!-- JOB -->
<!-- Job with a single partitioned step: "myPartitioner" produces the
     partitions and "partitionHandler" executes the step named
     "MyStep.template" once per partition. -->
<batch:job id="printPdf" restartable="false" job-repository="jobRepository">
    <batch:step id="MyStep">
        <batch:partition handler="partitionHandler" partitioner="myPartitioner"
            step="MyStep.template" />
    </batch:step>
</batch:job>
<!-- Partitioner -->
<!-- Step-scoped so that #{jobParameters[...]} can be resolved when the step
     starts. The SQL is bound with the job parameter "param1" and each row
     is mapped by foo.MyPartitionHandlerRowMapper. -->
<bean id="myPartitioner" scope="step" class="foo.MyPartitioner">
    <property name="jdbcTemplate" ref="myJdbcTemplate" />
    <property name="sql" value="Select ...." />
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
                    <value>#{jobParameters['param1']}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMap">
        <bean class="foo.MyPartitionHandlerRowMapper" />
    </property>
</bean>
<!-- Runs the template step for each partition on "customTaskExecutor".
     gridSize comes from the job parameter "gridSize", hence scope="step". -->
<bean id="partitionHandler"
    class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler"
    scope="step">
    <property name="step" ref="MyStep.template" />
    <property name="gridSize" value="#{jobParameters['gridSize']}" />
    <property name="taskExecutor" ref="customTaskExecutor" />
</bean>
<!-- Fixed-size pool of 8 threads (core == max). On shutdown it waits up to
     120 seconds for running tasks to complete. -->
<bean id="customTaskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="maxPoolSize" value="8" />
    <property name="corePoolSize" value="8" />
    <property name="awaitTerminationSeconds" value="120" />
    <property name="waitForTasksToCompleteOnShutdown" value="true" />
</bean>
<!-- Template step executed once per partition. The id must be exactly
     "MyStep.template": both the <batch:partition step="..."> attribute and
     the partitionHandler "step" property refer to it by that name.
     Chunk-oriented: 2500 items per commit, and up to 2500 skipped items
     for the listed skippable exception class. -->
<batch:step id="MyStep.template">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk commit-interval="2500" reader="myReader"
            processor="myProcessor" writer="myWriter" skip-limit="2500">
            <batch:skippable-exception-classes>
                <batch:include class="...FunctionalException" />
            </batch:skippable-exception-classes>
        </batch:chunk>
    </batch:tasklet>
</batch:step>
<!-- Beans -->
<!-- Processors -->
<!-- Item processor used by the chunk; declared step-scoped. -->
<bean id="myProcessor" scope="step" class="foo.MyProcessor" />
<!-- NOTE(review): no other bean in this configuration references
     "classLoaderVerifier"; confirm whether it is still needed. -->
<bean class="it.addvalue.pkjwd.services.genbean.GenericStockKeysForNoDuplicate"
    id="classLoaderVerifier" />
<!-- Readers -->
<!-- Cursor-based reader, step-scoped so that each partition gets its own
     instance bound to the START_ID / END_ID values found in its step
     execution context. The comparison operators in the SQL are written as
     XML entities (&gt; / &lt;) because a literal "<" is not allowed inside
     an XML attribute value.
     NOTE(review): the elided part of this query should use the same
     "order by" as the partitioner query; confirm when filling it in. -->
<bean id="myReader"
    class="org.springframework.batch.item.database.JdbcCursorItemReader"
    scope="step">
    <property name="dataSource" ref="myDataSouce" />
    <property name="sql"
        value="select ... from ... where ID &gt;= ? and ID &lt;= ?" />
    <property name="rowMapper">
        <bean class="foo.MyReaderPartitionedRowMapper" />
    </property>
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
                    <value>#{stepExecutionContext['START_ID']}</value>
                    <value>#{stepExecutionContext['END_ID']}</value>
                </list>
            </property>
        </bean>
    </property>
</bean>
<!-- Writers -->
<!-- Batch JDBC writer: statement parameters are filled by
     foo.MyWriterStatementSetters; assertUpdates is disabled. -->
<bean class="org.springframework.batch.item.database.JdbcBatchItemWriter"
    id="myWriter">
    <property name="dataSource" ref="myDataSouce" />
    <property name="sql" value="insert ..." />
    <property name="itemPreparedStatementSetter">
        <bean class="foo.MyWriterStatementSetters" />
    </property>
    <property name="assertUpdates" value="false" />
</bean>
</beans>
Your Partitioner Bean will look like this:
package foo;
import foo.model.MyTable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;
/**
 * Partitioner that runs a configurable query, then splits the returned rows
 * into contiguous ranges and publishes the first and last row id of each range
 * as {@code START_ID} / {@code END_ID} in the partition's
 * {@link ExecutionContext}.
 *
 * <p>The ranges are built from row positions in the query result, so the query
 * is expected to return rows already ordered by id
 * (NOTE(review): confirm the configured SQL has the matching "order by").
 */
public class MyPartitioner implements Partitioner
{
    /** Template used to execute {@link #sql}. */
    private JdbcTemplate jdbcTemplate;

    /** Maps each row of the partition query to a {@link MyTable}. */
    private RowMapper<MyTable> rowMap;

    /** Query whose result rows are split into partitions. */
    private String sql;

    /** Binds the parameters of {@link #sql}. */
    private PreparedStatementSetter preparedStatementSetter;

    public JdbcTemplate getJdbcTemplate()
    {
        return jdbcTemplate;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate)
    {
        this.jdbcTemplate = jdbcTemplate;
    }

    public RowMapper<MyTable> getRowMap()
    {
        return rowMap;
    }

    public void setRowMap(RowMapper<MyTable> rowMap)
    {
        this.rowMap = rowMap;
    }

    public String getSql()
    {
        return sql;
    }

    public void setSql(String sql)
    {
        this.sql = sql;
    }

    public PreparedStatementSetter getPreparedStatementSetter()
    {
        return preparedStatementSetter;
    }

    public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter)
    {
        this.preparedStatementSetter = preparedStatementSetter;
    }

    /**
     * Splits the rows returned by the configured query into at most
     * {@code gridSize} contiguous ranges.
     *
     * <p>Every range holds {@code rows / partitions} rows; the last range also
     * takes the remainder. When there are fewer rows than {@code gridSize},
     * one partition per row is created. Keys are {@code PART_<n>}, with
     * {@code n} zero-padded to the number of digits of {@code gridSize}.
     *
     * <p>Query failures are not caught here: they propagate to the caller so
     * that the step fails instead of silently running with no partitions.
     *
     * @param gridSize requested number of partitions, must be at least 1
     * @return partition name to execution context carrying {@code START_ID}
     *         and {@code END_ID}; empty when the query returns no rows
     * @throws IllegalArgumentException if {@code gridSize} is less than 1
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize)
    {
        if ( gridSize < 1 )
        {
            throw new IllegalArgumentException("gridSize must be >= 1 but was " + gridSize);
        }

        Map<String, ExecutionContext> partitions = new HashMap<String, ExecutionContext>();
        List<MyTable> rows = jdbcTemplate.query(sql, preparedStatementSetter, rowMap);
        if ( rows.isEmpty() )
        {
            return partitions;
        }

        int rowCount = rows.size();
        // Never create more partitions than rows, otherwise ranges would be empty.
        int partitionCount = Math.min(gridSize, rowCount);
        int rowsPerPartition = rowCount / partitionCount;
        int keyWidth = String.valueOf(gridSize).length();

        for (int i = 0; i < partitionCount; i++)
        {
            int startPos = i * rowsPerPartition;
            // The last partition absorbs the rows left over by the integer division.
            int endPos = (i == partitionCount - 1) ? rowCount - 1 : startPos + rowsPerPartition - 1;

            ExecutionContext context = new ExecutionContext();
            context.put("START_ID", rows.get(startPos).getId());
            context.put("END_ID", rows.get(endPos).getId());
            partitions.put("PART_" + StringUtils.leftPad(String.valueOf(i), keyWidth, '0'), context);
        }
        return partitions;
    }
}
Which ItemReader are you using? If you're using one of the Spring Batch ones, you shouldn't be loading all of the results of a query in memory at once. Post the configuration for your reader and we can go from there. - Michael Minella