I am using FileReadingMessageSource from Spring Integration to watch a root directory for newly created files. New sub-directories can appear under the root directory at any time. I use the WatchServiceDirectoryScanner from SI 4.3.1 to pick up files created in any new sub-directory.

@Bean
public MessageSource<File> fileReadingMessageSource() {

    CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
    filters.addFilter(new SimplePatternFileListFilter("pattern*"));
    //filters.addFilter(new LastModifiedFileListFilter());

    FileReadingMessageSource fileSource = new FileReadingMessageSource();

    String filePath = "root-directory";

    fileSource.setDirectory(new File(filePath));
    fileSource.setFilter(filters);
    fileSource.setUseWatchService(true);
    fileSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE,
            FileReadingMessageSource.WatchEventType.MODIFY,
            FileReadingMessageSource.WatchEventType.DELETE);

    return fileSource;
}

@Bean
public IntegrationFlow readDirectoryFlow() {

    return IntegrationFlows.from(
            fileReadingMessageSource(), 
            e -> e.poller(Pollers.cron("*/5 * * * * *")))
            .channel(fileInputChannel())
            .handle(tailerRestart)
            .handle(System.out::println)
            .get();
}

On the first poll, all files matching the pattern are delivered by the MessageSource, but files created later in a new sub-directory are not picked up, even when they match the pattern.

I see the following DEBUG message in the log:

DEBUG SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'

What could be missing?

1 Answer

I've just written a test case very close to your code:

    @Bean
    public MessageSource<File> fileReadingMessageSource() {
        CompositeFileListFilter<File> filters = new CompositeFileListFilter<>();
        filters.addFilter(new SimplePatternFileListFilter("*.watch"));

        FileReadingMessageSource fileSource = new FileReadingMessageSource();
        fileSource.setDirectory(tmpDir.getRoot());
        fileSource.setFilter(filters);
        fileSource.setUseWatchService(true);
        fileSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE,
                FileReadingMessageSource.WatchEventType.MODIFY,
                FileReadingMessageSource.WatchEventType.DELETE);
        return fileSource;
    }

    @Bean
    public IntegrationFlow readDirectoryFlow() {
        return IntegrationFlows
                .from(fileReadingMessageSource(),
                        e -> e.poller(p -> p.cron("*/1 * * * * *")))
                .handle(System.out::println)
                .get();
    }

The test code looks like:

@ClassRule
public static final TemporaryFolder tmpDir = new TemporaryFolder();

@Test
public void testWatchServiceMessageSource() throws Exception {
    File newFolder1 = tmpDir.newFolder();
    FileOutputStream file = new FileOutputStream(new File(newFolder1, "foo.watch"));
    file.write(("foo").getBytes());
    file.flush();
    file.close();

    File newFolder2 = tmpDir.newFolder();
    file = new FileOutputStream(new File(newFolder2, "bar.watch"));
    file.write(("bar").getBytes());
    file.flush();
    file.close();

    file = new FileOutputStream(new File(tmpDir.getRoot(), "root.watch"));
    file.write(("root").getBytes());
    file.flush();
    file.close();

    Thread.sleep(10000);
}

And I get these logs:

GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit7776799219532481336\foo.watch, headers={id=50d44197-e0af-708a-6b61-2a2cfeec68da, timestamp=1473686655061}]
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit813088196038861528\bar.watch, headers={id=8d80c853-19b6-f667-7950-d6de49d509ab, timestamp=1473686656062}]
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\root.watch, headers={id=e585203b-41dc-cadb-6a36-4c9009a34701, timestamp=1473686657063}]

one message each second.

I'm not sure where your problem is...

You don't need .channel(fileInputChannel()). A channel is created automatically between endpoints.

With this config:

.handle(tailerRestart)
.handle(System.out::println)

you must make sure that tailerRestart returns something; otherwise no reply message is produced and the second handle() never receives anything. However, according to our other discussion, it doesn't:

@ServiceActivator
public void restartTailer(File input) throws Exception {
    tailFileProducer.stop();
    tailFileProducer.setFile(input);
    tailFileProducer.start();
}
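
One way to keep the flow going is to return the file from the handler. The following is only a sketch, not your actual bean: TailProducer is a hypothetical stand-in for whatever type tailFileProducer has, and the @ServiceActivator annotation is left out so the snippet compiles without Spring on the classpath.

```java
import java.io.File;

public class TailerRestartSketch {

    /** Hypothetical stand-in for the tailFileProducer bean from the question. */
    public interface TailProducer {
        void stop();
        void setFile(File file);
        void start();
    }

    private final TailProducer tailFileProducer;

    public TailerRestartSketch(TailProducer tailFileProducer) {
        this.tailFileProducer = tailFileProducer;
    }

    // In the real application this method would carry @ServiceActivator.
    public File restartTailer(File input) {
        tailFileProducer.stop();
        tailFileProducer.setFile(input);
        tailFileProducer.start();
        // A non-null return value becomes the payload of the reply message,
        // so the next .handle(...) in the flow receives the file.
        return input;
    }
}
```

With a void method there is no reply, so the flow simply ends at that handler without any error.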

UPDATE

After some private investigation, we figured out that the issue is that FileReadingMessageSource.start() is called several times by the Spring Cloud Stream infrastructure, which re-instantiates the internal WatchService object.

FileReadingMessageSource.start() has to be fixed to be idempotent: https://jira.spring.io/browse/INT-4108

Spring Cloud Stream has been fixed in version 1.1: https://github.com/spring-cloud/spring-cloud-stream/issues/525

The workaround is to ensure that FileReadingMessageSource.start() is called only once:

FileReadingMessageSource fileSource = new FileReadingMessageSource() {

   private final AtomicBoolean running = new AtomicBoolean();

   @Override
   public void start() {
      if (!this.running.getAndSet(true)) {
         super.start();
      }
   }

   @Override
   public void stop() {
      if (this.running.getAndSet(false)) {
         super.stop();
      }
   }

};
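
For background on why replacing the WatchService matters: in plain java.nio.file, each directory has to be registered individually, and the resulting WatchKeys belong to the WatchService instance they were registered with. The sketch below uses only the JDK and is not the Spring Integration implementation; the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WatchRegistrationSketch {

    /**
     * Registers the root directory and every existing sub-directory with the
     * given watcher. Registration is per directory, so each one gets its own key.
     */
    public static List<WatchKey> registerTree(Path root, WatchService watcher) throws IOException {
        List<Path> dirs;
        try (Stream<Path> paths = Files.walk(root)) {
            dirs = paths.filter(Files::isDirectory).collect(Collectors.toList());
        }
        List<WatchKey> keys = new ArrayList<>();
        for (Path dir : dirs) {
            keys.add(dir.register(watcher,
                    StandardWatchEventKinds.ENTRY_CREATE,
                    StandardWatchEventKinds.ENTRY_MODIFY,
                    StandardWatchEventKinds.ENTRY_DELETE));
        }
        return keys;
    }
}
```

The keys returned here stay valid only while that watcher is open: once it is closed, WatchKey.isValid() returns false for all of them, and a fresh WatchService starts with no registrations until the directories are registered again.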