1
votes

I am using Spring Integration for a new application and have started a proof of concept to learn how to handle failure cases. The application receives messages from IBM MQ, validates the header information, and routes each message to a different queue depending on its type. An incoming message can be a bulk message, so I use the Spring Integration splitter and aggregator, and so far I have good control over the technical workflow. We have two gateways, IBM MQ and a web service. Both receive the message and send it to the splitter channel; the splitter splits it and sends the parts on through an executor channel, so they are delivered to the destination in parallel. A status-update service activator is subscribed to the same outbound channel with order=2 and forwards each message to the aggregator. So far this works.

Problem: if the JMS outbound gateway throws an exception, an advice I added as the exception handler sends the error to another service activator, which updates the failure status on the DTO and uses the same aggregator channel as its output. In this case the message never arrives on the aggregator channel; the aggregator only receives messages in the happy flow.

I want to aggregate the successfully sent messages together with the failed ones (whose status is updated by the other service activator), and then post the complete status to a response queue through another outbound adapter, or return it as the web service response.

I tried giving the ordered success service activator and the error-handling service activator the same output channel, which is the aggregator's input channel, but it does not work.

I would appreciate guidance on how to proceed with this workflow.

I am using Spring Integration 2.2.2.

<channel id="inbound"/>
<channel id="splitterInChannel"/>

<channel id="splitterOutChannel">
<dispatcher task-executor="splitterExecutor"/>
</channel>          
<beans:bean id="splitterExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <beans:property name="corePoolSize" value="5" />
    <beans:property name="maxPoolSize" value="10" />
    <beans:property name="queueCapacity" value="25" />
</beans:bean>
<channel id="ValidatorChannel"/>

<channel id="outBoundjmsChannel"/>
<channel id="outBoundErrorChannel"/>
<channel id="finalOutputChannel"></channel>
<channel id="aggregatorChannel"/>

<jms:inbound-channel-adapter connection-factory="AMQConnectionFactory" 
destination="AMQueue" channel="inbound" auto-startup="true" 
extract-payload="false" acknowledge="transacted"></jms:inbound-channel-adapter>


<service-activator ref="InBoundProcessor" input-channel="inbound" output-channel="splitterInChannel"></service-activator>

<!-- splitter -->
<splitter ref="Splitter" method="splitInput" input-channel="splitterInChannel" output-channel="splitterOutChannel"/>
<!-- validator -->
<service-activator ref="Validator" method="validate" input-channel="splitterOutChannel" output-channel="ValidatorChannel"/>         

<!-- need to add enricher -->
<service-activator ref="Enricher" method="enrich" input-channel="ValidatorChannel" output-channel="outBoundjmsChannel"/>            

<!-- outbound gateway -->

<jms:outbound-channel-adapter channel="outBoundjmsChannel" connection-factory="AMQConnectionFactory" destination-name="outputQueue" 
message-converter="customMessageConvertor" order="1"  >

        <jms:request-handler-advice-chain>
            <beans:bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
                <beans:property name="retryTemplate" >
                    <beans:bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
                        <beans:property name="retryPolicy">
                            <beans:bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                            <beans:property name="maxAttempts" value="2" />
                            </beans:bean>
                        </beans:property>
                    <beans:property name="backOffPolicy">
                        <beans:bean class="org.springframework.retry.backoff.FixedBackOffPolicy">
                        <beans:property name="backOffPeriod" value="1000" />
                        </beans:bean>
                        </beans:property>
                    </beans:bean>            
                </beans:property>
                <beans:property name="recoveryCallback">
                    <beans:bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
                        <beans:constructor-arg ref="outBoundErrorChannel" />
                    </beans:bean>
                </beans:property>

            </beans:bean>
        </jms:request-handler-advice-chain>
    </jms:outbound-channel-adapter>



<!-- outBound error processor -->           
<service-activator  ref="ErrorProcessor" method="errorHandling" input-channel="outBoundErrorChannel" output-channel="aggregatorChannel" />          
<!-- Post send processor -->
<service-activator  ref="PostProcessor" method="Postprocessing" input-channel="outBoundjmsChannel" output-channel="aggregatorChannel" order="2"/>           
<!-- aggregator -->
<aggregator ref="Aggregator" correlation-strategy-method="aggregateStrategy" input-channel="aggregatorChannel" output-channel="finalOutputChannel"
release-strategy-method="isRelease" method="aggregate" expire-groups-upon-completion="true"/>


<!-- final processor or responder -->
<service-activator ref="FinalProcessor" method="endProcessing" input-channel="finalOutputChannel"/>         

</beans:beans>

In the configuration above, for now the release strategy returns false and the correlation method returns an empty string. If this works, I will generate a UUID for the batch and attach it in the splitter to correlate the messages.

While debugging this configuration I noticed that the outbound error channel receives a message every time the adapter attempts to send (twice in my case). One of my applications must not retry, while another must retry posting the message. In both cases I want the message sent to the outbound error channel only after the final attempt, so that it can be aggregated; if the send fails, ErrorProcessor will update the status to "failed to send".

Two issues: 1. I receive duplicate messages on the channel and cannot tell which one is the final failure or success. 2. I cannot work out the release strategy logic, because I cannot tell which message is the duplicate or whether it was successful.

In this case I could not find a generic way to compare the objects: the equals method has no suitable attributes to compare, and comparing on a boolean field would not be correct.

Please help me resolve this issue so I can complete the workflow design.

Thanks in advance for any guidance. Krish S


Current code:

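public Object errorHandling(Object object){
        OutBoundMessage outBoundMessage = null;
        if(object instanceof MessagingException){
            // getFailedMessage() returns the Message that failed to send;
            // its payload is assumed here to be the OutBoundMessage
            outBoundMessage = (OutBoundMessage) ((MessagingException) object).getFailedMessage().getPayload();
        }else{
            //TODO: log the message
        }

        return outBoundMessage;
    }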




    public String aggregateStrategy(OutBoundMessage outBoundMessage){

        //TODO: get the UUID from outbound message and return 
        return "";
    }




public List<OutBoundMessage> splitter(InBoundMessage inBoundMessage){

        String[] message = inBoundMessage.getRawMessage().split(",");
        // one id shared by every part of this batch, used later for correlation
        long uuid = java.util.UUID.randomUUID().getLeastSignificantBits();
        List<OutBoundMessage> outBoundMessagelist = new ArrayList<OutBoundMessage>();
        for (String string : message) {
            OutBoundMessage outBoundMessage = new OutBoundMessage();
            outBoundMessage.setCorrelationID(uuid);
            outBoundMessagelist.add(outBoundMessage);
        }
        return outBoundMessagelist;
    }

For now the following method returns false by default so that I can validate the flow:

public boolean isRelease(List<OutBoundMessage> outBoundMessage){
        //TODO: need to define condition for closing the list aggregation
        return false;
    }
2
You should share the error handling configuration, and explain what "am not receiving the message in aggregator channel" means. That's weird. I assume you can't aggregate because the correlationKey is lost. Or... your update service returns null. - Artem Bilan
Thanks, Bilan. I've configured an advice to handle the error message. The aggregator is not even grouping the successful messages, and I've configured it to expire groups on completion. So the SI aggregator is waiting for all the split messages, but one of them throws an exception and goes to the error channel, and the aggregator keeps waiting for a message that never reaches it. Please let me know what configuration is needed for the correlation strategy or release. - Krishna Moorthy Seetha Raman
I asked you for the config. Such a "word" discussion may be infinite. Why are you sure that your error messages don't reach the aggregator? I guess they don't carry the proper correlationKey. You should supply it from the requestMessage when you build the errorMessage. That's why the config would be helpful, because I don't even understand which Advice you use for error handling. - Artem Bilan
We can't help without you providing your existing configuration; edit the question, don't try to put it in a comment. - Gary Russell
@ArtemBilan ... please suggest. - Krishna Moorthy Seetha Raman

2 Answers

3
votes

Please share your ErrorProcessor source code, and the method behind correlation-strategy-method="aggregateStrategy" as well.

I would like to know how you deal with the ErrorMessage there and how you restore the correlationKey from the message after your ErrorProcessor.

I am not sure how you build your own correlationKey, but the <splitter> has apply-sequence="true" by default, so the correlation details are available in each split message and can be used to aggregate afterwards.
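
To make the sequence idea concrete, here is a minimal plain-Java sketch, not Spring Integration code. `SketchMessage`, `SequenceSketch`, `split` and `isComplete` are hypothetical names made up for this illustration. The header keys `correlationId`, `sequenceNumber` and `sequenceSize` are the ones the framework uses for its sequence details. The completeness check counts distinct sequence numbers, on the assumption that a retried send may deliver the same part more than once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Plain-Java stand-in for a message; NOT the Spring Integration Message type. */
final class SketchMessage {
    final String payload;
    final Map<String, Object> headers;

    SketchMessage(String payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = headers;
    }
}

final class SequenceSketch {

    /** Mimics apply-sequence: every part gets the same correlation id, its position and the total count. */
    static List<SketchMessage> split(String rawMessage) {
        String[] parts = rawMessage.split(",");
        String correlationId = UUID.randomUUID().toString();
        List<SketchMessage> out = new ArrayList<SketchMessage>();
        for (int i = 0; i < parts.length; i++) {
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("correlationId", correlationId);
            headers.put("sequenceNumber", i + 1);
            headers.put("sequenceSize", parts.length);
            out.add(new SketchMessage(parts[i], headers));
        }
        return out;
    }

    /** A group is complete once it holds every distinct sequence number announced by sequenceSize. */
    static boolean isComplete(List<SketchMessage> group) {
        if (group.isEmpty()) {
            return false;
        }
        int expected = (Integer) group.get(0).headers.get("sequenceSize");
        Set<Object> seen = new HashSet<Object>();
        for (SketchMessage m : group) {
            seen.add(m.headers.get("sequenceNumber")); // duplicates collapse into one entry
        }
        return seen.size() == expected;
    }
}
```

With headers like these on every message, success and failure results only need to keep them intact for the group to complete.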

For the ErrorMessage sent by the ErrorMessageSendingRecoverer, I recommend paying attention to its Exception payload. From the ErrorMessageSendingRecoverer source code:

else if (!(lastThrowable instanceof MessagingException)) {
        lastThrowable = new MessagingException((Message<?>) context.getAttribute("message"),
                lastThrowable.getMessage(), lastThrowable);
    }
....
messagingTemplate.send(new ErrorMessage(lastThrowable));

So that MessagingException carries the "guilty" message for the exception, and exactly that message has the correlation detail headers the aggregator needs. Therefore you should rely on them if you want to aggregate errors into the same message group.
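
The point about the "guilty" message can be sketched in plain Java with stand-in types, so the sketch runs without Spring on the classpath. `Msg`, `SendFailure`, `toFailedResult` and the `deliveryStatus` header are all hypothetical and exist only for this illustration. In a real flow, the roughly equivalent step would be to build the reply from `MessagingException.getFailedMessage()` (for example with `MessageBuilder.fromMessage(...)`) rather than from the ErrorMessage itself.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class ErrorPathSketch {

    /** Stand-in for a message (payload plus headers); not the Spring type. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<String, Object>(headers));
        }
    }

    /** Stand-in for MessagingException: remembers which message failed. */
    static final class SendFailure extends RuntimeException {
        private final Msg failedMessage;

        SendFailure(Msg failedMessage, String description, Throwable cause) {
            super(description, cause);
            this.failedMessage = failedMessage;
        }

        Msg getFailedMessage() {
            return failedMessage;
        }
    }

    /**
     * Error-side handler. The incoming error message has none of the
     * sequence headers, so the result is rebuilt from the failed message,
     * whose headers still carry the correlation details.
     */
    static Msg toFailedResult(Msg errorMessage) {
        SendFailure failure = (SendFailure) errorMessage.payload;
        Msg failed = failure.getFailedMessage();
        Map<String, Object> headers = new HashMap<String, Object>(failed.headers);
        headers.put("deliveryStatus", "FAILED"); // hypothetical status header
        return new Msg(failed.payload, headers);
    }
}
```

A result built this way lands in the same group as the successful parts, because the aggregator sees the same correlation headers on both paths.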

0
votes

I finally understood how it works.

I set a boolean to true in the message converter; in the error handler I set it to false and return null, so on recovery the message is received by the aggregator as a failed message. I also understood what happens when I return the object. Thanks @ArtemBilan, your code block gave me insight into what is happening and what I should do.