I have a gateway implementation as below. I have defined an error-channel in the gateway to trap any exception in the entire SI flow. The outputChannel sends the response back out of the gateway. As you can see there are 3 parallel flows in the system created using splitter and router.
In ErrorHandlingService, I set the appropriate error codes and send the message back to the aggregateDSLResponseChannel so that the entire flow completes smoothly. I need an implementation where entire flow ends in DSLFlowEndService even when there is an exception and then return response back to the caller by settings appropriate response object.The reason is if there was a request to connect to say WLDP and IRIS and say WLDP flow failed but IRIS succeeds, I don't want to send complete response as failure. Instead I would send an appropriate message back to the caller mentioning partial success (with details of what failed and succeeded)
To test, I introduced an exception in WLDPFlowEndService. The exception comes to error channel and then message also travels back to aggregateDSLResponseChannel and then to DSLFlowEndService. However the response is not sent back to the caller outside gateway. Basically Spring integration flow never completes and times out after 30 secs as configured. Below is the warning that comes in the logs:
15:13:23.271 [dslParallelExecutor-2] WARN org.springframework.integration.core.MessagingTemplate$TemporaryReplyChannel {} - Reply message being sent, but the receiving thread has already received a reply:[Payload=DSLResponseVO
15:13:23.271 [dslParallelExecutor-2] DEBUG org.springframework.integration.channel.DirectChannel {} - postSend (sent=true) on channel 'outputChannel', message: [Payload=DSLResponseVO
**Updated configuration- one that works (super thanks to Gary) **:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputChannel"></int:channel>
<int:channel id="outputChannel"></int:channel>
<int:channel id="dslFlowInitiatorOutputChannel"/>
<int:channel id="routingChannel"/>
<int:channel id="wldpInputChannel">
<int:dispatcher task-executor="wldpParallelExecutor" />
</int:channel>
<int:channel id="lcInputChannel">
<int:dispatcher task-executor="lcParallelExecutor" />
</int:channel>
<int:channel id="irisInputChannel">
<int:dispatcher task-executor="irisParallelExecutor" />
</int:channel>
<int:publish-subscribe-channel id="aggregateDSLResponseChannel"></int:publish-subscribe-channel>
<int:channel id="aggregateDSLOutputChannel">
<int:dispatcher task-executor="dslParallelExecutor" />
</int:channel>
<!-- all thread pools to execute tasks in parallel -->
<task:executor id="dslParallelExecutor" pool-size="50" />
<task:executor id="wldpParallelExecutor" pool-size="25" />
<task:executor id="lcParallelExecutor" pool-size="25" />
<task:executor id="irisParallelExecutor" pool-size="25" />
<int:gateway id="dslServiceFacade" service-interface="com.mycompany.tss.ls.dsl.gateway.IDSLServiceFacade"
default-request-channel="inputChannel" default-reply-channel="outputChannel" error-channel="gatewayErrorChannel" async-executor="dslParallelExecutor">
<int:method name="invoke" request-channel="inputChannel" request-timeout="5000" reply-timeout="3000000"/>
</int:gateway>
<int:service-activator input-channel="inputChannel"
output-channel="dslFlowInitiatorOutputChannel" ref="dslFlowInitiatorService"
method="invoke" id="dslFlowInitiator" />
<bean id="dslFlowInitiatorService" class="com.mycompany.tss.ls.dsl.service.DSLFlowInitiatorService" />
<int:splitter id="systemSplitter" input-channel="dslFlowInitiatorOutputChannel" method="split"
output-channel="routingChannel" ref="systemMessageSplitter">
</int:splitter>
<bean id="systemMessageSplitter" class="com.mycompany.tss.ls.dsl.splitter.SystemMessageSplitter"/>
<int:router id="systemRouter" input-channel="routingChannel" default-output-channel="nullChannel"
expression="headers.get('systemId')">
<int:mapping value="WLDP" channel="wldpInputChannel" />
<int:mapping value="LC" channel="lcInputChannel" />
<int:mapping value="IRIS" channel="irisInputChannel" />
</int:router>
<int:aggregator id="dslResponseAggregator" input-channel="aggregateDSLResponseChannel" output-channel="aggregateDSLOutputChannel"
message-store="dslResponseMessageStore" correlation-strategy-expression="headers['requestId']"
send-partial-result-on-expiry="true">
</int:aggregator>
<int:service-activator input-channel="aggregateDSLOutputChannel"
output-channel="outputChannel" ref="dslFlowEndService"
method="invoke" id="dslFlowEndActivator" />
<bean id="dslFlowEndService" class="com.mycompany.tss.ls.dsl.service.DSLFlowEndService" />
<bean id="dslResponseMessageStore" class="org.springframework.integration.store.SimpleMessageStore" />
<bean id="dslResponseMessageStoreReaper" class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="dslResponseMessageStore" />
<property name="timeout" value="4000" />
</bean>
<task:scheduled-tasks>
<task:scheduled ref="dslResponseMessageStoreReaper" method="run" fixed-rate="10000" />
</task:scheduled-tasks>
<int:wire-tap pattern="*Channel" order="3" channel="wiretap"/>
<int:message-history/>
<int:logging-channel-adapter id="wiretap" level="DEBUG"/>
<!-- Error handling Service Activator to log exceptions and send the response -->
<int:service-activator id="errorHandlingServiceActivator" input-channel="gatewayErrorChannel"
method="invoke" output-channel="aggregateDSLResponseChannel" ref="errorHandlingService">
</int:service-activator>
<!-- Error Handling Service class -->
<bean id="errorHandlingService" class="com.mycompany.tss.ls.dsl.service.ErrorHandlingService" />
<bean id="wldpErrorHandlingService" class="com.mycompany.tss.ls.dsl.service.ErrorHandlingService" />
<bean id="irisErrorHandlingService" class="com.mycompany.tss.ls.dsl.service.ErrorHandlingService" />
<int:channel id="wldpStartChannel"></int:channel>
<int:channel id="irisStartChannel"></int:channel>
<int:channel id="wldpErrorChannel"></int:channel>
<int:channel id="irisErrorChannel"></int:channel>
<int:channel id="wldpOutputChannel"></int:channel>
<int:channel id="irisOutputChannel"></int:channel>
<int:service-activator id="midFlowWLDPActivator" input-channel="wldpInputChannel"
ref="midFlowWLDPGateway" output-channel="aggregateDSLResponseChannel"/>
<int:service-activator id="midFlowIRISActivator" input-channel="irisInputChannel"
ref="midFlowIRISGateway" output-channel="aggregateDSLResponseChannel"/>
<int:gateway id="midFlowWLDPGateway" default-request-channel="wldpStartChannel"
service-interface="com.mycompany.tss.ls.dsl.gateway.IDSLWLDPFacade" error-channel="wldpErrorChannel"
/>
<int:gateway id="midFlowIRISGateway" default-request-channel="irisStartChannel"
service-interface="com.mycompany.tss.ls.dsl.gateway.IDSLIRISFacade" error-channel="irisErrorChannel"
/>
<int:service-activator id="errorHandlingWLDPServiceActivator" input-channel="wldpErrorChannel"
method="invoke" ref="wldpErrorHandlingService">
</int:service-activator>
<int:service-activator id="errorHandlingIRISServiceActivator" input-channel="irisErrorChannel"
method="invoke" ref="irisErrorHandlingService">
</int:service-activator>
<!-- Responsible for closing all the activities related to WLDP before sending response -->
<int:service-activator id="wldpFlowEndServiceActivator" input-channel="wldpStartChannel"
method="invoke" ref="wldpFlowEndService">
</int:service-activator>
<bean id="wldpFlowEndService" class="com.mycompany.tss.ls.dsl.service.wldp.WLDPFlowEndService" />
<!-- Responsible for closing all the activities related to IRIS before sending response -->
<int:service-activator id="irisFlowEndServiceActivator" input-channel="irisStartChannel"
method="invoke">
</int:service-activator>
<bean id="irisFlowEndService" class="com.mycompany.tss.ls.dsl.service.iris.IrisFlowEndService" />
ErrorHandlingService:
public class ErrorHandlingService {
Logger logger = LoggerFactory.getLogger(ErrorHandlingService.class);
public Message<DSLResponseVO> invoke(Message<?> requestMessage) {
Object exception = requestMessage.getPayload();
if(exception instanceof MessagingException){
DSLResponseVO responseVO = new DSLResponseVO();
logger.error("Exception has occurred: {}", ((MessagingException)exception).getMessage());
logger.error("Exception Stacktrace is: ", exception);
responseVO.setRequestId((String)((MessagingException)exception).getFailedMessage().getHeaders().get("requestId"));
responseVO.setSystemId((String)((MessagingException)exception).getFailedMessage().getHeaders().get("systemId"));
responseVO.setErrorCode(DSLErrorConstants.FAILURE_CODE);
responseVO.setErrorMessage(DSLErrorConstants.FAILURE_DESC);
Message<DSLResponseVO> failedMessage = MessageBuilder.withPayload(responseVO)
//.copyHeadersIfAbsent(((MessagingException)exception).getFailedMessage().getHeaders())
.setHeaderIfAbsent("requestId", (String)((MessagingException)exception).getFailedMessage().getHeaders().get("requestId"))
.setHeaderIfAbsent("systemId", (String)((MessagingException)exception).getFailedMessage().getHeaders().get("systemId"))
.setHeaderIfAbsent(MessageHeaders.SEQUENCE_NUMBER, (Integer)((MessagingException)exception).getFailedMessage().getHeaders().get(MessageHeaders.SEQUENCE_NUMBER))
.setHeaderIfAbsent(MessageHeaders.SEQUENCE_SIZE, (Integer)((MessagingException)exception).getFailedMessage().getHeaders().get(MessageHeaders.SEQUENCE_SIZE))
.setHeaderIfAbsent(MessageHeaders.REPLY_CHANNEL, ((MessagingException)exception).getFailedMessage().getHeaders().get(MessageHeaders.REPLY_CHANNEL))
.build();
return failedMessage;
}
return null;
}
}