1
votes

I'm following the docs: Spring Batch Integration combining with the Integration AWS for pooling the AWS S3.

But the batch execution per each file is not working in some situations.

The AWS S3 Pooling is working correctly, so when I put a new file or when I started the application and there's files in the bucket the application sync with the local directory:

    @Bean
    public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
        return new S3SessionFactory(pAmazonS3);
    }

    @Bean
    public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
        S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(pS3SessionFactory);
        synchronizer.setPreserveTimestamp(true);
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory("remote-bucket");
        //synchronizer.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "simpleMetadataStore"));
        return synchronizer;
    }

    @Bean
    @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
    public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
            S3InboundFileSynchronizer pS3InboundFileSynchronizer) {
        S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
        messageSource.setAutoCreateLocalDirectory(true);
        messageSource.setLocalDirectory(new FileSystemResource("files").getFile());
        //messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "fsSimpleMetadataStore"));
        return messageSource;
    }

    @Bean("s3filesChannel")
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

I followed the tutorial so I created the FileMessageToJobRequest I won't put the code here because it's the same as the docs

So I created the beans IntegrationFlow and FileMessageToJobRequest:

    @Bean
    public IntegrationFlow integrationFlow(
            S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource) {
        return IntegrationFlows.from(pS3InboundFileSynchronizingMessageSource, 
                         c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
                .transform(fileMessageToJobRequest())
                .handle(jobLaunchingGateway())
                .log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
                .get();
    }

    @Bean
    public FileMessageToJobRequest fileMessageToJobRequest() {
        FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
        fileMessageToJobRequest.setFileParameterName("input.file.name");
        fileMessageToJobRequest.setJob(delimitedFileJob);
        return fileMessageToJobRequest;
    }

So in the JobLaunchingGateway I think is the problem:

If I created like this:

    @Bean
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

        return jobLaunchingGateway;
    }

Case 1 (Bucket is empty when the application starts):

  • I upload a new file in the AWS S3;
  • The pooling works and the file appears in the local directory;
  • But the transform/job isn't fired;

Case 2 (Bucket already has one file when application starts):

  • The job is launched:
2021-01-12 13:32:34.451  INFO 1955 --- [ask-scheduler-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=arquivoDelimitadoJob]] launched with the following parameters: [{input.file.name=files/FILE1.csv}]
2021-01-12 13:32:34.524  INFO 1955 --- [ask-scheduler-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [delimitedFileJob]
  • If I add a second file in S3, the job isn't launched as the case 1.

Case 3 (Bucket has more than one file):

  • The files are synchronized correctly in local directory
  • But the job is only executed once for the last file.

So following the docs I change my Gateway to:

    @Bean
    @ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedRate="1000"))
    public JobLaunchingGateway jobLaunchingGateway() {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository);
        simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());

        //JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
        JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
        //jobLaunchingGateway.setOutputChannel(replyChannel());
        jobLaunchingGateway.setOutputChannel(s3FilesChannel());
        return jobLaunchingGateway;
    }

With this new gateway implementation, if I put a new file in S3 the application reacts but didn't transform giving the error:

Caused by: java.lang.IllegalArgumentException: The payload must be of type JobLaunchRequest. Object of class [java.io.File] must be an instance of class org.springframework.batch.integration.launch.JobLaunchRequest

And if there's two files in the bucket (when the apps starts) FILE1.csv and FILE2.csv, the job runs for the FILE1.csv correctly, but give the error above for the FILE2.csv.

What's the correct way to implement something like this?

Just to be clear I want to receive thousand of csv files in this bucket, read and process with Spring Batch, but I also need to get every new file asap from S3.

Thanks in advance.

1

1 Answers

1
votes

The JobLaunchingGateway indeed expects from us only JobLaunchRequest as a payload.

Since you have that @InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30")) on the S3InboundFileSynchronizingMessageSource bean definition, it is really wrong to have then @ServiceActivator(inputChannel = IN_CHANNEL_NAME for that JobLaunchingGateway without FileMessageToJobRequest transformer in between.

Your integrationFlow looks OK for me, but then you really need to remove that @InboundChannelAdapter from the S3InboundFileSynchronizingMessageSource bean and fully rely on the c.poller() configuration.

Another way is to leave that @InboundChannelAdapter, but then start the IntegrationFlow from the IN_CHANNEL_NAME not a MessageSource.

Since you have several poller against the same S3 source, plus both of then are based on the same local directory, it is not a surprise to see so many unexpected situations.