What is the best solution to aggregate one message from sftp inbound message source that contains multiple files? We have on remote machine 3 files that need to be received. After that we combine content of those files to one json message and send it forward.
public IntegrationFlow sftpIntegrationFlowBean() {
final Map<String, Object> headers = new HashMap<>();
headers.put("sftpFile", "sftpFile");
final Consumer<AggregatorSpec> aggregator = t -> {
t.sendPartialResultOnExpiry(true);
t.expireGroupsUponCompletion(true);
t.processor(new CustomMessageAggregator());
};
return IntegrationFlows
.from(sftpInboundMessageSource(),
e -> e.id("sftpIntegrationFlow").poller(pollerMetadataSftp))
.enrichHeaders(headers).aggregate(aggregator)
.handle(customMessageSender).get();
}
Poller polls every 15 minutes. While running this code next thing happens:
- Retrieve files and process one of them
- After 15 minutes second file is processed
- After another 15 minutes third file is processed
- And finally after more 15 minutes message is sent to destination
How can this all be done in one operation without delays? I did try this with FileReadingMessageSource, but had a same result.
Thank you in advance.