3
votes

Requirement - Pull data from different views on Database1(simple select) and insert into tables on Database2 using Spring Batch.

Note : Each step not dependent on other steps - so can run in parallel, not sequential.

Solution : Use Spring Batch's Split Slows http://docs.spring.io/spring-batch/reference/html/configureStep.html#split-flows

Batch Config-

           <batch:job id="hrwardaily-batch-ps-hrit" xmlns="http://www.springframework.org/schema/batch">
           <batch:split id="splitFlow1" task-executor="taskExecutor">

                          <batch:flow>
                          <!-- STEP 1: READ BusinessUnit DATA IN CHUNKS AND WRITE IT -->
                          <step id="readAndWriteBusinessUnitPsToHrwar" >
                                         <tasklet transaction-manager="transactionManager">
                                                        <chunk reader="psViewBusinessUnitReader" writer="hrwarBusinessUnitStagingWriter"
                                                                       commit-interval="${hrwardaily.batch.ps.hrit.commitinterval}" />
                                                        <batch:listeners>
                                                                       <batch:listener ref="listener" />
                                                        </batch:listeners>
                                         </tasklet>
                                         <batch:next on="*" to="readAndWriteCompanyPsToHrwar" />
                                         <batch:next on="FAILED" to="readAndWriteCompanyPsToHrwar" />                                         
                          </step>

                          <!-- STEP 2: READ Company DATA IN CHUNKS AND WRITE IT -->
                          <step id="readAndWriteCompanyPsToHrwar">
                                         <tasklet transaction-manager="transactionManager">
                                                        <chunk reader="psViewCompanyReader" writer="hrwarCompanyStagingWriter"
                                                                       commit-interval="${hrwardaily.batch.ps.hrit.commitinterval}" />
                                                        <batch:listeners>
                                                                       <batch:listener ref="listener" />
                                                        </batch:listeners>
                                         </tasklet>
                                         <batch:next on="*" to="readAndWriteCountryPsToHrwar" />
                                         <batch:next on="FAILED" to="readAndWriteCountryPsToHrwar" />
                          </step>

                          <!-- STEP 3: READ Country DATA IN CHUNKS AND WRITE IT -->
                          <step id="readAndWriteCountryPsToHrwar">
                                         <tasklet transaction-manager="transactionManager">
                                                        <chunk reader="psViewCountryReader" writer="hrwarCountryStagingWriter"
                                                                       commit-interval="${hrwardaily.batch.ps.hrit.commitinterval}" />
                                                        <batch:listeners>
                                                                       <batch:listener ref="listener" />
                                                        </batch:listeners>                                          
                                         </tasklet>
                                         <batch:next on="*" to="readAndWriteCurrencyPsToHrwar" />
                                         <batch:next on="FAILED" to="readAndWriteCurrencyPsToHrwar" />
                          </step>

                          <!-- STEP 4: READ Currency DATA IN CHUNKS AND WRITE IT -->
                          <step id="readAndWriteCurrencyPsToHrwar">
                                         <tasklet transaction-manager="transactionManager">
                                                        <chunk reader="psViewCurrencyReader" writer="hrwarCurrencyStagingWriter"
                                                                       commit-interval="${hrwardaily.batch.ps.hrit.commitinterval}" />
                                                        <batch:listeners>
                                                                       <batch:listener ref="listener" />
                                                        </batch:listeners>
                                         </tasklet>
                                         <!-- <batch:next on="*" to="readAndWriteDepartmentPsToHrwar" />
                                         <batch:next on="FAILED" to="readAndWriteDepartmentPsToHrwar" /> -->
                          </step>
                          </batch:flow>


                          </batch:split>

           </batch:job>

Exception :

org.springframework.batch.core.step.AbstractStep$FatalException: Fatal failure detected at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:310) at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76) at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367) at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143) at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:242) at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:198) at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:348) at org.springframework.batch.core.job.flow.FlowJob.access$0(FlowJob.java:1) at org.springframework.batch.core.job.flow.FlowJob$JobFlowExecutor.executeStep(FlowJob.java:135) at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60) at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144) at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124) at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:83) at org.springframework.batch.core.job.flow.support.state.SplitState$1.call(SplitState.java:1) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.lang.Thread.run(Thread.java:662) Caused by: org.springframework.dao.OptimisticLockingFailureException: Attempt to update step execution id=3 with wrong version (2), where current version is 1 at org.springframework.batch.core.repository.dao.MapStepExecutionDao.updateStepExecution(MapStepExecutionDao.java:86) at org.springframework.batch.core.repository.support.SimpleJobRepository.update(SimpleJobRepository.java:167) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:318) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:110) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202) at $Proxy16.update(Unknown Source) at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:307) ... 17 more [30.10.2014 11:20:35][SimpleAsyncTaskExecutor-1] ERROR: AbstractStep.execute() - Encountered an error executing the step

Appreciate if somebody can help me / guide me on resolving this issue.

1

1 Answers

1
votes

As the documentation of the MapJobRepositoryFactoryBean states (http://docs.spring.io/spring-batch/apidocs/org/springframework/batch/core/repository/support/MapJobRepositoryFactoryBean.html), it is not thread safe and therefore not suited for multithreaded jobs (including ones with splits). Switch to an in memory database like HSQLDB so that you have thread safety in the job repository.