In my spring-integration-flow:
I have an inbound JDBC adapter as the entry point to the system, followed by channels, routers, service-activators and an aggregator. I do not want to feed more messages into the system if too many messages are already being processed.
How should I do this? I know this is a very vague question, but your insights will be very helpful. Can I use the inbound-jdbc-adapter and then a Gateway in the flow?
Currently: I am setting the max-rows-per-poll value depending on the remainingCapacity of the channel that is associated with the inbound-channel-adapter. However, each message is instantly moved on to a different channel, so it is still in the system, and this defeats the purpose.
Earlier: I used to have a Gateway where a web-service call from a different system would send a message on the Gateway and return. I had to take this out because it was holding the response for an indefinite time if that message got stuck in one of the service-activators due to some issue; I had also not specified a time-out value.
Update: Adding the Spring Integration configuration. This is a very long file, about 500 lines. I have added a description of each element. Let me know if anything is not understandable.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:si="http://www.springframework.org/schema/integration"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:si-jdbc="http://www.springframework.org/schema/integration/jdbc"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc-2.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<!-- Entry queue (capacity 200): the JDBC inbound adapter publishes its poll results here. -->
<si:channel id="orderQueueChannel">
    <si:queue capacity="200" />
</si:channel>
<!-- Mapping helpers; rowMapperForHighPriority is the row-mapper of the JDBC inbound adapter. -->
<bean id="mapperUtil"
      class="com.foo.service.process.mapper.common.MapperUtil" />
<bean id="rowMapperForHighPriority"
      class="com.foo.service.process.mapper.HighPriorityResultStatusMapper" />

<!-- Receives both the entry channel and the JDBC adapter; it is also listed in the
     adapter's poller advice-chain. -->
<bean id="inboundAdapterPollingConfiguration"
      class="com.foo.service.download.retriever.impl.InboundAdapterPollingConfigurationImpl">
    <property name="channel" ref="orderQueueChannel" />
    <property name="jdbcInboundAdapter" ref="jdbcInboundAdapter" />
</bean>
<!-- JDBC inbound adapter: polled at a fixed rate of 2000, it selects staged rows for this
     node that are not IN_PROGRESS, SUCCESS or FAILURE (at most 100 rows per poll), maps them
     with rowMapperForHighPriority and sends the result to orderQueueChannel.
     The update statement then marks the polled rows IN_PROGRESS (update-per-row="true"),
     which keeps them out of the next select. -->
<si-jdbc:inbound-channel-adapter
        id="jdbcInboundAdapter"
        data-source="myDataSource"
        channel="orderQueueChannel"
        row-mapper="rowMapperForHighPriority"
        max-rows-per-poll="100"
        update-per-row="true"
        auto-startup="true"
        query="SELECT * FROM STAGE_TABLE WHERE MESSAGE_STATUS NOT IN ('IN_PROGRESS','SUCCESS','FAILURE') and NODE_NAME='${weblogic.Name}' ORDER BY STAGE_ID ASC"
        update="UPDATE STAGE_TABLE SET MESSAGE_STATUS='IN_PROGRESS' WHERE STAGE_ID IN (:stageId)">
    <si:poller fixed-rate="2000">
        <!-- Order matters: the transaction advice wraps the polling-configuration advice. -->
        <si:advice-chain>
            <ref bean="txAdvice" />
            <ref bean="inboundAdapterPollingConfiguration" />
        </si:advice-chain>
    </si:poller>
</si-jdbc:inbound-channel-adapter>
<!-- Transaction advice used as the first entry of the JDBC adapter's poller advice-chain.
     Both rules are read-write: "get*" states it explicitly, "*" covers every other method. -->
<tx:advice id="txAdvice">
    <tx:attributes>
        <tx:method name="get*" read-only="false" />
        <tx:method name="*" />
    </tx:attributes>
</tx:advice>
<!-- Splits the List payload read from orderQueueChannel into one message per element. -->
<si:splitter input-channel="orderQueueChannel"
             output-channel="processSplitOrderStatusHighChannel" />
<!-- Queue (capacity 200) for the individual split messages; each message is also
     wire-tapped to the "logger" channel. -->
<si:channel id="processSplitOrderStatusHighChannel">
    <si:queue capacity="200" />
    <si:interceptors>
        <si:wire-tap channel="logger" />
    </si:interceptors>
</si:channel>
<!-- Three message types arrive here: STATUS, RESULT and RESULT_SET.
     This router sends each message on according to its message type
     (decision made by MessageTypeHighRouter.routeMessage). -->
<bean id="messageTypeHighRouter"
      class="com.foo.service.process.router.MessageTypeHighRouter" />
<si:router input-channel="processSplitOrderStatusHighChannel"
           ref="messageTypeHighRouter"
           method="routeMessage" />
<!-- Queue (capacity 50) for STATUS messages; wire-tapped to the "logger" channel. -->
<si:channel id="statusHighChannel">
    <si:queue capacity="50" />
    <si:interceptors>
        <si:wire-tap channel="logger" />
    </si:interceptors>
</si:channel>
<!-- STATUS processing: statusConsumeService.consumeStatus is invoked for messages polled
     from statusHighChannel (fixed-delay 1000); its output goes to ackHighChannel. -->
<si:service-activator input-channel="statusHighChannel"
                      output-channel="ackHighChannel"
                      ref="statusConsumeService"
                      method="consumeStatus">
    <si:poller fixed-delay="1000" task-executor="statusMessageHighTaskExecutor" />
</si:service-activator>
<!-- 2-15 threads with a 5-slot task queue; CALLER_RUNS hands rejected tasks back to the
     submitting thread. -->
<task:executor id="statusMessageHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="5"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- ACKNOWLEDGE CHANNEL: collects the outcome of every processing branch and feeds the
     acknowledgement outbound-channel-adapter. It is declared without a queue element,
     unlike the other channels in this file.
     NOTE: this comment must be closed with the proper comment terminator; the previous
     "__>" ending left the comment open, which hid the channel element below and made
     the document not well-formed. -->
<si:channel id="ackHighChannel"/>
<!-- Processing Results -->
<!-- Have a Router to Route results according to OrderType on OrderTypeHandler Channels
and on each channel have a service activator to process the results accordingly -->
<!-- Channels for RESULT and RESULT_SET messages. Each is a queue of capacity 50 and is
     wire-tapped to the "logger" channel. -->

<!-- Input of the transformer that wraps a single RESULT into a list. -->
<si:channel id="highSimpleOrderResultChannel">
    <si:queue capacity="50" />
    <si:interceptors>
        <si:wire-tap channel="logger" />
    </si:interceptors>
</si:channel>

<!-- Individual RESULT messages waiting for category assignment. -->
<si:channel id="resultHighChannel">
    <si:queue capacity="50" />
    <si:interceptors>
        <si:wire-tap channel="logger" />
    </si:interceptors>
</si:channel>

<!-- RESULT_SET messages waiting to be converted into a list of RESULTs. -->
<si:channel id="resultOrderSetHighChannel">
    <si:queue capacity="50" />
    <si:interceptors>
        <si:wire-tap channel="logger" />
    </si:interceptors>
</si:channel>

<!-- Lists of RESULTs waiting to be split into individual messages. -->
<si:channel id="resultOrderSetSplitHighChannel">
    <si:queue capacity="50" />
    <si:interceptors>
        <si:wire-tap channel="logger" />
    </si:interceptors>
</si:channel>
<!-- Receives a RESULT_SET object, converts it to a List<RESULT> and returns it
     (resultOrderSetSplitter.splitOrderSetResultToOrderResult). Polls
     resultOrderSetHighChannel with a fixed-delay of 1000. -->
<si:service-activator input-channel="resultOrderSetHighChannel"
                      output-channel="resultOrderSetSplitHighChannel"
                      ref="resultOrderSetSplitter"
                      method="splitOrderSetResultToOrderResult">
    <si:poller fixed-delay="1000" task-executor="resultMessageSetHighTaskExecutor" />
</si:service-activator>
<!-- 2-15 threads, 2-slot task queue, CALLER_RUNS on rejection. -->
<task:executor id="resultMessageSetHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- Converts a single RESULT object into a List<RESULT> of size 1, so that simple results
     join the same list-splitting path as RESULT_SET messages. -->
<bean id="simpleOrderResultMapper"
      class="com.foo.service.process.transformer.SimpleMissionResultTransformer" />
<si:transformer id="simpleOrderResultHighTransformer"
                input-channel="highSimpleOrderResultChannel"
                output-channel="resultOrderSetSplitHighChannel"
                ref="simpleOrderResultMapper"
                method="transform" />
<!-- SPLITTER: turns each List<RESULT> into individual RESULT messages. The chain holds a
     single splitter backed by splitterBean. -->
<bean id="splitterBean"
      class="com.foo.service.process.impl.MessageSplitter" />
<si:chain input-channel="resultOrderSetSplitHighChannel"
          output-channel="resultHighChannel">
    <si:splitter ref="splitterBean" />
</si:chain>
<!-- Updates the OrderTypeCategory attribute in the payload
     (resultAssignCategoryService.processCategoryResults). Polls resultHighChannel with a
     fixed-delay of 1000 and forwards to processResultHighChannel. -->
<si:service-activator input-channel="resultHighChannel"
                      output-channel="processResultHighChannel"
                      ref="resultAssignCategoryService"
                      method="processCategoryResults">
    <si:poller fixed-delay="1000" task-executor="resultMessageHighTaskExecutor" />
</si:service-activator>
<!-- 2-15 threads, 2-slot task queue, CALLER_RUNS on rejection. -->
<task:executor id="resultMessageHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- Routes on the OrderTypeCategory attribute to one of the category channels
     (orderTypeCategoryRouter.routeHighMessage). -->
<si:router input-channel="processResultHighChannel"
           ref="orderTypeCategoryRouter"
           method="routeHighMessage" />
<!-- ORDER-TYPE-CATEGORY channels: one queue (capacity 50) per category.
     FILE_WATCHERHighChannel is deliberately NOT declared in this list: the same id is
     declared once, next to its FILE_WATCHER handler. Declaring it twice gives two bean
     definitions with one id, where the later one silently replaces the earlier one (or
     the context fails to load when bean-definition overriding is disabled). -->
<si:channel id="ORDERHighChannel">
    <si:queue capacity="50"/>
</si:channel>
<si:channel id="NOTIFICATION_SOFTWARE_UPDATEHighChannel">
    <si:queue capacity="50"/>
</si:channel>
<si:channel id="NOTIFICATION_USERHighChannel">
    <si:queue capacity="50"/>
</si:channel>
<si:channel id="ALERTHighChannel">
    <si:queue capacity="50"/>
</si:channel>
<si:channel id="ERRORHighChannel">
    <si:queue capacity="50"/>
</si:channel>
<!-- ERROR category: messages are transformed by OrderResultsMessageTransformer.transform
     and sent straight to ackHighChannel. -->
<si:transformer id="highTransformer"
                input-channel="ERRORHighChannel"
                output-channel="ackHighChannel"
                ref="OrderResultsMessageTransformer"
                method="transform" />
<!-- ORDER category: updates the OrderType attribute in the payload
     (resultAssignCategoryService.processOrderResults). Polls ORDERHighChannel with a
     fixed-delay of 1000 and forwards to processORDERHighChannel. -->
<si:service-activator input-channel="ORDERHighChannel"
                      output-channel="processORDERHighChannel"
                      ref="resultAssignCategoryService"
                      method="processOrderResults">
    <si:poller fixed-delay="1000" task-executor="resultOrderHighTaskExecutor" />
</si:service-activator>
<!-- 2-15 threads, 2-slot task queue, CALLER_RUNS on rejection. -->
<task:executor id="resultOrderHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- Routes on the OrderType attribute (orderTypeRouter.routeHighMessage); the
     service-activators on the target channels process the messages. -->
<si:router input-channel="processORDERHighChannel"
           ref="orderTypeRouter"
           method="routeHighMessage" />
<!-- Per-order-type channels and the service-activators that process messages arriving on them. -->

<!-- GET_PROPERTIES handler: queue (capacity 50), polled with a fixed-delay of 1000;
     getPropertiesHandler.handleRawMissionResult output goes to prepareResultsHighChannel. -->
<si:channel id="GET_PROPERTIESHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="GET_PROPERTIESHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="getPropertiesHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="getPropertiesHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="getPropertiesHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- SET_PROPERTIES handler: queue (capacity 50), polled with a fixed-delay of 1000;
     updateResultStatusHandler.handleRawMissionResult output goes to prepareResultsHighChannel. -->
<si:channel id="SET_PROPERTIESHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="SET_PROPERTIESHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="updateResultStatusHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="setPropertiesHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="setPropertiesHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- ADD_AGENT_PROPERTIES handler: queue (capacity 50), polled with a fixed-delay of 1000;
     updateResultStatusHandler.handleRawMissionResult output goes to prepareResultsHighChannel. -->
<si:channel id="ADD_AGENT_PROPERTIESHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="ADD_AGENT_PROPERTIESHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="updateResultStatusHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="addAgentPropertiesHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="addAgentPropertiesHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- FILE_DOWNLOAD_FROM_ENTERPRISE handler: queue (capacity 50), polled with a fixed-delay
     of 1000; updateResultStatusHandler.handleRawMissionResult output goes to
     prepareResultsHighChannel. -->
<si:channel id="FILE_DOWNLOAD_FROM_ENTERPRISEHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="FILE_DOWNLOAD_FROM_ENTERPRISEHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="updateResultStatusHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="fileDownloadFromEnterpriseHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="fileDownloadFromEnterpriseHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- GROOVY_SCRIPT_EXECUTION handler: queue (capacity 50), polled with a fixed-delay of
     1000; updateResultStatusHandler.handleRawMissionResult output goes to
     prepareResultsHighChannel. -->
<si:channel id="GROOVY_SCRIPT_EXECUTIONHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="GROOVY_SCRIPT_EXECUTIONHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="updateResultStatusHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="groovyScriptExecutionHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="groovyScriptExecutionHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- SHELL_SCRIPT_EXECUTION handler: queue (capacity 50), polled with a fixed-delay of
     1000; updateResultStatusHandler.handleRawMissionResult output goes to
     prepareResultsHighChannel. -->
<si:channel id="SHELL_SCRIPT_EXECUTIONHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="SHELL_SCRIPT_EXECUTIONHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="updateResultStatusHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="shellScriptExecutionHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="shellScriptExecutionHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- NOTIFY_DEVICE handler: queue (capacity 50), polled with a fixed-delay of 1000;
     updateResultStatusHandler.handleRawMissionResult output goes to prepareResultsHighChannel. -->
<si:channel id="NOTIFY_DEVICEHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="NOTIFY_DEVICEHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="updateResultStatusHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="notifyDeviceHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="notifyDeviceHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- AGENT_REGISTRATION handler: queue (capacity 50), polled with a fixed-delay of 1000;
     updateResultStatusHandler.handleRawMissionResult output goes to prepareResultsHighChannel. -->
<si:channel id="AGENT_REGISTRATIONHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="AGENT_REGISTRATIONHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="updateResultStatusHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="agentRegistrationHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="agentRegistrationHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- FILE_WATCHER handler: queue (capacity 50), polled with a fixed-delay of 1000;
     updateResultStatusHandler.handleRawMissionResult output goes to prepareResultsHighChannel. -->
<si:channel id="FILE_WATCHERHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="FILE_WATCHERHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="updateResultStatusHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="fileWatcherHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="fileWatcherHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- CONFIGURE_FAST_POLLING handler: queue (capacity 50), polled with a fixed-delay of
     1000; updateResultStatusHandler.handleRawMissionResult output goes to
     prepareResultsHighChannel. -->
<si:channel id="CONFIGURE_FAST_POLLINGHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="CONFIGURE_FAST_POLLINGHighChannel"
                      output-channel="prepareResultsHighChannel"
                      ref="updateResultStatusHandler"
                      method="handleRawMissionResult">
    <si:poller fixed-delay="1000" task-executor="configureFastPollingHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="configureFastPollingHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- FILE_UPLOAD_TO_ENTERPRISE handler (this is Murthy's service): queue (capacity 50),
     polled with a fixed-delay of 1000. Invokes fileTransferHandler.handleMissionResult
     through a SpEL expression with the flag set to false; output goes to
     prepareResultsHighChannel. This executor uses a larger pool (10-30) than the other
     handlers. -->
<si:channel id="FILE_UPLOAD_TO_ENTERPRISEHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="FILE_UPLOAD_TO_ENTERPRISEHighChannel"
                      output-channel="prepareResultsHighChannel"
                      expression="@fileTransferHandler.handleMissionResult(payload,false,headers)">
    <si:poller fixed-delay="1000" task-executor="fileUploadToEnterpriseHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="fileUploadToEnterpriseHandlerHighTaskExecutor"
               pool-size="10-30"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- DB_SCRIPT_EXECUTION handler: queue (capacity 50), polled with a fixed-delay of 1000.
     Invokes fileTransferHandler.handleMissionResult through a SpEL expression with the
     flag set to false; output goes to prepareResultsHighChannel. -->
<si:channel id="DB_SCRIPT_EXECUTIONHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="DB_SCRIPT_EXECUTIONHighChannel"
                      output-channel="prepareResultsHighChannel"
                      expression="@fileTransferHandler.handleMissionResult(payload,false,headers)">
    <si:poller fixed-delay="1000" task-executor="dbScriptExecutionHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="dbScriptExecutionHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- FILE_SEARCH handler: queue (capacity 50), polled with a fixed-delay of 1000.
     Invokes fileSearchHandler.handleMissionResult(payload, headers) through a SpEL
     expression; output goes to prepareResultsHighChannel. -->
<si:channel id="FILE_SEARCHHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="FILE_SEARCHHighChannel"
                      output-channel="prepareResultsHighChannel"
                      expression="@fileSearchHandler.handleMissionResult(payload,headers)">
    <si:poller fixed-delay="1000" task-executor="fileSearchHandlerHighTaskExecutor" />
</si:service-activator>
<task:executor id="fileSearchHandlerHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- Category handlers -->

<!-- DIRECTORY_WATCHER handler: queue (capacity 50), polled with a fixed-delay of 1000.
     Invokes fileTransferHandler.handleMissionResult through a SpEL expression with the
     flag set to true; output goes to prepareResultsHighChannel. -->
<si:channel id="FILE_WATCHER_CATEGORYHighChannel">
    <si:queue capacity="50" />
</si:channel>
<si:service-activator input-channel="FILE_WATCHER_CATEGORYHighChannel"
                      output-channel="prepareResultsHighChannel"
                      expression="@fileTransferHandler.handleMissionResult(payload,true,headers)">
    <si:poller fixed-delay="1000" task-executor="fileWatcherHandlerCategoryHighTaskExecutor" />
</si:service-activator>
<task:executor id="fileWatcherHandlerCategoryHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- Channels used around the aggregator; each is a queue of capacity 500. -->

<!-- Aggregator input: every handler above sends its output here. -->
<si:channel id="prepareResultsHighChannel">
    <si:queue capacity="500" />
</si:channel>

<!-- Aggregator output, consumed by the aggregator router. -->
<si:channel id="aggregateHighChannel">
    <si:queue capacity="500" />
</si:channel>

<!-- Input of the file-search aggregate service. -->
<si:channel id="aggregateFileSearchHighChannel">
    <si:queue capacity="500" />
</si:channel>

<!-- Input of the common aggregate service. -->
<si:channel id="aggregateCommonHighChannel">
    <si:queue capacity="500" />
</si:channel>

<!-- Discard channel of the aggregator. -->
<si:channel id="discardHighChannel">
    <si:queue capacity="500" />
</si:channel>
<!-- AGGREGATOR: resultAggregatorBean supplies all three strategies: correlation
     ("correlate"), release decision ("release") and the aggregation method ("send").
     Partial groups are not sent on expiry; discarded messages go to discardHighChannel. -->
<bean id="resultAggregatorBean"
      class="com.foo.service.consume.result.ResultAggregator" />
<si:aggregator input-channel="prepareResultsHighChannel"
               output-channel="aggregateHighChannel"
               discard-channel="discardHighChannel"
               ref="resultAggregatorBean"
               method="send"
               correlation-strategy="resultAggregatorBean"
               correlation-strategy-method="correlate"
               release-strategy="resultAggregatorBean"
               release-strategy-method="release"
               send-partial-result-on-expiry="false"
               send-timeout="300000" />
<!-- ROUTER: sends aggregated messages on via AggregatorRouter.routeHighMessage. -->
<bean id="aggregatorRouter"
      class="com.foo.service.process.router.AggregatorRouter" />
<si:router input-channel="aggregateHighChannel"
           ref="aggregatorRouter"
           method="routeHighMessage" />
<!-- Shared aggregation helper, injected into the aggregate and discard services. -->
<bean id="aggregatorUtil"
      class="com.foo.model.process.aggregate.AggregatorUtil" />

<!-- Bean that processes messages on the file-search aggregate channel. -->
<bean id="aggregateFileSearchResultService"
      class="com.foo.model.process.aggregate.impl.AggregateFileSearchResultServiceImpl">
    <property name="missionResultDAO" ref="missionResultDAO" />
    <property name="aggregatorUtil" ref="aggregatorUtil" />
</bean>
<!-- Service-activator for messages on the high priority, default or advance
     file-search channel: aggregateFileSearchResultService.processAggregatorResponse,
     polled with a fixed-delay of 1000; output goes to ackHighChannel. -->
<si:service-activator input-channel="aggregateFileSearchHighChannel"
                      output-channel="ackHighChannel"
                      ref="aggregateFileSearchResultService"
                      method="processAggregatorResponse">
    <si:poller fixed-delay="1000" task-executor="aggregateResultHighTaskExecutor" />
</si:service-activator>
<!-- Task executor for the high priority file-search channel: 2-15 threads, 2-slot task
     queue, CALLER_RUNS on rejection. -->
<task:executor id="aggregateResultHighTaskExecutor"
               pool-size="2-15"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- Bean that processes messages on the common aggregate channel. -->
<bean id="aggregateCommonResultService"
      class="com.foo.model.process.aggregate.impl.AggregateCommonResultServiceImpl">
    <property name="missionResultDAO" ref="missionResultDAO" />
    <property name="aggregatorUtil" ref="aggregatorUtil" />
</bean>
<!-- Service-activator for the common channel: processAggregatorResponse is invoked for
     messages polled with a fixed-delay of 1000; output goes to ackHighChannel. -->
<si:service-activator input-channel="aggregateCommonHighChannel"
                      output-channel="ackHighChannel"
                      ref="aggregateCommonResultService"
                      method="processAggregatorResponse">
    <si:poller fixed-delay="1000" task-executor="aggregateCommonHighTaskExecutor" />
</si:service-activator>
<!-- Task executor for the common channel: 2-30 threads, 2-slot task queue,
     CALLER_RUNS on rejection. -->
<task:executor id="aggregateCommonHighTaskExecutor"
               pool-size="2-30"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- Result evaluation service, backed by missionResultDAO. -->
<bean id="evaluateResultService"
      class="com.foo.model.process.aggregate.impl.EvaluateResultServiceImpl">
    <property name="missionResultDAO" ref="missionResultDAO" />
</bean>

<!-- DAO injected into the discard service; wired with the session factory and
     missionResultDAO. -->
<bean id="discardResultDAO"
      class="com.foo.dao.mission.result.impl.DiscardResultDAOImpl">
    <property name="sessionFactory" ref="mySessionFactory" />
    <property name="missionResultDAO" ref="missionResultDAO" />
</bean>
<!-- Bean that processes messages on the discard channel. -->
<bean id="discardResultService"
      class="com.foo.model.process.aggregate.impl.DiscardResultServiceImpl">
    <property name="discardResultDAO" ref="discardResultDAO" />
    <property name="aggregatorUtil" ref="aggregatorUtil" />
</bean>
<!-- Service-activator for the discard channel: applyFailureStatusToMessagesOnDiscardChannel
     is invoked for messages polled with a fixed-delay of 1000; output goes to
     ackHighChannel. -->
<si:service-activator input-channel="discardHighChannel"
                      output-channel="ackHighChannel"
                      ref="discardResultService"
                      method="applyFailureStatusToMessagesOnDiscardChannel">
    <si:poller fixed-delay="1000" task-executor="discardHighTaskExecutor" />
</si:service-activator>
<!-- Task executor for the discard channel: 2-30 threads, 2-slot task queue,
     CALLER_RUNS on rejection. -->
<task:executor id="discardHighTaskExecutor"
               pool-size="2-30"
               queue-capacity="2"
               keep-alive="1"
               rejection-policy="CALLER_RUNS" />
<!-- Dispatch the result.
     This is the final element of the flow: it updates the staging table with the
     message status, SUCCESS or FAILURE, by handing every message on ackHighChannel to
     acknowledgementService.receiveMessage. -->
<si:outbound-channel-adapter channel="ackHighChannel"
                             ref="acknowledgementService"
                             method="receiveMessage"
                             order="1" />