When using RemoteFileTemplate to get 20k files from FTP, 15 or 20 files always get stuck, and after 10 minutes the FTP server returns these errors:

  • FTP response 421 received. Server closed connection.
  • Failed to obtain InputStream for remote file /test/test_file_1245: 425

FTP server config:

  • MaxInstances 2000 (Limits the overall number of connections)
  • MaxClients 1000 (Limits the number of connections on a per-server/vhost basis)

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
    </parent>

    <groupId>rabbitmq.listener</groupId>
    <artifactId>listener</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-ftp</artifactId>
        </dependency>
    </dependencies>

</project>

application.properties

spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.default-requeue-rejected=false
spring.rabbitmq.listener.direct.consumers-per-queue=5
spring.rabbitmq.listener.direct.prefetch=5

DemoConfiguration.java

@Configuration
@Slf4j
@EnableRabbit
public class DemoConfiguration {

    @Bean
    public DirectMessageListenerContainer container(final DirectRabbitListenerContainerFactory containerFactory) {
        final DirectMessageListenerContainer listenerContainer = containerFactory.createListenerContainer();
        listenerContainer.setQueueNames("in_queue"); // has 20.000 messages before starting this application
        listenerContainer.setListenerId("listener_in_queue");
        return listenerContainer;
    }

    @Bean
    public SessionFactory<FTPFile> sessionFactory() {
        final DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
        defaultFtpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        defaultFtpSessionFactory.setHost("ftp.test.com");
        defaultFtpSessionFactory.setPort(1021);
        defaultFtpSessionFactory.setUsername("test");
        defaultFtpSessionFactory.setPassword("test");
        return new CachingSessionFactory<>(defaultFtpSessionFactory);
    }

    @Bean
    public RemoteFileTemplate<FTPFile> ftpFileRemoteFileTemplate(final SessionFactory<FTPFile> sessionFactory) {
        return new RemoteFileTemplate<>(sessionFactory);
    }

    @Bean
    public IntegrationFlow demoFlow(final DirectMessageListenerContainer container, final RemoteFileTemplate<FTPFile> ftpFileRemoteFileTemplate) {

        return IntegrationFlows.from(Amqp.inboundAdapter(container))
                .handle((payload, headers) -> {
                    final String file = payload.toString();
                    ftpFileRemoteFileTemplate.get(file, stream -> {
                        try {
                            log.info("{}: {}", file, IOUtils.toString(stream, StandardCharsets.UTF_8).length());
                        } finally {
                            stream.close();
                        }
                    });
                    return null;
                })
                .get();
    }

}

log file

2019-01-28 13:37:32.208  WARN 2688 --- [pool-1-thread-14                        ] org.springframework.integration.ftp.session.FtpSession                                               : failed to disconnect FTPClient

org.apache.commons.net.ftp.FTPConnectionClosedException: FTP response 421 received.  Server closed connection.
        at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:388)
        at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:300)
        at org.apache.commons.net.ftp.FTP.getReply(FTP.java:732)
        at org.apache.commons.net.ftp.FTPClient.completePendingCommand(FTPClient.java:1853)
        at org.springframework.integration.ftp.session.FtpSession.finalizeRaw(FtpSession.java:108)
        at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:150)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.close(CachingSessionFactory.java:208)
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:456)
        at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
        at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

2019-01-28 13:37:32.212  WARN 2688 --- [pool-1-thread-14                        ] org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler                            : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:111)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
        ... 11 common frames omitted
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:451)
        at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
        at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
        ... 29 common frames omitted
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:98)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:280)
        at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$get$4(RemoteFileTemplate.java:396)
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:442)
        ... 36 common frames omitted

2019-01-28 13:37:32.215 ERROR 2688 --- [pool-1-thread-14                        ] org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer                              : Failed to invoke listener

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995)
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:111)
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237)
        at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514)
        ... 11 common frames omitted
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:451)
        at org.springframework.integration.file.remote.RemoteFileTemplate.get(RemoteFileTemplate.java:393)
        at rabbitmq.listener.DemoConfiguration.lambda$demoFlow$1(DemoConfiguration.java:58)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:102)
        ... 29 common frames omitted
Caused by: java.io.IOException: Failed to obtain InputStream for remote file /test/test_file_1245: 425
        at org.springframework.integration.ftp.session.FtpSession.readRaw(FtpSession.java:98)
        at org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession.readRaw(CachingSessionFactory.java:280)
        at org.springframework.integration.file.remote.RemoteFileTemplate.lambda$get$4(RemoteFileTemplate.java:396)
        at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:442)
        ... 36 common frames omitted

-- EDIT

I replaced CachingSessionFactory with a plain DefaultFtpSessionFactory to rule out pool issues and got the same behavior. 10 messages stayed in the unacked state on RabbitMQ for 10 minutes. After those 10 minutes, I could see that the files had been processed and there were no more messages on RabbitMQ. On the FTP server, one session was in IDLE status for those 10 minutes.

    @Bean
    public SessionFactory<FTPFile> sessionFactory() {
        final DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
        defaultFtpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        defaultFtpSessionFactory.setHost("ftp.test.com");
        defaultFtpSessionFactory.setPort(1021);
        defaultFtpSessionFactory.setUsername("test");
        defaultFtpSessionFactory.setPassword("test");
        return defaultFtpSessionFactory;
    }

This does not look like a pool or FTP 421 problem to me; it is most likely something in the DefaultFtpSessionFactory or the session itself. Why are these threads hanging for so long?

How does it work with the FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE? - Artem Bilan
When ACTIVE_LOCAL_DATA_CONNECTION_MODE is set all threads are stuck. Ftp server log is empty. - Goran Malić
en.wikipedia.org/wiki/List_of_FTP_server_return_codes "421: Service not available, closing control connection. This may be a reply to any command if the service knows it must shut down." - Gary Russell

1 Answer

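A CachingSessionFactory created with the single-argument constructor, as in your configuration, has no session limit by default. A limit is only applied when you use the two-argument constructor: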

/**
 * Create a CachingSessionFactory with the specified session limit. By default, if
 * no sessions are available in the cache, and the size limit has been reached,
 * calling threads will block until a session is available.
 * <p>
 * Do not cache a {@link DelegatingSessionFactory}, cache each delegate therein instead.
 * @see #setSessionWaitTimeout(long)
 * @see #setPoolSize(int)
 *
 * @param sessionFactory The underlying session factory.
 * @param sessionCacheSize The maximum cache size.
 */
public CachingSessionFactory(SessionFactory<F> sessionFactory, int sessionCacheSize) {

Without a limit, we end up with this configuration in the underlying SimplePool, where a pool size of zero or less is treated as Integer.MAX_VALUE:

/**
 * Creates a SimplePool with a specific limit.
 * @param poolSize The maximum number of items the pool supports.
 * @param callback A {@link PoolItemCallback} implementation called during various
 * pool operations.
 */
public SimplePool(int poolSize, PoolItemCallback<T> callback) {
    if (poolSize <= 0) {
        this.poolSize.set(Integer.MAX_VALUE);
        this.targetPoolSize.set(Integer.MAX_VALUE);
        this.permits.release(Integer.MAX_VALUE);
    }
    else {
        this.poolSize.set(poolSize);
        this.targetPoolSize.set(poolSize);
        this.permits.release(poolSize);
    }
    this.callback = callback;
}
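
To make the effect of that constructor concrete, here is a minimal, standalone sketch of the same permit rule using only java.util.concurrent. BoundedPoolSketch and its methods are hypothetical names for illustration; this is not the SimplePool implementation, only the "non-positive size means unlimited" logic shown above.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of Spring Integration.
class BoundedPoolSketch {

    private final Semaphore permits = new Semaphore(0);

    BoundedPoolSketch(int poolSize) {
        // Same rule as the excerpt above: a non-positive size means "no limit".
        this.permits.release(poolSize <= 0 ? Integer.MAX_VALUE : poolSize);
    }

    // Waits up to timeoutMillis for a free slot; returns false if none became free.
    boolean tryAcquire(long timeoutMillis) {
        try {
            return this.permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void release() {
        this.permits.release();
    }

    int available() {
        return this.permits.availablePermits();
    }

    public static void main(String[] args) {
        BoundedPoolSketch bounded = new BoundedPoolSketch(2);
        System.out.println(bounded.tryAcquire(50)); // true
        System.out.println(bounded.tryAcquire(50)); // true
        System.out.println(bounded.tryAcquire(50)); // false: both slots are taken
        bounded.release();
        System.out.println(bounded.tryAcquire(50)); // true: a slot was returned

        BoundedPoolSketch unbounded = new BoundedPoolSketch(0);
        System.out.println(unbounded.available() == Integer.MAX_VALUE); // true
    }
}
```

With a size of 2 the third caller has to wait for a release, while a size of 0 never makes a caller wait, so nothing caps the number of items handed out.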

So consider setting a reasonable cache size for your CachingSessionFactory.
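
As a rough sketch of what that could look like with the session factory from the question: the cache size of 5 and the 30-second wait timeout below are assumed values, not recommendations, and the class name BoundedFtpSessionConfiguration is hypothetical. The two-argument constructor and setSessionWaitTimeout(long) are the ones referenced in the Javadoc above.

```java
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;

@Configuration
public class BoundedFtpSessionConfiguration {

    @Bean
    public SessionFactory<FTPFile> sessionFactory() {
        final DefaultFtpSessionFactory defaultFtpSessionFactory = new DefaultFtpSessionFactory();
        defaultFtpSessionFactory.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        defaultFtpSessionFactory.setHost("ftp.test.com");
        defaultFtpSessionFactory.setPort(1021);
        defaultFtpSessionFactory.setUsername("test");
        defaultFtpSessionFactory.setPassword("test");

        // Assumed value: at most 5 cached sessions, chosen here only to match
        // consumers-per-queue=5 from application.properties.
        final CachingSessionFactory<FTPFile> cachingSessionFactory =
                new CachingSessionFactory<>(defaultFtpSessionFactory, 5);

        // Assumed value: wait up to 30 seconds for a free session
        // instead of blocking indefinitely when the cache is exhausted.
        cachingSessionFactory.setSessionWaitTimeout(30_000L);
        return cachingSessionFactory;
    }

}
```

The right size depends on how many listener threads call the template concurrently and on what the FTP server tolerates, so treat both numbers as starting points to tune.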