I am working with Spring Integration for a new application and have started a POC to learn how to handle failure cases. In my application, Spring Integration receives a message from IBM MQ, validates the header information, and routes the message to a different queue depending on the message type. An incoming message can be a bulk message, so I use a splitter and an aggregator from Spring Integration, and so far I have good progress and control over the technical workflow. We have both IBM MQ and a web service as gateways. Both gateways receive the message and send it to the splitter channel, where the splitter splits it and sends the parts to the outbound channel (an executor channel), so the messages are sent to the destination in parallel. A status-update service activator subscribes to the same channel with order=2 and sends its result to the aggregator. So far the implementation works well.
Problem: to handle an exception thrown by the JMS outbound gateway, I added an advice as the exception handler. It sends the failure to another service activator, which updates the failure status on the DTO object and has the same aggregator channel as its output. In this case, however, the message never arrives on the aggregator channel; the aggregator receives messages only in the happy flow.
I want to aggregate both the successfully sent messages and the failed messages (whose status is updated by the other service activator), and then post the complete status to a response queue through another outbound adapter, or return it as the web service response.
I tried giving the ordered success service activator and the failure (error handler) service activator the same output channel, which is the input channel of the aggregator, but it does not work.
I would appreciate your guidance on how to proceed with this workflow.
I am using Spring Integration 2.2.2.
<channel id="inbound"/>
<channel id="splitterInChannel"/>
<channel id="splitterOutChannel">
    <dispatcher task-executor="splitterExecutor"/>
</channel>
<beans:bean id="splitterExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <beans:property name="corePoolSize" value="5" />
    <beans:property name="maxPoolSize" value="10" />
    <beans:property name="queueCapacity" value="25" />
</beans:bean>
<channel id="ValidatorChannel"/>
<channel id="outBoundjmsChannel"/>
<channel id="outBoundErrorChannel"/>
<channel id="finalOutputChannel"></channel>
<channel id="aggregatorChannel"/>
<jms:inbound-channel-adapter connection-factory="AMQConnectionFactory"
    destination="AMQueue" channel="inbound" auto-startup="true"
    extract-payload="false" acknowledge="transacted"></jms:inbound-channel-adapter>
<service-activator ref="InBoundProcessor" input-channel="inbound" output-channel="splitterInChannel"></service-activator>
<!-- splitter -->
<splitter ref="Splitter" method="splitInput" input-channel="splitterInChannel" output-channel="splitterOutChannel"/>
<!-- validator -->
<service-activator ref="Validator" method="validate" input-channel="splitterOutChannel" output-channel="ValidatorChannel"/>
<!-- need to add enricher -->
<service-activator ref="Enricher" method="enrich" input-channel="ValidatorChannel" output-channel="outBoundjmsChannel"/>
<!-- outbound gateway -->
<jms:outbound-channel-adapter channel="outBoundjmsChannel" connection-factory="AMQConnectionFactory" destination-name="outputQueue"
    message-converter="customMessageConvertor" order="1" >
    <jms:request-handler-advice-chain>
        <beans:bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
            <beans:property name="retryTemplate" >
                <beans:bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
                    <beans:property name="retryPolicy">
                        <beans:bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                            <beans:property name="maxAttempts" value="2" />
                        </beans:bean>
                    </beans:property>
                    <beans:property name="backOffPolicy">
                        <beans:bean class="org.springframework.retry.backoff.FixedBackOffPolicy">
                            <beans:property name="backOffPeriod" value="1000" />
                        </beans:bean>
                    </beans:property>
                </beans:bean>
            </beans:property>
            <beans:property name="recoveryCallback">
                <beans:bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
                    <beans:constructor-arg ref="outBoundErrorChannel" />
                </beans:bean>
            </beans:property>
        </beans:bean>
    </jms:request-handler-advice-chain>
</jms:outbound-channel-adapter>
<!-- outBound error processor -->
<service-activator ref="ErrorProcessor" method="errorHandling" input-channel="outBoundErrorChannel" output-channel="aggregatorChannel" />
<!-- Post send processor -->
<service-activator ref="PostProcessor" method="Postprocessing" input-channel="outBoundjmsChannel" output-channel="aggregatorChannel" order="2"/>
<!-- aggregator -->
<aggregator ref="Aggregator" correlation-strategy-method="aggregateStrategy" input-channel="aggregatorChannel" output-channel="finalOutputChannel"
    release-strategy-method="isRelease" method="aggregate" expire-groups-upon-completion="true"/>
<!-- final processor or responder -->
<service-activator ref="FinalProcessor" method="endProcessing" input-channel="finalOutputChannel"/>
</beans:beans>
In the above configuration, for now the release strategy returns false and the correlation method returns an empty string. If this works, I will generate a UUID for the batch and attach it in the splitter for correlation.
While debugging the above configuration, I noticed that the outbound error channel receives a message each time a send to the outbound adapter is attempted (twice in my case). In one application I do not want any retry at all, and in another application the message must be re-posted. In both cases I want the message sent to the outbound error channel only after the final attempt, so that it can be aggregated; if the send fails, I will update the status in ErrorProcessor to "failed to send".
Two issues: 1. I am receiving duplicate messages on the channel, and it is difficult to identify the last failure or success. 2. I could not work out the logic for the release strategy, because it is difficult to identify which message is the duplicate and whether it was successful.
In this case I could not find a generic way to compare the objects, because the equals method does not have suitable attributes to compare, and comparing on a boolean field would not be correct.
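To make the duplicate problem concrete, here is a minimal sketch in plain Java (no Spring types) of one way the duplicates could be collapsed without relying on equals. It assumes each split item carries a sequence number and a failed flag; the Outcome and OutcomeMerger classes and their fields are hypothetical and not part of my current DTO. It also assumes that a failure should override a success reported for the same item.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical status record for one split item (not the real OutBoundMessage).
class Outcome {
    final int sequence;    // position of the item inside its batch (assumed field)
    final boolean failed;  // true when the send to the outbound queue failed

    Outcome(int sequence, boolean failed) {
        this.sequence = sequence;
        this.failed = failed;
    }
}

class OutcomeMerger {
    // Keeps one outcome per sequence number; a failure replaces a success.
    static List<Outcome> merge(List<Outcome> received) {
        Map<Integer, Outcome> bySequence = new LinkedHashMap<Integer, Outcome>();
        for (Outcome outcome : received) {
            Outcome existing = bySequence.get(outcome.sequence);
            if (existing == null || (outcome.failed && !existing.failed)) {
                bySequence.put(outcome.sequence, outcome);
            }
        }
        return new ArrayList<Outcome>(bySequence.values());
    }
}
```

Keying on the sequence number sidesteps the equals problem, because identity comes from the item's position in the batch rather than from the status flag.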
Please help me resolve this issue so that I can proceed with my workflow design and complete it.
Any guidance is much appreciated. Thanks, Krish S
Current code:
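// Message and MessagingException come from org.springframework.integration in 2.2.x
public Object errorHandling(Object object){
    OutBoundMessage outBoundMessage = null;
    if(object instanceof MessagingException){
        // getFailedMessage() returns a Message<?>, so the DTO has to be taken from its payload
        Message<?> failedMessage = ((MessagingException) object).getFailedMessage();
        if(failedMessage != null && failedMessage.getPayload() instanceof OutBoundMessage){
            outBoundMessage = (OutBoundMessage) failedMessage.getPayload();
        }
    }else{
        //TODO: log the message
    }
    return outBoundMessage;
}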
public String aggregateStrategy(OutBoundMessage outBoundMessage){
    //TODO: get the UUID from outbound message and return
    return "";
}
public List<OutBoundMessage> splitter(InBoundMessage inBoundMessage){
    String[] message = inBoundMessage.getRawMessage().split(",");
    // one correlation id shared by every item of this batch
    long uuid = java.util.UUID.randomUUID().getLeastSignificantBits();
    List<OutBoundMessage> outBoundMessagelist = new ArrayList<OutBoundMessage>();
    for (String string : message) {
        OutBoundMessage outBoundMessage = new OutBoundMessage();
        outBoundMessage.setCorrelationID(uuid);
        outBoundMessagelist.add(outBoundMessage);
    }
    return outBoundMessagelist;
}
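As a sketch of what I plan for the splitter, the version below (plain Java, no Spring types) stamps every item with the batch id plus its position and the batch size, so that later stages can tell how many items to expect. BatchItem and BatchSplitter are hypothetical stand-ins for OutBoundMessage and my Splitter bean, and the sequence and batchSize fields are assumptions that do not exist in the DTO yet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for the OutBoundMessage DTO; all field names are assumptions.
class BatchItem {
    final String batchId;  // shared by every item split from one inbound message
    final int sequence;    // 1-based position of the item in the batch
    final int batchSize;   // total number of items in the batch
    final String body;     // the raw fragment carried by this item

    BatchItem(String batchId, int sequence, int batchSize, String body) {
        this.batchId = batchId;
        this.sequence = sequence;
        this.batchSize = batchSize;
        this.body = body;
    }
}

class BatchSplitter {
    // Splits a comma-separated raw message into items that share one batch id.
    static List<BatchItem> split(String rawMessage) {
        String[] parts = rawMessage.split(",");
        String batchId = UUID.randomUUID().toString();
        List<BatchItem> items = new ArrayList<BatchItem>();
        for (int i = 0; i < parts.length; i++) {
            items.add(new BatchItem(batchId, i + 1, parts.length, parts[i]));
        }
        return items;
    }
}
```

Using the full UUID string instead of getLeastSignificantBits() keeps all 128 bits of the id, at the cost of changing the correlation field from long to String.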
For now the following method returns false by default, just to validate the flow:
public boolean isRelease(List<OutBoundMessage> outBoundMessage){
    //TODO: need to define condition for closing the list aggregation
    return false;
}
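One possible release condition, shown as a minimal sketch in plain Java: release the group once every position of the batch has reported at least once, counting distinct positions so that duplicates do not release the group early. TrackedItem and BatchRelease are hypothetical names, and the sequence and batchSize fields are assumptions that my OutBoundMessage does not have yet.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for OutBoundMessage; both fields are assumptions.
class TrackedItem {
    final int sequence;   // 1-based position of the item in its batch
    final int batchSize;  // total number of items in the batch

    TrackedItem(int sequence, int batchSize) {
        this.sequence = sequence;
        this.batchSize = batchSize;
    }
}

class BatchRelease {
    // True once every position of the batch has been seen at least once.
    static boolean isRelease(List<TrackedItem> group) {
        if (group.isEmpty()) {
            return false;
        }
        int expected = group.get(0).batchSize;
        Set<Integer> seen = new HashSet<Integer>();
        for (TrackedItem item : group) {
            seen.add(item.sequence);  // duplicates collapse in the set
        }
        return seen.size() == expected;
    }
}
```

This only decides when the group is complete; choosing between a success and a failure reported for the same position would still have to happen in the aggregate method.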
"am not receiving the message in aggregator channel". That's weird. I can assume that you can't aggregate because of loss correlationKey. Or... your update service returns null. – Artem Bilan
correlatationKey. You should supply it from the requestMessage when you build errorMessage. That's why the config would be helpful, because I even don't understand what an Advice you use for error handling. – Artem Bilan