0
votes

I have two filters regexFilter and lastModified.

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .regexFilter(config.getRegexFilter())
            .filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory())
            , e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {

    })))

By googling I understand I have to use CompositeFileListFilter for regex so change my code to

.filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))

Its compiled but on run time throws error and channel stooped and same error goes for

.filter(ftpPersistantFilter(config.getRegexFilter()))
.
.
.

public CompositeFileListFilter ftpPersistantFilter(String regexFilter) {
        CompositeFileListFilter filters = new CompositeFileListFilter();
            filters.addFilter(new FtpRegexPatternFileListFilter(regexFilter));
        return filters;
    }

I just want to filter on the basis of file name. There are 2 flows for same remote folder and both are polling with same cron but should pick their relevant file.

EDIT adding last LastModifiedLsEntryFileListFilter. Its working fine but adding upon request.

public class LastModifiedLsEntryFileListFilter implements FileListFilter<LsEntry> {

private final Logger log = LoggerFactory.getLogger(LastModifiedLsEntryFileListFilter.class);
private static final long DEFAULT_AGE = 60;

private volatile long age = DEFAULT_AGE;

private volatile Map<String, Long> sizeMap = new HashMap<String, Long>();


public long getAge() {
    return this.age;
}

public void setAge(long age) {
    setAge(age, TimeUnit.SECONDS);
}

public void setAge(long age, TimeUnit unit) {
    this.age = unit.toSeconds(age);
}

@Override
public List<LsEntry> filterFiles(LsEntry[] files) {

    List<LsEntry> list = new ArrayList<LsEntry>();

    long now = System.currentTimeMillis() / 1000;

    for (LsEntry file : files) {

        if (file.getAttrs()
                .isDir()) {
            continue;
        }
        String fileName = file.getFilename();
        Long currentSize = file.getAttrs().getSize();
        Long oldSize = sizeMap.get(fileName);

        if(oldSize == null || currentSize.longValue() != oldSize.longValue() ) {
            // putting size in map, will verify in next iteration of scheduler
            sizeMap.put(fileName, currentSize);
            log.info("[{}] old size [{}]  increased to [{}]...", file.getFilename(), oldSize, currentSize);
            continue;
        }

        int lastModifiedTime = file.getAttrs()
            .getMTime();

        if (lastModifiedTime + this.age <= now ) {
            list.add(file);
            sizeMap.remove(fileName);
        } else {
            log.info("File [{}] is still being uploaded...", file.getFilename());
        }
    }
    return list;
}

}

PS : When I am testing filter for regex I have removed LastModifiedLsEntryFileListFilter just for simplicity. So my final Flow is

return IntegrationFlows.from(Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autoCreateLocalDirectory(true)
            .filter(new CompositeFileListFilter().addFilter(new RegexPatternFileListFilter(config.getRegexFilter())))
            //.filter(new LastModifiedLsEntryFileListFilter())
            .remoteDirectory(config.getInboundDirectory()),
            e -> e.poller(Pollers.fixedDelay(60_000)
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
                try {

                    this.destroy(String.valueOf(config.getId()));


    configurationService.removeConfigurationChannelById(config.getId());

//                // logging here
                } catch (Exception ex1) {
            }
            }))).publishSubscribeChannel(s -> s
            .subscribe(f -> {

                f.handle(Sftp.outboundAdapter(outboundSftp)
                        .useTemporaryFileName(false)
                        .autoCreateDirectory(true)
                        .remoteDirectory(config.getOutboundDirectory()), c -> c.advice(startup.deleteFileAdvice()));

            })
            .subscribe(f -> {
                if (doArchive) {
                    f.handle(Sftp.outboundAdapter(inboundSftp)
                            .useTemporaryFileName(false)
                            .autoCreateDirectory(true)
                            .remoteDirectory(config.getInboundArchiveDirectory()));
                } else {
                    f.handle(m -> {
                    });
                }

            })
            .subscribe(f -> f
            .handle(m -> {

                // I am handling exception here
            })
            ))
            .get();

and here are exceptions

2020-01-27 21:36:55,731 INFO o.s.i.c.PublishSubscribeChannel - Channel 

'application.2.subFlow#0.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
2020-01-27 21:36:55,731 INFO o.s.i.c.DirectChannel - Channel 'application.2.subFlow#2.channel#0' has 0 subscriber(s).
2020-01-27 21:36:55,731 INFO o.s.i.e.EventDrivenConsumer - stopped 2.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#1

EDIT After passing regex to LastModifiedLsEntryFileListFilter and handle there works for me. When I use any other RegexFilter inside CompositeFileListFilter it thorws error.

.filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))
1

1 Answers

1
votes

Show, please, your final flow. I don't see that you use LastModifiedLsEntryFileListFilter in your CompositeFileListFilter... You definitely can't use regexFilter() and filter() together - the last one wins. To avoid confusion we suggest to use a filter() and compose all those with CompositeFileListFilter or ChainFileListFilter.

Also what is an error you are mentioning, please.