I am trying to pull remote files through SFTP Spring Integration and dump into my local drive, and kick-off a Spring Batch Job (Step-1 to read, process & write), followed by Step-2 where a Tasklet archives the local file downloaded to another location locally and deletes the downloaded file.
Assuming the files being downloaded is "data.service", "data.maintenance" and "data.transaction". I am only processing "data.service" but all 3 are archived and deleted.
I want the above steps to happen at a specific interval assuming that a new set of files will be overwritten at the remote location.
Bean def
<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
<bean id="acceptOnceFileListFilter" class="org.springframework.integration.file.filters.AcceptOnceFileListFilter"/>
conf
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
preserve-timestamp="true"
filename-pattern="*.*"
local-filter="acceptAllFileListFilter" >
<int:poller max-messages-per-poll="-1" fixed-rate="60000" />
</int-sftp:inbound-channel-adapter>
What is happening now is... The above triggers this sequence and runs in an unending loop, without fetching the new files from remote. Details as below:
I start with an empty local folder, and remote server has all 3 files [Start the program]
- Spring Integration program downloads all 3 files to local location
- Spring Batch Job Step-1 gets triggered Tasklet (Step-2) archives the
- local files Tasklet (Step-2) deletes the local files Spring
- Integration program downloads all 3 files to local location (why???)
- program keeps running Step-1 to Step-5 in an unending loop
- New file dropped on the remote server, same (1) to (5) no job triggered
Tried replacing as below, but the program just runs once, i.e., from (1) to (5) and nothing happens
local-filter="acceptOnceFileListFilter" >
Expected
- Spring Integration program downloads all 3 files to local location
- Spring Batch Job Step-1 gets triggered
- Tasklet (Step-2) archives the local files
- Tasklet (Step-2) deletes the local files
- Spring Integration program waits for new files to be dropped in the remote location (polls every 60000 ms)
- New file dropped on the remote server
- Start with (1) through (6)
Per suggestion on a different post I have a unique job parameter as below:
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
jobParametersBuilder.addLong("currentTime", new Long(System.currentTimeMillis()));
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
Tried with different combinations of filter, none served my purpose. Any sample would be of great help, struggling with this since last week.
Here's my change per the suggestions:
Adding a composite filter as below, but what would be the argument that will be passed to SftpPersistentAcceptOnceFileListFilter? I could not see any samples in the internet.
<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter"/>
<bean class="org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter">
</bean>
</list>
</constructor-arg>
</bean>
Including in the adapter as below:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
filter="compositeFilter">
<int:poller max-messages-per-poll="-1" fixed-rate="600000" />
</int-sftp:inbound-channel-adapter>
Update - Changes after adding persistent filter:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="file:${inbound.remote.directory}"
auto-create-local-directory="false"
delete-remote-files="false"
filter="compositeFilter"
>
<int:poller max-messages-per-poll="-1" fixed-rate="60000" />
</int-sftp:inbound-channel-adapter>
<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
<constructor-arg name="store" ref="metadataStore"/>
<constructor-arg value="*.*"/>
</bean>
<bean class="org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter">
<constructor-arg name="store" ref="metadataStore"/>
<constructor-arg value="*.*"/>
</bean>
</list>
</constructor-arg>
</bean>
<bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean name="metadataStore" class="org.springframework.integration.redis.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
This gives me the exception as below. I tried the suggestions in the Spring forum, but I don't have a connection issues.
2014-07-17 22:27:14,139 [task-scheduler-1] INFO com.jcraft.jsch - Authentication succeeded (keyboard-interactive). 2014-07-17 22:27:14,192 [task-scheduler-1] INFO com.jcraft.jsch - Disconnecting from localhost port 22 2014-07-17 22:27:14,195 [task-scheduler-1] DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory - Returning cached instance of singleton bean 'errorChannel' 2014-07-17 22:27:14,197 [task-scheduler-1] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'errorChannel', message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=84cccecf-20ec-f67b-3f74-a938bf9abf3d, timestamp=1405661234197}] 2014-07-17 22:27:14,197 [task-scheduler-1] DEBUG org.springframework.integration.handler.LoggingHandler - (inner bean)#22 received message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=84cccecf-20ec-f67b-3f74-a938bf9abf3d, timestamp=1405661234197}] 2014-07-17 22:27:14,199 [task-scheduler-1] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:193) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:167) at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:124) at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:187) at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:52) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:278) at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:272) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) at java.lang.Thread.run(Thread.java:695) Caused by: org.springframework.messaging.MessagingException: Failed to execute on session at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:311) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:167) ... 21 more Caused by: java.lang.ClassCastException: com.jcraft.jsch.ChannelSftp$LsEntry cannot be cast to java.io.File at org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter.fileName(FileSystemPersistentAcceptOnceFileListFilter.java:28) at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.buildKey(AbstractPersistentAcceptOnceFileListFilter.java:88) at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.accept(AbstractPersistentAcceptOnceFileListFilter.java:48) at org.springframework.integration.file.filters.AbstractFileListFilter.filterFiles(AbstractFileListFilter.java:40) at org.springframework.integration.file.filters.CompositeFileListFilter.filterFiles(CompositeFileListFilter.java:97) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.filterFiles(AbstractInboundFileSynchronizer.java:157) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:173) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:167) at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:302) ... 22 more
2014-07-17 22:27:14,199 [task-scheduler-1] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=84cccecf-20ec-f67b-3f74-a938bf9abf3d, timestamp=1405661234197}]
Update post removing FileSystemPersistentAcceptOnceFileListFilter
filter leaving the 'SftpPersistentAcceptOnceFileListFilter'
2014-07-18 09:29:21,942 [task-scheduler-3] INFO com.jcraft.jsch - Authentications that can continue: publickey,keyboard-interactive,password 2014-07-18 09:29:21,942 [task-scheduler-3] INFO com.jcraft.jsch - Next authentication method: publickey 2014-07-18 09:29:21,942 [task-scheduler-3] INFO com.jcraft.jsch - Authentications that can continue: keyboard-interactive,password 2014-07-18 09:29:21,942 [task-scheduler-3] INFO com.jcraft.jsch - Next authentication method: keyboard-interactive 2014-07-18 09:29:22,022 [task-scheduler-3] INFO com.jcraft.jsch - Authentication succeeded (keyboard-interactive). 2014-07-18 09:29:22,067 [task-scheduler-3] DEBUG org.springframework.data.redis.core.RedisConnectionUtils - Opening RedisConnection 2014-07-18 09:29:22,068 [task-scheduler-3] INFO com.jcraft.jsch - Disconnecting from localhost port 22 2014-07-18 09:29:22,068 [task-scheduler-3] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend on channel 'errorChannel', message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=55304e38-6f82-8350-e763-0f5bcb507788, timestamp=1405700962068}] 2014-07-18 09:29:22,068 [Connect thread localhost session] INFO com.jcraft.jsch - Caught an exception, leaving main loop due to Socket closed 2014-07-18 09:29:22,068 [task-scheduler-3] DEBUG org.springframework.integration.handler.LoggingHandler - (inner bean)#22 received message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=55304e38-6f82-8350-e763-0f5bcb507788, timestamp=1405700962068}] 2014-07-18 09:29:22,069 [task-scheduler-3] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:193) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.receive(AbstractInboundFileSynchronizingMessageSource.java:167) at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:124) at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:187) at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:52) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:278) at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:272) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) at java.lang.Thread.run(Thread.java:695) Caused by: org.springframework.messaging.MessagingException: Failed to execute on session at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:311) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:167) ... 21 more Caused by: org.springframework.data.redis.RedisConnectionFailureException: Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:97) at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:143) at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:41) at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:128) at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:91) at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:78) at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:152) at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:84) at org.springframework.data.redis.core.DefaultHashOperations.putIfAbsent(DefaultHashOperations.java:179) at org.springframework.data.redis.core.DefaultBoundHashOperations.putIfAbsent(DefaultBoundHashOperations.java:91) at org.springframework.data.redis.support.collections.RedisProperties.putIfAbsent(RedisProperties.java:228) at org.springframework.integration.redis.metadata.RedisMetadataStore.putIfAbsent(RedisMetadataStore.java:139) at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.accept(AbstractPersistentAcceptOnceFileListFilter.java:51) at org.springframework.integration.file.filters.AbstractFileListFilter.filterFiles(AbstractFileListFilter.java:40) at org.springframework.integration.file.filters.CompositeFileListFilter.filterFiles(CompositeFileListFilter.java:97) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.filterFiles(AbstractInboundFileSynchronizer.java:157) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:173) at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer$1.doInSession(AbstractInboundFileSynchronizer.java:167) at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:302) ... 22 more Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool at redis.clients.util.Pool.getResource(Pool.java:42) at redis.clients.jedis.JedisPool.getResource(JedisPool.java:84) at redis.clients.jedis.JedisPool.getResource(JedisPool.java:10) at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:90) ... 41 more Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused at redis.clients.jedis.Connection.connect(Connection.java:150) at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:71) at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1783) at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:65) at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:819) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:429) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:360) at redis.clients.util.Pool.getResource(Pool.java:40) ... 44 more Caused by: java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:382) at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:241) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:228) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:431) at java.net.Socket.connect(Socket.java:527) at redis.clients.jedis.Connection.connect(Connection.java:144) ... 51 more
2014-07-18 09:29:22,069 [task-scheduler-3] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: [Payload MessagingException content=org.springframework.messaging.MessagingException: Problem occurred while synchronizing remote to local directory][Headers={id=55304e38-6f82-8350-e763-0f5bcb507788, timestamp=1405700962068}]