I am implementing a Spring Batch job that processes millions of records in a DB table using partitioning, as follows:
Fetch the unique partition codes from the table in a partitioner and put each code into its own execution context.
Create a chunk-oriented step with a reader, processor, and writer that processes only the records for one partition code.
Is this approach appropriate, or is there a better approach for a situation like this? Some partition codes have many more records than others, so the partitions with more records may take much longer to process than those with fewer.
Is it possible to partition by record range instead, so that thread 1 processes records 1-1000, thread 2 processes records 1001-2000, and so on?
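A minimal sketch of the range arithmetic, assuming the table has a numeric, roughly contiguous key (called `id` here, which is a hypothetical column name). `IdRangeSplitter` and `split` are hypothetical names, not Spring Batch API. A partitioner could call it with `min(id)` and `max(id)` and store each range in its partition's `ExecutionContext` (for example under hypothetical keys `minId`/`maxId`) for a reader query like `WHERE id BETWEEN ? AND ?`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: splits the inclusive id range [minId, maxId] into at most
 * gridSize contiguous, non-overlapping sub-ranges of roughly equal width.
 */
public class IdRangeSplitter {

    /** Value holder for one inclusive range. */
    public static final class Range {
        public final long min;
        public final long max;

        public Range(long min, long max) {
            this.min = min;
            this.max = max;
        }

        @Override
        public String toString() {
            return min + "-" + max;
        }
    }

    public static List<Range> split(long minId, long maxId, int gridSize) {
        if (gridSize < 1 || maxId < minId) {
            throw new IllegalArgumentException("invalid range or grid size");
        }
        long total = maxId - minId + 1;
        // Ceiling division so that the ranges together cover every id
        long width = (total + gridSize - 1) / gridSize;
        List<Range> ranges = new ArrayList<>();
        for (long start = minId; start <= maxId; start += width) {
            long end = Math.min(start + width - 1, maxId);
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        System.out.println(split(1, 2000, 2)); // prints [1-1000, 1001-2000]
    }
}
```

Ranges balance the id span, not the actual row count, so large gaps in the key would still produce uneven partitions.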
How do I control the number of threads that get created? There can be around 100 partition codes, and I would like to create only 20 threads and process them in 5 iterations.
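As I understand it, `grid-size` is only a hint passed to `partition(int gridSize)` (which the partitioner below ignores); the number of concurrently running partitions is bounded by the task executor's pool. With `corePoolSize` = `maxPoolSize` = 20 and `ThreadPoolTaskExecutor`'s default unbounded queue, the 100 partitions would be queued and each free thread picks up the next one, rather than running in strict waves of 20. The plain-Java model below (not Spring Batch code; `PoolSizeDemo` is a hypothetical name) uses `Executors.newFixedThreadPool`, which has the same shape, to show that behaviour.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSizeDemo {

    /** Returns {completedTasks, distinctThreads, maxConcurrentTasks}. */
    public static int[] run(int partitions, int poolSize) throws InterruptedException {
        // Fixed pool: core = max = poolSize, extra tasks wait in an unbounded queue
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();

        for (int i = 0; i < partitions; i++) {
            pool.submit(() -> {
                // Track how many "partitions" are running at the same moment
                int now = active.incrementAndGet();
                maxActive.accumulateAndGet(now, Math::max);
                threadNames.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(5); // stand-in for processing one partition
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                active.decrementAndGet();
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return new int[] {completed.get(), threadNames.size(), maxActive.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = run(100, 20);
        System.out.println("completed=" + r[0] + ", threads=" + r[1] + ", maxConcurrent=" + r[2]);
    }
}
```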
What happens if one partition fails? Will all processing stop and be rolled back?
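As far as I understand chunk-oriented processing, each chunk of `commit-interval` items is written in its own transaction, so a failure rolls back only the chunk in progress; chunks already committed stay in the database. In a partitioned step each partition runs as its own step execution, so one failing partition marks the master step as failed but does not undo what other partitions have committed, and a restart is intended to re-run only the partitions that did not complete. The plain-Java model below (not Spring Batch code; `ChunkCommitModel` is a hypothetical name) shows the commit arithmetic for `commit-interval="200"`.

```java
/**
 * Plain-Java model of chunk commits: items are committed in chunks of
 * commitInterval; a failure rolls back only the chunk in progress.
 */
public class ChunkCommitModel {

    /** Items still committed when the item at 0-based index failIndex fails. */
    public static long committedBeforeFailure(long failIndex, int commitInterval) {
        // Number of full chunks finished before the failing chunk started
        return (failIndex / commitInterval) * commitInterval;
    }

    public static void main(String[] args) {
        // Failure on the 451st item (index 450) with commit-interval="200"
        System.out.println(committedBeforeFailure(450, 200)); // prints 400
    }
}
```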
Here is the configuration:
<bean id="MyPartitioner" class="com.MyPartitioner" />
<bean id="itemProcessor" class="com.MyProcessor" scope="step" />
<bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select * from mytable WHERE code = '#{stepExecutionContext[code]}' "/>
    <property name="rowMapper">
        <bean class="com.MyRowMapper" scope="step"/>
    </property>
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="20"/>
    <property name="maxPoolSize" value="20"/>
    <property name="allowCoreThreadTimeOut" value="true"/>
</bean>
<batch:step id="Step1" xmlns="http://www.springframework.org/schema/batch">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" commit-interval="200"/>
    </batch:tasklet>
</batch:step>
<batch:job id="myjob">
    <batch:step id="mystep">
        <batch:partition step="Step1" partitioner="MyPartitioner">
            <batch:handler grid-size="20" task-executor="taskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>
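Partitioner:
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class MyPartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitionMap = new HashMap<>();
        // getCodes() fetches the distinct codes from mytable (implementation not shown)
        List<String> codes = getCodes();
        // One partition per code; gridSize is not used
        for (String code : codes) {
            ExecutionContext context = new ExecutionContext();
            context.put("code", code);
            partitionMap.put(code, context);
        }
        return partitionMap;
    }
}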
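One way to address the skew between codes is to group codes into `gridSize` buckets of similar total size instead of creating one partition per code. The sketch below is a greedy "largest first" assignment; it assumes per-code record counts are available (for example from `SELECT code, COUNT(*) FROM mytable GROUP BY code`), and `CodeBalancer` and `balance` are hypothetical names. Each non-empty bucket could then become one partition whose `ExecutionContext` holds a list of codes, with the reader querying `WHERE code IN (...)`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical helper: balances codes across buckets by record count. */
public class CodeBalancer {

    public static List<List<String>> balance(Map<String, Long> countsByCode, int gridSize) {
        if (gridSize < 1) {
            throw new IllegalArgumentException("gridSize must be >= 1");
        }
        List<List<String>> buckets = new ArrayList<>();
        long[] loads = new long[gridSize];
        // Min-heap of bucket indexes ordered by their current total record count
        PriorityQueue<Integer> lightest =
                new PriorityQueue<>(Comparator.comparingLong((Integer i) -> loads[i]));
        for (int i = 0; i < gridSize; i++) {
            buckets.add(new ArrayList<>());
            lightest.add(i);
        }
        List<Map.Entry<String, Long>> entries = new ArrayList<>(countsByCode.entrySet());
        // Place the largest codes first, each into the currently lightest bucket
        entries.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        for (Map.Entry<String, Long> e : entries) {
            int b = lightest.poll();   // remove before changing its load
            buckets.get(b).add(e.getKey());
            loads[b] += e.getValue();
            lightest.add(b);           // re-insert with the updated load
        }
        return buckets;
    }
}
```

A single code that is larger than all the others combined still ends up alone in one bucket, so very large codes may additionally need range splitting within the code.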
Thanks