I need to poll an FTP server and process new or changed files. I use Spring Integration 5.3.2 with an inbound FTP adapter and a poller with a fixed rate of 5 seconds. All files are downloaded to the local directory immediately, but the downstream handlers of the integration flow are invoked only once every 5 seconds, one file at a time. I want to process the downloaded files immediately in concurrent threads, and poll the FTP server again 5 seconds after the flow ends. How can I do this?

@Bean
fun ftpInboundFlow(): IntegrationFlow {
    return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
            .preserveTimestamp(true)
            .maxFetchSize(ftpProperties.maxFetchSize)
            .remoteDirectory(ftpProperties.remoteDirectory)
            .localDirectory(File(ftpProperties.downloadDirectory))
            .filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(), "ftp-"))
            .regexFilter(".*\\.zip$")
    ) { e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(5))) }
            .channel(MessageChannels.executor(Executors.newWorkStealingPool()))
            .transform(unZipTransformer())
            .handle { m -> LOGGER.info("Unzipped {}", m.headers[FileHeaders.FILENAME]) }
            .get()
}

2 Answers


Set maxMessagesPerPoll on the poller. For an inbound channel adapter it defaults to 1, so only one file is emitted per poll; -1 means no limit.

Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)
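
As a rough sketch of where this goes in the flow from the question: the poller can be built in a small helper and passed to the adapter's endpoint configurer. The name `drainingPoller` is made up for illustration and is not part of Spring Integration.

```kotlin
import java.time.Duration
import org.springframework.integration.dsl.PollerSpec
import org.springframework.integration.dsl.Pollers

// Hypothetical helper: a 5-second poller that keeps emitting messages
// within one poll cycle until the message source has nothing left.
fun drainingPoller(): PollerSpec =
        Pollers.fixedRate(Duration.ofSeconds(5))
                .maxMessagesPerPoll(-1) // negative value = no per-poll limit
```

In the question's flow, the configurer lambda would then read `{ e -> e.poller(drainingPoller()) }`. The executor channel that is already there should take care of processing the emitted files concurrently.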

It sounds more like you want the MGET command of the FtpOutboundGateway. You can still use a plain poller that sends an empty string payload, something like IntegrationFlows.from(() -> "") with the desired poller options.

Calling Ftp.outboundGateway() with the MGET command against a remote directory gives you a List<File> as the reply message, so you are free to process your files as a batch.
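
A minimal Kotlin sketch of that idea, not a tested configuration. It reuses `ftpSessionFactory()`, `ftpProperties`, `unZipTransformer()` and `LOGGER` from the question. It assumes the polled payload carries the remote path pattern rather than an empty string, and that the FTP server accepts a `*.zip` wildcard in that path. The bean name `ftpMgetFlow` is made up.

```kotlin
import java.io.File
import java.time.Duration
import java.util.concurrent.Executors
import java.util.function.Consumer
import java.util.function.Supplier
import org.springframework.context.annotation.Bean
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.dsl.MessageChannels
import org.springframework.integration.dsl.Pollers
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway
import org.springframework.integration.ftp.dsl.Ftp

@Bean
fun ftpMgetFlow(): IntegrationFlow {
    return IntegrationFlows.from(
            // Assumption: each poll emits the remote path pattern as the payload.
            Supplier { "${ftpProperties.remoteDirectory}/*.zip" },
            Consumer<SourcePollingChannelAdapterSpec> { e ->
                e.poller(Pollers.fixedRate(Duration.ofSeconds(5)))
            })
            // MGET downloads every matching file and replies with a List<File>.
            .handle(Ftp.outboundGateway(ftpSessionFactory(),
                    AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
                    .options(AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
                    .localDirectory(File(ftpProperties.downloadDirectory)))
            // One message per downloaded file, handed to the pool for concurrent work.
            .split()
            .channel(MessageChannels.executor(Executors.newWorkStealingPool()))
            .transform(unZipTransformer())
            .handle { m -> LOGGER.info("Unzipped {}", m.payload) }
            .get()
}
```

Unlike the inbound adapter in the question, this sketch sets no file list filter, so every poll would probably fetch the same files again. Adding the gateway spec's `filter(...)` with something like FtpPersistentAcceptOnceFileListFilter may be needed to skip files that have already been seen.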

See docs for more info: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway