
I am trying to understand, at a high level, how threads are allocated within a Spring Integration flow, especially when an aggregator is used. To provide some context, my flow looks something like this:

poll files from a directory
use an executor channel to receive polled files
aggregate polled files until some criteria is met
release aggregated files to a subsequent channel which processes all aggregated files
split files
archive individual files

The main goal of this configuration is to detach the polling process from the subsequent steps in the flow. I want to be able to continue to poll and load files while the existing files are being processed. All of that works fine with an executor channel, but once the thread pool limit is reached, the CALLER_RUNS rejection policy makes the poller thread execute the rejected task itself. If that task is slow, I have to wait until it completes before the poller can return to polling more files.

So, going back to my real question, which is to understand how threads are allocated in such a flow: the poller retrieves a file and passes it to the executor channel, which uses a dispatcher with a task executor and therefore uses a separate thread to handle the incoming file. Once this file is held in the aggregator, what happens to that thread? Is the thread completed/released because the file has reached its current final destination?

And then, once the aggregator limit is reached and all files are emitted to the next channel, I am assuming that happens in a new/single thread, but when we use a splitter it then creates a new thread for every split file. Is that correct?

I am sorry if this is a bit confusing. I am really just trying to understand the process a little better, because what I don't like about this setup (my configuration) is that almost every time the flow gets stuck at the step after the aggregator. That is the time-consuming step, since it processes all the aggregated files. Most of the time it is the CALLER_RUNS thread (the poller) that ends up executing this step, which essentially stops the polling process until the step is done.

I am also posting my configuration XML below. Basically, I am hoping that with a better understanding of the thread allocation process I might be able to tune this a little more, so that the aggregated-files processing step is no longer a bottleneck.

<!-- Poll files from landing zone directory -->
<int-file:inbound-channel-adapter id="files" directory="${lz.dir.${ft}}" filename-regex=".*\.txt$">
    <int:poller fixed-delay="3000" max-messages-per-poll="10"  />
</int-file:inbound-channel-adapter>

<int:bridge input-channel="files" output-channel="sourceFiles" />

<!-- Dispatch retrieved files -->
<int:channel id="sourceFiles">
    <int:dispatcher task-executor="executor" />
</int:channel>

<!-- Move files to source directory -->
<int:service-activator  input-channel="sourceFiles"
                        output-channel="sourceFilesProcessed" 
                        ref="moveToSource" 
                        method="move" />

<int:channel id="sourceFilesProcessed" />

<!-- Aggregate at source -->
<int:aggregator id="filesBuffered" 
                input-channel="sourceFilesProcessed"
                output-channel="stagedFiles"
                release-strategy-expression="size() >= 500 and [email protected]()" 
                correlation-strategy-expression="'mes-group'" 
                expire-groups-upon-completion="true" 
                />

<int:channel id="stagedFiles" />

<!-- Process aggregated source files -->
<int:chain input-channel="stagedFiles">
    <!-- Move stage files to target -->
    <int:service-activator  ref="moveToStageAndTarget" method="move" />

    <!-- Split files back into individual file -->
    <int:splitter />

    <!-- Archive each file -->
    <int:service-activator ref="archiveFiles" method="archive" />           
</int:chain>    

<task:executor id="executor" pool-size="100" queue-capacity="0" rejection-policy="CALLER_RUNS" />

1 Answer

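  1. Avoid queue-capacity="0". Please take a look at the JavaDocs of ThreadPoolTaskExecutor: with a capacity of 0 it is backed by a SynchronousQueue, which holds no waiting tasks. Once all 100 threads are busy, every new task is rejected immediately and, with CALLER_RUNS, runs on the submitting thread. That is where your flow gets stuck.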

  2. It is recommended to use an ExecutorChannel as the output of the <splitter>, since it produces several messages. In your current configuration they are all processed one by one within the same thread.

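  3. The aggregator does not depend on any particular thread, but keep in mind that it releases a group on the thread that delivered the message for which the release strategy returned true. Because stagedFiles is a direct channel, that same thread then runs the whole downstream chain. This is another reason you get stuck.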
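
The three points above could be combined roughly as follows. This is only a sketch under assumptions, not a tested configuration: the ids processingExecutor and splitFiles are made up for illustration, and every pool size and queue capacity is a placeholder you would need to tune. These definitions would replace the existing executor, stagedFiles channel and chain; the rest of the flow stays as it is.

```xml
<!-- Sketch only: "processingExecutor", "splitFiles" and all sizes are illustrative assumptions. -->

<!-- Point 1: a non-zero queue lets bursts wait instead of falling back to the caller right away -->
<task:executor id="executor" pool-size="100" queue-capacity="50" rejection-policy="CALLER_RUNS" />

<!-- Separate pool for the slow post-aggregation work.
     CALLER_RUNS here can only slow down a worker of this pool, never the poller thread. -->
<task:executor id="processingExecutor" pool-size="10" queue-capacity="1000" rejection-policy="CALLER_RUNS" />

<!-- Point 3: hand the released group off, so the thread that triggered the release returns quickly -->
<int:channel id="stagedFiles">
    <int:dispatcher task-executor="processingExecutor" />
</int:channel>

<!-- The chain now ends at the splitter and sends the split messages to an executor channel -->
<int:chain input-channel="stagedFiles" output-channel="splitFiles">
    <int:service-activator ref="moveToStageAndTarget" method="move" />
    <int:splitter />
</int:chain>

<!-- Point 2: each split file is archived on a pool thread instead of one by one on the same thread -->
<int:channel id="splitFiles">
    <int:dispatcher task-executor="processingExecutor" />
</int:channel>

<int:service-activator input-channel="splitFiles" ref="archiveFiles" method="archive" />
```

The idea behind the second executor is that the poller only ever submits to the first pool, whose tasks (move the file, add it to the aggregator, hand off a released group) are short. The poller can still be called on to run one of those short tasks if the first pool and its queue are both full, but it should no longer end up running the long aggregated-files step.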