I created a Splitter/Aggregator configuration that splits a message and routes each resulting message to a specific service activator. Each service activator calls a gateway that invokes a SOAP service using CXF. I tried putting a reply-timeout on the SOAP gateways, but it has no effect: the flow still waits for the services and then aggregates the messages. What I do now in my code is set the reply-timeout on the Message gateway.

The problem is that if one service fails with a timeout, the other messages going to the aggregator also fail, because they belong to the same message group.

I have also tried adding send-partial-result-on-expiry="true" to my aggregator, but it still returns a timeout error.

Is there a way to put a timeout on the channel, the service activator, or the SOAP gateways, so that if one message fails with a timeout it does not affect the successful ones?

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns=".....">

<context:component-scan base-package="com.t460.wilkins.services.impl"/>


<!--The gateways and service activators that will call a designated SOAP service using CXF-->
<int:gateway id="FirstGateway" default-request-channel="firstChannel" service-interface="com.t460.wilkins.gateways.FirstGateway"/>    
<int:gateway id="SecondGateway" default-request-channel="secondChannel" service-interface="com.t460.wilkins.gateways.SecondGateway"/>
<int:gateway id="ThirdGateway" default-request-channel="thirdChannel" service-interface="com.t460.wilkins.gateways.ThirdGateway"/>

<int:channel id="firstChannel"/>
<int:service-activator input-channel="firstChannel" ref="firstService" />
<int:channel id="secondChannel"/>
<int:service-activator input-channel="secondChannel" ref="secondService" />
<int:channel id="thirdChannel"/>
<int:service-activator input-channel="thirdChannel" ref="thirdService" />



<!--Configuration for the Splitter Aggregator-->
<!-- The gateway that invokes the Splitter Aggregator: it passes in the initial message
and receives the aggregated message-->
<int:gateway id="MessageGateway" service-interface="com.t460.wilkins.gateways.MessageGateway" default-request-channel="asyncSenderChannel" default-reply-channel="asyncReceiverChannel" default-reply-timeout="5000"/>

<int:channel id="asyncSenderChannel"/>
<int:channel id="asyncReceiverChannel"/>

<!-- Splitter-->
<int:splitter input-channel="asyncSenderChannel" output-channel="routingChannel" id="messageSplitter" ref="messageSplitter" />
<int:channel id="routingChannel"/>

<!-- Router -->
<int:recipient-list-router id="recipientWithSelector" input-channel="routingChannel">
    <int:recipient channel="firstSplitChannel" selector-expression="headers.msgType eq 'First'"/>
    <int:recipient channel="secondSplitChannel" selector-expression="headers.msgType eq 'Second'"/>
    <int:recipient channel="thirdSplitChannel" selector-expression="headers.msgType eq 'Third'"/>
</int:recipient-list-router>

<int:channel id="firstSplitChannel">
    <int:queue/>
</int:channel>
<int:channel id="secondSplitChannel">
    <int:queue/>
</int:channel>
<int:channel id="thirdSplitChannel">
    <int:queue/>
</int:channel>

<!-- These are the service activators to which the split messages are routed. Inside their classes, each one invokes the
appropriate gateway listed above to get data from a SOAP service using CXF-->
<int:service-activator input-channel="firstSplitChannel" output-channel="aggregateChannel" 
ref="firstSoapActivator">
    <int:poller receive-timeout="1000" task-executor="taskExecutor" fixed-rate="5"/>
</int:service-activator>

<int:service-activator input-channel="secondSplitChannel" output-channel="aggregateChannel" 
ref="secondSoapActivator">
    <int:poller receive-timeout="1000" task-executor="taskExecutor" fixed-rate="5"/>
</int:service-activator>

<int:service-activator input-channel="thirdSplitChannel" output-channel="aggregateChannel" 
ref="thirdSoapActivator">
    <int:poller receive-timeout="1000" task-executor="taskExecutor" fixed-rate="5"/>
</int:service-activator>

<!--Aggregator-->
<int:channel id="aggregateChannel"/>
<int:aggregator input-channel="aggregateChannel" output-channel="asyncReceiverChannel" id="aggregator"
ref="componentsAggregator" correlation-strategy="componentsCorrelationStrategy"
release-strategy="componentsReleaseStrategy" expire-groups-upon-completion="true" send-partial-result-on-expiry="true"/>

<!--Task Executor-->
<task:executor id="taskExecutor" pool-size="10-1000"
               queue-capacity="5000"/>

</beans>

---UPDATE----

I tried removing the reply-timeout from the message gateway and putting send-timeout="5000" on the service activators instead, but the aggregator still waits for all messages to arrive.

I also tried putting a reply timeout on the SOAP gateways "FirstGateway", "SecondGateway", and "ThirdGateway", but the flow still pushes through and waits for all the messages.

1 Answer

First of all, you should consider using a Scatter-Gather: https://docs.spring.io/spring-integration/docs/5.0.8.RELEASE/reference/html/messaging-routing-chapter.html#scatter-gather. Also, group-timeout won’t help you anyway: there is no guarantee that a group will be expired in time. You need to use an expression advice to route errors to the aggregator anyway. Then, in the release function, you can filter the errors out of the final list: https://docs.spring.io/spring-integration/docs/5.0.8.RELEASE/reference/html/messaging-endpoints-chapter.html#message-handler-advice-chain
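
As a rough sketch of the expression advice idea, one of the split-channel service activators from the question could be wrapped roughly like this. It assumes the property names of ExpressionEvaluatingRequestHandlerAdvice as I understand them from the 5.0.x reference linked above (older versions may name them differently), and it assumes the activator actually throws when the SOAP call fails or times out, since the advice only sees exceptions raised by the handler. Returning the exception as the reply payload is just one possible choice for illustration:

```xml
<!-- Sketch only, not a drop-in fix: on failure, the exception itself becomes the reply payload -->
<int:service-activator input-channel="firstSplitChannel" output-channel="aggregateChannel"
                       ref="firstSoapActivator">
    <int:poller receive-timeout="1000" task-executor="taskExecutor" fixed-rate="5"/>
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
            <!-- Evaluated when the handler throws; #exception is the thrown exception -->
            <property name="onFailureExpressionString" value="#exception"/>
            <!-- Send the expression result to the output channel instead of rethrowing -->
            <property name="returnFailureExpressionResult" value="true"/>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
```

With something like this in place, a failed call should still produce a reply that reaches aggregateChannel with the request headers, so the group can complete. The componentsAggregator (or the release logic) would then need to skip payloads that are exceptions when building the final result.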