I am using spring integration to pull in different account details from external systems concurrently (using taskmanager) and aggregate it into one account object. For that, I will have to do two levels of splits of message and after fetching need to aggregate at two levels. I am using simple pojos for splitter, router and aggregator implementations. And I am implementing correlation strategy and using a simple message group but not implementing any release strategy. For some reason, my messages are getting lost at the second aggregator. When I debug, I can see the messages coming on to the second level aggregator from first level aggregators, but it is unable to pass those messages on to the output channel.
One more observation is that, if there is only one message, I am getting the output. But of any conditions which result in more than one message to get aggregated, I dont see any output and the threads are hung.
Any help appreciated!!
Here is the context definitions.
<bean id="accountManager" class="<package>.AccountManager"/>
<int:gateway id="accountBuilder"
service-interface="<package>.AccountBuilder" default-request-channel="accountRequest" default-reply-channel="allAccounts"/>
<int:channel id="accountRequest"/>
<int:channel id="allAccounts"/>
<int:splitter input-channel="accountRequest" output-channel="accountRequests" ref="accountSplitter" method="split"/>
<int:channel id="accountRequests">
<int:dispatcher task-executor="accountServiceTaskExecutor"/>
</int:channel>
<int:router input-channel="accountRequests" ref="accountRouter" method="routeAccountRequests">
<int:mapping channel="retailRequest"/>
<int:mapping channel="manualRequest"/>
</int:router>
<bean id="accountMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
<bean id="searchResultMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="accountMessageStore" />
<property name="timeout" value="2000" />
</bean>
<!-- **************************************************************************** -->
<!-- **************************** RETAIL ACCOUNTS ******************************* -->
<!-- **************************************************************************** -->
<int:channel id="retailRequest"/>
<int:splitter input-channel="retailRequest" output-channel="retailAccountRequests" ref="retailAccountSplitter" method="split"/>
<int:channel id="retailAccountRequests">
<int:dispatcher task-executor="accountServiceTaskExecutor"/>
</int:channel>
<int:router input-channel="retailAccountRequests" ref="retailAccountRouter" method="routeRetailAccountRequests">
<int:mapping channel="retailAccountDataRequest"/>
<int:mapping channel="retailAccountDetailsRequest"/>
<int:mapping channel="retailAccountPositionsRequest"/>
</int:router>
<int:channel id="retailAccountDataRequest"/>
<int:service-activator input-channel="retailAccountDataRequest" ref="retailAccountDataMapper"
method="getRetailAccountData" output-channel="aggregatedRetailAccounts"/>
<int:channel id="retailAccountDetailsRequest"/>
<int:service-activator input-channel="retailAccountDetailsRequest" ref="retailAccountDetailsMapper"
method="getRetailAccountDetails" output-channel="aggregatedRetailAccounts"/>
<int:channel id="retailAccountPositionsRequest"/>
<int:service-activator input-channel="retailAccountPositionsRequest" ref="retailAccountPositionsMapper"
method="getRetailAccountPositions" output-channel="aggregatedRetailAccounts"/>
<int:channel id="aggregatedRetailAccounts" />
<int:aggregator input-channel="aggregatedRetailAccounts"
ref="retailAccountAggregator" method="aggregate" output-channel="aggregatedAccounts"
message-store="accountMessageStore" expire-groups-upon-completion="true"/>
<!-- ************************** END RETAIL ACCOUNTS ***************************** -->
<!-- **************************************************************************** -->
<!-- **************************** MANUAL ACCOUNTS ******************************* -->
<!-- **************************************************************************** -->
<int:channel id="manualRequest"/>
<int:splitter input-channel="manualRequest" output-channel="manualAccountRequests" ref="manualAccountSplitter" method="split"/>
<int:channel id="manualAccountRequests">
<int:dispatcher task-executor="accountServiceTaskExecutor"/>
</int:channel>
<int:router input-channel="manualAccountRequests" ref="manualAccountRouter" method="routeManualAccountRequests">
<int:mapping channel="manualAccountDataRequest"/>
<int:mapping channel="manualAccountDetailsRequest"/>
<int:mapping channel="manualAccountPositionsRequest"/>
</int:router>
<int:channel id="manualAccountDataRequest"/>
<int:service-activator input-channel="manualAccountDataRequest" ref="manualAccountDataMapper"
method="getManualAccountData" output-channel="aggregatedManualAccounts"/>
<int:channel id="manualAccountDetailsRequest"/>
<int:service-activator input-channel="manualAccountDetailsRequest" ref="manualAccountDetailsMapper"
method="getManualAccountDetails" output-channel="aggregatedManualAccounts"/>
<int:channel id="manualAccountPositionsRequest"/>
<int:service-activator input-channel="manualAccountPositionsRequest" ref="manualAccountPositionsMapper"
method="getManualAccountPositions" output-channel="aggregatedManualAccounts"/>
<int:channel id="aggregatedManualAccounts"/>
<int:aggregator input-channel="aggregatedManualAccounts"
ref="manualAccountAggregator" method="aggregate" output-channel="aggregatedAccounts"
message-store="accountMessageStore" expire-groups-upon-completion="true"/>
<!-- ************************** END MANUAL ACCOUNTS ***************************** -->
<int:channel id="aggregatedAccounts" />
<int:aggregator input-channel="aggregatedAccounts" ref="accountAggregator"
method="aggregate" output-channel="allAccounts" message-store="accountMessageStore"
expire-groups-upon-completion="true" />
<bean id="accountServiceTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="25" />
<property name="maxPoolSize" value="250" />
<property name="queueCapacity" value="500" />
</bean>