When using RemoteFileTemplate to get 20k files from FTP, always 15 or 20 files got stuck and after 10 minutes FTP return error code:
- FTP response 421 received. Server closed connection.
- Failed to obtain InputStream for remote file /test/test_file_1245: 425
FTP server config:
- MaxInstances 2000 (Limits the overall number of connections)
- MaxClients 1000 (Limits the number of connections on a per-server/vhost basis)
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
</parent>
<groupId>rabbitmq.listener</groupId>
<artifactId>listener</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
</dependency>
</dependencies>
</project>
application.properties
spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.default-requeue-rejected=false
spring.rabbitmq.listener.direct.consumers-per-queue=5
spring.rabbitmq.listener.direct.prefetch=5
DemoConfiguration.java
@Configuration
@Slf4j
@EnableRabbit
public class DemoConfiguration {
@Bean
public DirectMessageListenerContainer container(final DirectRabbitListenerContainerFactory containerFactory) {
final DirectMessageListenerContainer listenerContainer = containerFactory.createListenerContainer();
listenerContainer.setQueueNames("in_queue"); // has 20.000 messages before starting this application
listenerContainer.setListenerId("listener_in_queue");
return listenerContainer;
}
@Bean
public SessionFactory<FTPFile> sessionFactory() {
final DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
defaultFtpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
defaultFtpSessionFactory.setHost("ftp.test.com");
defaultFtpSessionFactory.setPort(1021);
defaultFtpSessionFactory.setUsername("test");
defaultFtpSessionFactory.setPassword("test");
return new CachingSessionFactory<>(defaultFtpSessionFactory);
}
@Bean
public RemoteFileTemplate<FTPFile> ftpFileRemoteFileTemplate(final SessionFactory<FTPFile> sessionFactory) {
return new RemoteFileTemplate<>(sessionFactory);
}
@Bean
public IntegrationFlow demoFlow(final DirectMessageListenerContainer container, final RemoteFileTemplate<FTPFile> ftpFileRemoteFileTemplate) {
return IntegrationFlows.from(Amqp.inboundAdapter(container))
.handle((payload, headers) -> {
final String file = payload.toString();
ftpFileRemoteFileTemplate.get(file, stream -> {
try {
log.info("{}: {}", file, IOUtils.toString(stream, StandardCharsets.UTF_8).length());
} finally {
stream.close();
}
});
return null;
})
.get();
}
}
log file
2019-01-28 13:37:32.208 WARN 2688 --- [pool-1-thread-14 ] org.springframework.integration.ftp.session.FtpSession : failed to disconnect FTPClient
org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received. Server closed connection.
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:388)
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:300)
at org.apache.commons.net.ftp.FTP.getReply(FTP.java:732)
at org.apache.commons.net.ftp.FTPClient.completePendingCommand(FTPClient.java:1853)
at org.springframework.integration.ftp.session.FtpSession.finalizeRaw(FtpSession.java:108)
at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:150)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.close(CachingSessionFactory.java:208)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:456)
at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-01-28 13:37:32.208 WARN 2688 --- [pool-1-thread-14 ] org.springframework.integration.ftp.session.FtpSession : failed to disconnect FTPClient
org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received. Server closed connection.
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:388)
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:300)
at org.apache.commons.net.ftp.FTP.getReply(FTP.java:732)
at org.apache.commons.net.ftp.FTPClient.completePendingCommand(FTPClient.java:1853)
at org.springframework.integration.ftp.session.FtpSession.finalizeRaw(FtpSession.java:108)
at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:150)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.close(CachingSessionFactory.java:208)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:456)
at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-01-28 13:37:32.212 WARN 2688 --- [pool-1-thread-14 ] org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:111)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
... 11 common frames omitted
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:451)
at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
... 29 common frames omitted
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:98)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:280)
at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$get$4(RemoteFileTemplate.java:396)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:442)
... 36 common frames omitted
2019-01-28 13:37:32.215 ERROR 2688 --- [pool-1-thread-14 ] org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer : Failed to invoke listener
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:111)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
... 11 common frames omitted
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:451)
at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
... 29 common frames omitted
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:98)
at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:280)
at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$get$4(RemoteFileTemplate.java:396)
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:442)
... 36 common frames omitted
-- EDIT
I replaced CachingSessionFactory
with DefaultFtpSessionFactory
to avoid pool issues and got same behavior. 10 messages were in unacked
state on RabbitMQ for 10 minutes. After 10 minutes, I could see that files were processed and there were no more messages on RabbitMQ. I could see that on ftp server one session was in IDLE status for 10 minutes.
@Bean
public SessionFactory<FTPFile> sessionFactory() {
final DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
defaultFtpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
defaultFtpSessionFactory.setHost("ftp.test.com");
defaultFtpSessionFactory.setPort(1021);
defaultFtpSessionFactory.setUsername("test");
defaultFtpSessionFactory.setPassword("test");
return defaultFtpSessionFactory;
}
This does not seem like pool or FTP 421 problem to me, it is most likely something in the DefaultFtpSessionFactory
or session itself? Why are these threads hanging for so long?
FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE
? – Artem Bilan421: Service not available, closing control connection. This may be a reply to any command if the service knows it must shut down.
– Gary Russell