Using request/reply router with the collection splitter/aggregator, I successfully split an array of messages to its worker asynchronously, then use the aggregator to consolidate the result back together very nicely.
Now I want to do a looping (synchronously) on top of the above, so I use Foreach MP or another set of split-aggregate on top of the existing one (yes, I did save those properties as the invocation property area and restore them back).
I can see it finished with the aggregator for the first iteration, but the VM inbound-end point in the request/reply router never get anything back so the got stuck. I tried many things, but nothing helps. Any idea why?
I have two String array sa: {11, 12, 13} and sb: {21, 22, 23} under an ArrayList AL. I want loop over AL synchronouly, for each String array, I want to do split-aggregate asynchronously.
Any help is very much appreciated.
Suli
David, Thank you.
I put logger right after the request/reply router, the flow doesn't hit it. I also has an logger right after the collection aggregator and it does hit it.
Here is the XML Config -----------
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:core="http://www.mulesoft.org/schema/mule/core" version="EE-3.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd ">
<queued-asynchronous-processing-strategy name="all2thread" maxThreads="2" doc:name="Queued Asynchronous Processing Strategy"/>
<flow name="splitertest2Flow1" doc:name="splitertest2Flow1">
<http:inbound-endpoint exchange-pattern="request-response" host="localhost" port="8581" doc:name="HTTP"/>
<expression-filter expression="#[groovy:!payload.contains('.ico')]" doc:name="Expression"/>
<scripting:transformer doc:name="Groovy">
<scripting:script engine="Groovy">
<scripting:text><![CDATA[String [] sa = new String[3];
sa[0]="message ... 11";
sa[1]="message ... 12";
sa[2]="message ... 13";
String [] sb = new String[3];
sb[0]="message ... 21";
sb[1]="message ... 22";
sb[2]="message ... 23";
ArrayList al = new ArrayList();
al.add(sa);
al.add(sb);
message.setPayload(al);
return message;]]></scripting:text>
</scripting:script>
</scripting:transformer>
<foreach doc:name="Foreach">
<request-reply storePrefix="workStore">
<vm:outbound-endpoint path="work.IN">
<message-properties-transformer scope="outbound">
<delete-message-property key="MULE_REPLYTO"/>
</message-properties-transformer>
</vm:outbound-endpoint>
<vm:inbound-endpoint path="work.OUT"></vm:inbound-endpoint>
</request-reply>
<logger message="******** Almost there....." level="INFO" doc:name="Logger"/>
</foreach>
<logger message="************** Very Happy to get here **********************" level="INFO" doc:name="Logger"/>
</flow>
<flow name="splitertest2Flow2" doc:name="splitertest2Flow2">
<vm:inbound-endpoint exchange-pattern="one-way" path="work.IN" doc:name="VM"/>
<collection-splitter doc:name="Collection Splitter"/>
<flow-ref name="DoWork2" doc:name="DoWork2"/>
</flow>
<flow name="DoWork2" doc:name="DoWork2" processingStrategy="all2thread">
<scripting:transformer doc:name="Groovy">
<scripting:script engine="Groovy">
<scripting:text><![CDATA[String msg = message.getPayload();
println "processing..."+msg;
Thread.sleep(1500);
println "exit..."+msg;
return message;]]></scripting:text>
</scripting:script>
</scripting:transformer>
<vm:outbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
</flow>
<flow name="splitertest2Flow3" doc:name="splitertest2Flow3" processingStrategy="all2thread">
<vm:inbound-endpoint exchange-pattern="one-way" path="work.Q" doc:name="VM"/>
<collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
<logger message="************ after aggregator ************" level="INFO" doc:name="Logger"/>
<vm:outbound-endpoint exchange-pattern="one-way" path="work.OUT" doc:name="VM"/>
</flow>
</mule>