I need to poll FTP server and process new or changed files. I use Spring Integration 5.3.2 with inbound FTP adapter and poller with fixed rate of 5 seconds. All files downloaded immediately in local directory, but integration flow underlying handlers invokes after every 5 seconds for each file. I want to process downloaded list of files immediately in concurent threads, but poll ftp each 5 seconds after flow ends. How can i do it?
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.maxFetchSize(ftpProperties.maxFetchSize)
.remoteDirectory(ftpProperties.remoteDirectory)
.localDirectory(File(ftpProperties.downloadDirectory))
.filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(), "ftp-"))
.regexFilter(".*\\.zip$")
) { e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(5))) }
.channel(MessageChannels.executor(Executors.newWorkStealingPool()))
.transform(unZipTransformer())
.handle { m -> LOGGER.info("Unzipped {}", m.headers[FileHeaders.FILENAME]) }
.get()
}