3
votes

I use "int-file: inbound-channel-adapter" to load files which exist into a directory. And i like to process files sequentially: it means that when the processing of the first file is completed, I load the second file ...etc.

I see a sample but I can't forcast the needed period to processing one file which depends on its size.

My source code:

    <int-file:inbound-channel-adapter
    directory="${directory.files.local}" id="filesIn" channel="channel.filesIn">
    <int:poller fixed-delay="1000" max-messages-per-poll="1" />

</int-file:inbound-channel-adapter>

the process of a file is file:inbound-channel-adapter--->transformer--->splitter---->http:outbound-gateway--->outbound-mail-adapter---->the processing of a file is finished, so at this time, i the next file to be processed.

My project's configuration is too complicated. Below, i show you more of the configuration: the first part of configuration is :

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
    auto-startup="true" channel="receiveChannel" session-factory="sftpSessionFactory"
    local-directory="file:${directory.files.local}" remote-directory="${directory.files.remote}"
    auto-create-local-directory="true" delete-remote-files="true"
    filename-regex=".*\.txt$">
    <int:poller fixed-delay="${sftp.interval.request}"
        max-messages-per-poll="-1" />
</int-sftp:inbound-channel-adapter>
<!-- <int:poller cron="0 * 17 * * ?"></int:poller> -->

<int-file:inbound-channel-adapter
    filter="compositeFileFilter" directory="${directory.files.local}" id="filesIn"
    channel="channel.filesIn" prevent-duplicates="true">
    <int:poller fixed-delay="1000" max-messages-per-poll="1" />
</int-file:inbound-channel-adapter>

<int:transformer input-channel="channel.filesIn"
    output-channel="channel.file.router" ref="fileTransformer" method="transform" />

<int:recipient-list-router id="fileRouter"
    input-channel="channel.file.router">

    <int:recipient channel="channel.empty.files"
        selector-expression="payload.length()==0" />
    <int:recipient channel="channel.filesRejected"
        selector-expression="payload.toString().contains('rejected')" />
    <int:recipient channel="toSplitter"
        selector-expression="(payload.length()>0) and(!payload.toString().contains('rejected'))" />

</int:recipient-list-router>

and then from the channel tosplitter, my program read a file line by line:

    <int-file:splitter input-channel="toSplitter"
    output-channel="router" requires-reply="false" />

<int:recipient-list-router id="recipentRouter"
    input-channel="router">

    <int:recipient channel="channelA"
        selector-expression="headers['file_name'].startsWith('${filenameA.prefix}')" />

    <int:recipient channel="channelB"
        selector-expression="headers['file_name'].startsWith('${filenameB.prefix}')" />

</int:recipient-list-router>

Each channel A and B should call two diffrent WS for each line. each file use asynch call to ws code is below for file A:

<int:header-enricher input-channel="channelA"
    output-channel="channelA.withHeader">
    <int:header name="content-type" value="application/json" />
    <int:header name="key1" expression="payload.split('${line.column.separator}')[0]" />
    <int:header name="key2" expression="payload"></int:header>
</int:header-enricher>

<int:transformer input-channel="channelA.withHeader"
    output-channel="channelA.request" ref="imsiMsgTransformer"
    method="transform">
</int:transformer>


<int:channel id="channelA.request">
    <int:queue capacity="10" />

<int-http:outbound-gateway id="maspUpdatorSimChangedGateway"
    request-channel="channelA.request" 
    url="${url}"
    http-method="PUT" expected-response-type="java.lang.String"       charset="UTF-8"
    reply-timeout="${ws.reply.timeout}" reply-channel="channelA.reply">
    <int-http:uri-variable name="foo" expression="headers['key1']" />
    <int:poller fixed-delay="1000" error-channel="channelA.error"
        task-executor="executorA" />

        <int-http:request-handler-advice-chain>
        <int:retry-advice max-attempts="${ws.max.attempts}"
            recovery-channel="recovery.channelA">
            <int:fixed-back-off interval="${ws.interval.attempts}" />
        </int:retry-advice>
    </int-http:request-handler-advice-chain>

</int-http:outbound-gateway>

<int:service-activator input-channel="recovery.channelA"
    ref="updateImsiHttpResponseErrorHandler" method="handleMessage" output-channel="updateImsi.channel.error.toenricher">
</int:service-activator>

<int:service-activator input-channel="channelA.reply"
    ref="updateImsiHttpResponseMessageHandler" method="handleMessage">
    <int:poller fixed-delay="1000"></int:poller>
</int:service-activator>

In each activator of(reply channel and recovery channel), i count the progression of a file until the file is completed at this point i should load the second File A2 or File B ...etc

1

1 Answers

1
votes

That is the default behavior as long as

  1. The poller does not have a task-executor (which yours doesn't).
  2. Only DirectChannels (the default) are used downstream of the adapter - this means no QueueChannels or ExecutorChannel (i.e. no task-executor or <queue/> on the channels).

In that scenario, the next poll is not even considered until the current one finishes - the flow runs on the poller thread and only one poll can be in process at once.

The fixed-delay does not start until the current file is completely processed.

EDIT

If you need to use async processing on the flow, you need to use a Conditional Poller or a simple PollSkipAdvice.

You would provide a PollSkipStrategy implementation that would return false until the file is complete.

That way, subsequent polls will be skipped until you decide.

EDIT2

Something like this...

/*
 * Copyright 2015 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.integration.scheduling;

/**
 * @author Gary Russell
 * @since 4.3
 *
 */
public class SimplePollSkipStrategy implements PollSkipStrategy {

    private volatile boolean skip;

    @Override
    public boolean skipPoll() {
        return this.skip;
    }

    public void skipPolls() {
        this.skip = true;
    }

    public void reset() {
        this.skip = false;
    }
}
  • Add it as a <bean/> to your context.
  • Add it to the poller's advice chain with a PollSkipAdvice
  • When you want to skip polls, call skipPolls().
  • When you've finished with a file, call reset().