I have a Spring Batch job which retrieves files from a remote Linux server via SFTP. The directory on the remote server is an archive of seven days' worth of files (~400 files). The files are relatively small.
Spring Batch knows which files have already been processed.
When I launch the app for the first time, a Spring Batch tasklet retrieves the files, and then Spring Batch throws an exception for each file it has already processed:
E.g.
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={input.file.url=file:///blahblahblah.txt}.
This causes huge delays in processing.
After the first run, subsequent SFTP retrievals of the files produce no exceptions and consequently no delays (perhaps Spring Batch has built an internal list of the files already processed).
In the Transformer class, should I check whether the file already exists locally and create a JobLaunchRequest only for new files that have not yet been processed?
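import java.io.File;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

/**
 * Transform BAI file to a Spring Batch job launch request
 */
public class FileMessageToJobRequestTransformer {
    public static final Logger LOGGER = LoggerFactory.getLogger(FileMessageToJobRequestTransformer.class);

    private Job job;
    private String fileParameterName;

    public void setJob(Job job) {
        this.job = job;
    }

    public void setFileParameterName(String fileParameterName) {
        LOGGER.debug("file parameter name: {}", fileParameterName);
        this.fileParameterName = fileParameterName;
    }

    // The payload is the downloaded file, so the message is typed as Message<File>.
    @Transformer
    public JobLaunchRequest transform(Message<File> message) {
        LOGGER.debug("File message: {}", message);
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        // The file URL is the only job parameter, so it alone identifies the job instance.
        jobParametersBuilder.addString(fileParameterName, "file://" + message.getPayload().getAbsolutePath());
        LOGGER.debug("Job params: {}", jobParametersBuilder.toJobParameters());
        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}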
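To make the "check whether the file exists locally" idea concrete, here is a rough sketch in plain Java (no Spring types). NewFileSelector, localDir and selectNew are names I made up for illustration, and the sketch assumes that a file already present in the local download directory has been processed, which is only true if that directory survives restarts.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: keeps only the file names that have no local copy yet. */
final class NewFileSelector {

    // Assumed local download directory (hypothetical, not taken from my real config).
    private final Path localDir;

    NewFileSelector(Path localDir) {
        this.localDir = localDir;
    }

    /** Returns the names from remoteNames that do not exist under localDir, in order. */
    List<String> selectNew(List<String> remoteNames) {
        List<String> fresh = new ArrayList<>();
        for (String name : remoteNames) {
            // Assumption: a file already present locally was processed earlier.
            if (!Files.exists(localDir.resolve(name))) {
                fresh.add(name);
            }
        }
        return fresh;
    }
}
```

Only the names returned by selectNew would then be turned into job launch requests. The weakness is that local existence is just a proxy for "processed": after a fresh checkout or a cleaned download directory, every file would look new again.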
Is there a way I can catch the JobInstanceAlreadyCompleteException?
- Should I set retry-limit="1" and skip-limit="1" in SftpBaiParserJobBridge-context.xml?
<!-- When getting a new BAI file, transform into spring batch job request -->
<int:transformer id="fileMessageToJobRequestTransformer"
                 input-channel="inboundFileChannel"
                 output-channel="outboundJobRequestChannel"
                 method="transform">
    <bean class="com.distributedfinance.mbi.bai.transformer.FileMessageToJobRequestTransformer">
        <property name="job" ref="baiParseJob"/>
        <property name="fileParameterName" value="input.file.url"/>
    </bean>
    <int:poller fixed-rate="10000"/>
</int:transformer>
Appending addDate() has allowed files to be processed multiple times, and there are now duplicates in the database:
jobParametersBuilder.addString(fileParameterName,
        "file://" + message.getPayload().getAbsolutePath()).addDate("rundate", new Date()).toJobParameters();
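My understanding of why the duplicates appear: Spring Batch identifies a job instance by the job name plus its identifying parameters, so a fresh rundate makes every launch look like a new instance. The sketch below is a plain-Java model of that rule, not Spring Batch code; JobIdentityModel and identityKey are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (not Spring Batch code) of how a job instance is identified. */
final class JobIdentityModel {

    private JobIdentityModel() {
    }

    /** Builds an identity key from the job name and its identifying parameters. */
    static String identityKey(String jobName, Map<String, String> identifyingParams) {
        // Sort the parameters so the key does not depend on insertion order.
        return jobName + new TreeMap<>(identifyingParams);
    }
}
```

With only input.file.url in the map, two launches for the same file produce the same key, which corresponds to the "already complete" case. Adding a rundate entry changes the key on every launch, which matches the duplicates I am seeing. JobParametersBuilder also has an addDate(String, Date, boolean) overload whose last argument marks the parameter as non-identifying; passing false should keep rundate out of the identity, although that would presumably bring the original exception back.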
- Can I determine whether the file has already been processed by querying the repository via the JobExplorer interface from inside the transformer's transform() method, and return null if it has?
E.g.
public interface JobExplorer {
List<JobInstance> getJobInstances(String jobName, int start, int count);
JobExecution getJobExecution(Long executionId);
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
JobInstance getJobInstance(Long instanceId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
}
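Here is a rough sketch of the repository check I have in mind, written in plain Java so it stands alone. CompletedFileLookup and AlreadyProcessedFilter are hypothetical names; in the real application the lookup would have to be backed by the job repository (for example JobRepository.getLastJobExecution(jobName, jobParameters) and a check of the returned status), which I have not verified against my setup. As far as I can tell from the Spring Integration documentation, a transformer is not supposed to return null, so this check may belong in a filter placed ahead of the transformer instead.

```java
/** Hypothetical stand-in for a JobExplorer/JobRepository query. */
interface CompletedFileLookup {
    /** Returns true if a completed job instance exists for this job name and file URL. */
    boolean isCompleted(String jobName, String fileUrl);
}

/** Decides whether a downloaded file should still be turned into a job launch request. */
final class AlreadyProcessedFilter {

    private final String jobName;
    private final CompletedFileLookup lookup;

    AlreadyProcessedFilter(String jobName, CompletedFileLookup lookup) {
        this.jobName = jobName;
        this.lookup = lookup;
    }

    /** Returns true when no completed job instance exists for the file yet. */
    boolean accept(String absolutePath) {
        // Build the URL exactly as the transformer does, so the lookup key matches.
        String fileUrl = "file://" + absolutePath;
        return !lookup.isCompleted(jobName, fileUrl);
    }
}
```

Files for which accept returns false would be dropped before any JobLaunchRequest is created, so the launcher would never see them and never throw.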
How do I catch the exception?
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={input.file.url=file:///home/dlaxer/dfc-bank-integration/mbi-application/bai/download/BAI_Intraday160302070054471.txt}. If you want to run this job again, change the parameters.
    at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:126)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:172)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
    at com.sun.proxy.$Proxy113.createJobExecution(Unknown Source)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:125)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:302)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
    at com.sun.proxy.$Proxy114.run(Unknown Source)
    at org.springframework.batch.integration.launch.JobLaunchingMessageHandler.launch(JobLaunchingMessageHandler.java:50)
    at org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:76)
    ... 35 more
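Since the trace reports the exception as a "Caused by", whatever ends up catching it (an error-channel handler is my assumption) would receive a wrapper and would have to walk the cause chain to recognise it. A minimal plain-Java sketch of that step follows; Causes and find are hypothetical names, and nothing here is Spring-specific.

```java
/** Hypothetical helper for locating a specific exception type in a cause chain. */
final class Causes {

    private Causes() {
    }

    /** Returns the first throwable in the chain that is an instance of type, or null if none is. */
    static <T extends Throwable> T find(Throwable error, Class<T> type) {
        Throwable current = error;
        int depth = 0;
        // Bound the walk so a cyclic cause chain cannot loop forever.
        while (current != null && depth++ < 50) {
            if (type.isInstance(current)) {
                return type.cast(current);
            }
            current = current.getCause();
        }
        return null;
    }
}
```

In my case the call would look like Causes.find(e, JobInstanceAlreadyCompleteException.class); a non-null result would mean the file was already processed and the failure could be logged at debug level and ignored.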