1
votes

Using request/reply router with the collection splitter/aggregator, I successfully split an array of messages to its worker asynchronously, then use the aggregator to consolidate the result back together very nicely.

Now I want to do a looping (synchronously) on top of the above, so I use Foreach MP or another set of split-aggregate on top of the existing one (yes, I did save those properties as the invocation property area and restore them back).

I can see it finished with the aggregator for the first iteration, but the VM inbound-end point in the request/reply router never get anything back so the got stuck. I tried many things, but nothing helps. Any idea why?

I have two String array sa: {11, 12, 13} and sb: {21, 22, 23} under an ArrayList AL. I want loop over AL synchronouly, for each String array, I want to do split-aggregate asynchronously.

Any help is very much appreciated.

Suli

David, Thank you.
I put logger right after the request/reply router, the flow doesn't hit it. I also has an logger right after the collection aggregator and it does hit it.

Here is the XML Config -----------

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="EE-3.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd 
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd 
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd ">
    <queued-asynchronous-processing-strategy name="all2thread" maxThreads="2" doc:name="Queued Asynchronous Processing Strategy"/>
    <flow name="splitertest2Flow1" doc:name="splitertest2Flow1">
        <http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8581" doc:name="HTTP"/>
        <expression-filter expression="#[groovy:!payload.contains('.ico')]" doc:name="Expression"/>
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[String [] sa = new String[3];
sa[0]="message ... 11";
sa[1]="message ... 12";
sa[2]="message ... 13";

String [] sb = new String[3];
sb[0]="message ... 21";
sb[1]="message ... 22";
sb[2]="message ... 23";
ArrayList al = new ArrayList();
al.add(sa);
al.add(sb);
message.setPayload(al);
return message;]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <foreach doc:name="Foreach">
        <request-reply storePrefix="workStore">
            <vm:outbound-endpoint path="work.IN">  
                <message-properties-transformer scope="outbound"> 
                    <delete-message-property key="MULE_REPLYTO"/> 
                </message-properties-transformer> 
            </vm:outbound-endpoint>
            <vm:inbound-endpoint path="work.OUT"></vm:inbound-endpoint>
         </request-reply>
            <logger message="******** Almost there....." level="INFO" doc:name="Logger"/>
        </foreach>
        <logger message="**************  Very Happy to get here **********************" level="INFO" doc:name="Logger"/>
    </flow>
    <flow name="splitertest2Flow2" doc:name="splitertest2Flow2">
        <vm:inbound-endpoint exchange-pattern="one-way" path="work.IN" doc:name="VM"/>
        <collection-splitter doc:name="Collection Splitter"/>
        <flow-ref name="DoWork2" doc:name="DoWork2"/>
    </flow>
    <flow name="DoWork2" doc:name="DoWork2" processingStrategy="all2thread">
        <scripting:transformer doc:name="Groovy">
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[String msg = message.getPayload();
println "processing..."+msg;
Thread.sleep(1500);
println "exit..."+msg;
return message;]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <vm:outbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
    </flow>
    <flow name="splitertest2Flow3" doc:name="splitertest2Flow3" processingStrategy="all2thread">
        <vm:inbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
        <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
        <logger message="************ after aggregator  ************" level="INFO" doc:name="Logger"/>
        <vm:outbound-endpoint exchange-pattern="one-way" path="work.OUT" doc:name="VM"/>
    </flow>
</mule>
1
Without a test config that allows reproducing the issue, it will be nearly impossible to help you.David Dossot
David, thanks. The XML config is attached.Suli

1 Answers

2
votes

There are two issues at play that prevent this to work:

  • The foreach element uses the same correlation ID for all the messages it creates which completely messes the collection-aggregator downstream. The aggregator works by grouping on this ID and since it's the same for the six messages, it can't work. To fix this I had to assign a new correlation ID as a first step after foreach.
  • The request-reply computes an "async reply correlation ID" that must be used when dispatching to the reply queue (work.OUT). Usually this "async reply correlation ID" is equal to the message correlation ID, but not in this case (I suspect because we are behind a foreach). To fix this I had to store the asyncReplyCorrelationId in a session variable and re-establish it as the correlation ID right before dispatching to the reply queue.

Here is the complete working config:

<queued-asynchronous-processing-strategy
    name="all2thread" maxThreads="2" />

<flow name="splitertest2Flow1">
    <http:inbound-endpoint exchange-pattern="request-response"
        host="localhost" port="8581" />
    <expression-filter expression="#[groovy:!payload.contains('.ico')]" />
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                String [] sa = new String[3];
                sa[0]="message ... 11";
                sa[1]="message ... 12";
                sa[2]="message ... 13";

                String [] sb = new String[3];
                sb[0]="message ... 21";
                sb[1]="message ... 22";
                sb[2]="message ... 23";
                ArrayList al = new ArrayList();
                al.add(sa);
                al.add(sb);
                message.setPayload(al);
                return message;
         ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <foreach>
        <scripting:transformer>
            <scripting:script engine="Groovy">
                <scripting:text><![CDATA[
                message.correlationId = UUID.randomUUID().toString()
                return message
             ]]></scripting:text>
            </scripting:script>
        </scripting:transformer>
        <request-reply storePrefix="workStore">
            <vm:outbound-endpoint path="work.IN">
                <message-properties-transformer
                    scope="outbound">
                    <delete-message-property key="MULE_REPLYTO" />
                </message-properties-transformer>
                <message-properties-transformer
                    scope="session">
                    <add-message-property key="asyncReplyCorrelationId"
                        value="#[message.correlationId + message.correlationSequence]" />
                </message-properties-transformer>
            </vm:outbound-endpoint>
            <vm:inbound-endpoint path="work.OUT" />
        </request-reply>
        <logger message="******** Almost there....." level="INFO" />
    </foreach>
    <logger message="**************  Very Happy to get here **********************"
        level="INFO" />
</flow>

<flow name="splitertest2Flow2">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="work.IN" />
    <collection-splitter />
    <flow-ref name="DoWork2" />
</flow>

<flow name="DoWork2">
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                String msg = message.getPayload();
                println "processing..."+msg;
                Thread.sleep(1500);
                println "exit..."+msg;
                return message;
         ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="work.Q" />
</flow>

<flow name="splitertest2Flow3" processingStrategy="all2thread">
    <vm:inbound-endpoint exchange-pattern="one-way"
        path="work.Q" />
    <collection-aggregator failOnTimeout="true" />
    <logger message="************ after aggregator  ************"
        level="INFO" />
    <scripting:transformer>
        <scripting:script engine="Groovy">
            <scripting:text><![CDATA[
                message.correlationId = asyncReplyCorrelationId
                return message
         ]]></scripting:text>
        </scripting:script>
    </scripting:transformer>
    <vm:outbound-endpoint exchange-pattern="one-way"
        path="work.OUT" />
</flow>