
I have a Spring Batch job that retrieves files from a remote Linux server via SFTP. The directory on the remote server is an archive of seven days' worth of files (~400 files). The files are relatively small.

Spring Batch knows which files have already been processed.

When I launch the app for the first time, a Spring Batch tasklet retrieves the files; then Spring Batch generates an exception for each file it has already processed:

E.g.

    Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={input.file.url=file:///blahblahblah.txt}.

This causes huge delays in processing.

After the first time, subsequent SFTP retrievals of the files produce no exceptions and consequently no delays (perhaps Spring Batch has built an internal list of files it has already processed).

  1. In the Transformer class, should I check whether the file already exists locally and only invoke a JobLaunchRequest for new files that have not yet been processed?

    import java.io.File;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobParametersBuilder;
    import org.springframework.batch.integration.launch.JobLaunchRequest;
    import org.springframework.integration.annotation.Transformer;
    import org.springframework.messaging.Message;

    /**
     * Transform BAI file to a Spring Batch job launch request.
     */
    public class FileMessageToJobRequestTransformer {

        public static final Logger LOGGER =
                LoggerFactory.getLogger(FileMessageToJobRequestTransformer.class);

        private Job job;
        private String fileParameterName;

        public void setJob(Job job) {
            this.job = job;
        }

        public void setFileParameterName(String fileParameterName) {
            LOGGER.debug("file parameter name: {}", fileParameterName);
            this.fileParameterName = fileParameterName;
        }

        @Transformer
        public JobLaunchRequest transform(Message<File> message) {
            LOGGER.debug("File message: {}", message);
            JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
            jobParametersBuilder.addString(fileParameterName,
                    "file://" + message.getPayload().getAbsolutePath());
            LOGGER.debug("Job params: {}", jobParametersBuilder.toJobParameters());
            return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
        }
    }

  2. Is there a way I can catch the JobInstanceAlreadyCompleteException?

  3. Should I set retry-limit="1" and skip-limit="1" in SftpBaiParserJobBridge-context.xml?

    <!-- When getting a new BAI file, transform into spring batch job request -->
    <int:transformer id="fileMessageToJobRequestTransformer"
                     input-channel="inboundFileChannel"
                     output-channel="outboundJobRequestChannel"
                     method="transform">
        <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
            <property name="job" ref="baiParseJob"/>
            <property name="fileParameterName" value="input.file.url"/>
        </bean>
        <int:poller fixed-rate="10000"/>
    </int:transformer>

Edit: appending addDate() has allowed files to be processed multiple times, and there are now duplicates in the database:

    jobParametersBuilder.addString(fileParameterName,
            "file://" + message.getPayload().getAbsolutePath())
        .addDate("rundate", new Date()).toJobParameters();

Thanks!

  4. Can I determine whether the file has already been processed by querying the job repository via the JobExplorer interface from inside @Transformer public JobLaunchRequest transform(Message message), and return null if it has? (A sketch of this idea follows the interface listing below.)

E.g.

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}
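
Something like the sketch below, which uses only the methods of the interface above. This is just an illustration, not existing code: the jobExplorer field and the 500-instance look-back window are assumptions, and whether returning null actually drops the message should be verified for your Spring Integration version.

    @Transformer
    public JobLaunchRequest transform(Message<File> message) {
        String fileUrl = "file://" + message.getPayload().getAbsolutePath();
        JobParameters params = new JobParametersBuilder()
                .addString(fileParameterName, fileUrl)
                .toJobParameters();

        // Look back over recent instances of this job for a COMPLETED
        // execution that was launched with identical parameters.
        // The jobExplorer field is assumed to be wired into this class.
        for (JobInstance instance : jobExplorer.getJobInstances(job.getName(), 0, 500)) {
            for (JobExecution execution : jobExplorer.getJobExecutions(instance)) {
                if (params.equals(execution.getJobParameters())
                        && execution.getStatus() == BatchStatus.COMPLETED) {
                    LOGGER.debug("Skipping already-processed file: {}", fileUrl);
                    return null; // no JobLaunchRequest -> message is dropped
                }
            }
        }
        return new JobLaunchRequest(job, params);
    }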
  5. How do I catch the exception?

    Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={input.file.url=file:///home/dlaxer/dfc-bank-integration/mbi-application/bai/download/BAI_Intraday160302070054471.txt}. If you want to run this job again, change the parameters.
        at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281)
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:172)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at com.sun.proxy.$Proxy113.createJobExecution(Unknown Source)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at com.sun.proxy.$Proxy114.run(Unknown Source)
        at org.springframework.batch.integration.launch.JobLaunchingMessageHandler.launch(JobLaunchingMessageHandler.java:50)
        at org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:76)
        ... 35 more


2 Answers


You are passing only one parameter, the file name, to your job launcher. When a job is launched, it queries the repository's BATCH_JOB_EXECUTION table and checks the status of the last execution of that job. If the input parameters of your current execution are the same as a previous execution, and that execution's batch status and exit code are COMPLETED, you will get a JobInstanceAlreadyCompleteException. You should always pass unique parameters with every execution; just add the current time as a parameter and try it:

    JobParameters jobparam = new JobParametersBuilder()
            .addString(fileParameterName, "file://" + message.getPayload().getAbsolutePath())
            .addDate("rundate", new Date())
            .toJobParameters();

    try {
        JobExecution execution = jobLauncher.run(job, jobparam);
    } catch (Exception e) {
        if (e instanceof JobInstanceAlreadyCompleteException) {
            System.out.println("Raj*************");
        }
    }
2 - Yes, you can handle it like this:

    try {
        jobLauncher.run(job, jobparam);
    } catch (Exception e) {
        if (e instanceof JobInstanceAlreadyCompleteException) {
            System.out.println("Need to handle*************");
        }
    }
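
A slightly tighter variant (my sketch, not part of the original answer) catches the specific exceptions that jobLauncher.run() declares instead of testing with instanceof:

    try {
        jobLauncher.run(job, jobparam);
    } catch (JobInstanceAlreadyCompleteException e) {
        // The file was already processed successfully, so skipping is safe.
        System.out.println("Already processed, skipping: " + e.getMessage());
    } catch (JobExecutionAlreadyRunningException | JobRestartException
            | JobParametersInvalidException e) {
        // Anything else is a genuine launch failure.
        throw new IllegalStateException("Job launch failed", e);
    }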


3 - Should I set retry-limit="1" and skip-limit="1"?

It depends on your requirements: skip-limit is the maximum number of items to skip when one of the listed skippable exceptions is thrown, and retry-limit is the number of times the step retries an item for a particular exception during chunk processing. An example makes this clearer. If I want to read a flat file that may contain bad input records, each of which raises a FlatFileParseException, and I want to ignore those records and process the rest of the file smoothly, my config would look like the one below: the job can skip at most 20 records during its execution, and when an exception is received for a record the step retries it up to 2 times.

    <chunk reader="flatFileItemReader" writer="itemWriter"
           commit-interval="1" skip-limit="20" retry-limit="2">
        <skippable-exception-classes>
            <include class="org.springframework.batch.item.file.FlatFileParseException"/>
        </skippable-exception-classes>
    </chunk>
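
For the retry side specifically, the chunk element also accepts a retryable-exception-classes block listing which exceptions trigger the retry-limit. A hedged sketch (the deadlock exception here is only an illustrative choice, not from the original answer):

    <chunk reader="flatFileItemReader" writer="itemWriter"
           commit-interval="1" skip-limit="20" retry-limit="2">
        <skippable-exception-classes>
            <include class="org.springframework.batch.item.file.FlatFileParseException"/>
        </skippable-exception-classes>
        <retryable-exception-classes>
            <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
        </retryable-exception-classes>
    </chunk>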