I was exploring Mule's scatter-gather router for processing flows in parallel, following the fork-and-join pattern. My Mule config is as follows:
<flow name="fork" doc:name="fork">
    <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response" doc:name="HTTP"/>
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="2" doc:name="Property"/>
    <!-- <all enableCorrelation="IF_NOT_SET" doc:name="All"> -->
    <scatter-gather timeout="6000">
        <processor-chain>
            <async doc:name="Async">
                <set-property propertyName="MULE_CORRELATION_SEQUENCE" value="1" doc:name="Property"/>
                <flow-ref name="Flow1" doc:name="Flow Reference"/>
            </async>
        </processor-chain>
        <async doc:name="Async">
            <set-property propertyName="MULE_CORRELATION_SEQUENCE" value="2" doc:name="Property"/>
            <flow-ref name="Flow2" doc:name="Flow Reference"/>
        </async>
    <!-- </all> -->
    </scatter-gather>
    <logger message="Main Flow" level="INFO" doc:name="Logger"/>
</flow>
<sub-flow name="Flow1" doc:name="Flow1">
    <logger level="INFO" message="Flow1: processing started" doc:name="Logger"/>
    <set-payload value="Flow1 Payload" doc:name="Set Payload"/>
    <!-- Transformation payload -->
    <logger level="INFO" message="Flow1: processing finished" doc:name="Logger"/>
    <flow-ref name="Join-Flow" doc:name="Flow Reference"/>
</sub-flow>
<sub-flow name="Flow2" doc:name="Flow2">
    <logger level="INFO" message="Flow2: processing started" doc:name="Logger"/>
    <set-payload value="Flow2 Payload" doc:name="Set Payload"/>
    <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy"><![CDATA[sleep(2000); return message.payload;]]></scripting:script>
    </scripting:component>
    <!-- Transformation payload -->
    <logger level="INFO" message="Flow2: processing finished" doc:name="Logger"/>
    <flow-ref name="Join-Flow" doc:name="Flow Reference"/>
</sub-flow>
<sub-flow name="Join-Flow" doc:name="Join-Flow">
    <collection-aggregator timeout="6000" failOnTimeout="true" doc:name="Collection Aggregator"/>
    <combine-collections-transformer doc:name="Combine Collections"/>
    <logger level="INFO" message="Combined Payload: #[message.payload]" doc:name="Logger"/>
    <set-payload value="Soap XML Response" doc:name="Set Payload"/>
</sub-flow>
The scatter-gather throws the following exception, even though I do get the aggregated payload in the logger output:
Exception stack is:
1. null (java.lang.UnsupportedOperationException)
org.mule.VoidMuleEvent:50 (null)
2. null (java.lang.UnsupportedOperationException). Message payload is of type: String (org.mule.api.MessagingException)
org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor:32 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.UnsupportedOperationException
at org.mule.VoidMuleEvent.getMessage(VoidMuleEvent.java:50)
at org.mule.api.routing.AggregationContext$1.evaluate(AggregationContext.java:41)
at org.apache.commons.collections.CollectionUtils.select(CollectionUtils.java:517)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2014-08-17 18:04:52,008 [[try1].fork.2.02] org.mule.api.processor.LoggerMessageProcessor: Flow2: processing started
INFO 2014-08-17 18:04:52,016 [[try1].fork.1.02] org.mule.api.processor.LoggerMessageProcessor: Flow1: processing started
INFO 2014-08-17 18:04:52,017 [[try1].fork.1.02] org.mule.api.processor.LoggerMessageProcessor: Flow1: processing finished
INFO 2014-08-17 18:04:54,309 [[try1].fork.2.02] org.mule.api.processor.LoggerMessageProcessor: Flow2: processing finished
INFO 2014-08-17 18:04:54,347 [[try1].fork.2.02] org.mule.api.processor.LoggerMessageProcessor: Combined Payload: [Flow1 Payload, Flow2 Payload]
One interesting fact is that if I use the <all> router instead of scatter-gather, I don't get this exception. But as I understand the Mule documentation, the <all> router processes the flows sequentially and not in parallel, which is why I chose scatter-gather over the all router. I am not sure how to handle the above exception thrown by the scatter-gather, though. Is there any way out?
UPDATED FLOW:
<flow name="fork" doc:name="fork">
    <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response" doc:name="HTTP"/>
    <scatter-gather timeout="6000">
        <processor-chain>
            <flow-ref name="Flow1" doc:name="Flow Reference"/>
        </processor-chain>
        <flow-ref name="Flow2" doc:name="Flow Reference"/>
    </scatter-gather>
    <logger message="Main Flow" level="INFO" doc:name="Logger"/>
</flow>
<sub-flow name="Flow1" doc:name="Flow1">
    <logger level="INFO" message="Flow1: processing started" doc:name="Logger"/>
    <set-payload value="Flow1 Payload" doc:name="Set Payload"/>
    <!-- Transformation payload -->
    <logger level="INFO" message="Flow1: processing finished" doc:name="Logger"/>
    <flow-ref name="Join-Flow" doc:name="Flow Reference"/>
</sub-flow>
<sub-flow name="Flow2" doc:name="Flow2">
    <logger level="INFO" message="Flow2: processing started" doc:name="Logger"/>
    <set-payload value="Flow2 Payload" doc:name="Set Payload"/>
    <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy"><![CDATA[sleep(2000); return message.payload;]]>
        </scripting:script>
    </scripting:component>
    <!-- Transformation payload -->
    <logger level="INFO" message="Flow2: processing finished" doc:name="Logger"/>
    <flow-ref name="Join-Flow" doc:name="Flow Reference"/>
</sub-flow>
<sub-flow name="Join-Flow" doc:name="Join-Flow">
    <collection-aggregator timeout="6000" failOnTimeout="true" doc:name="Collection Aggregator"/>
    <combine-collections-transformer doc:name="Combine Collections"/>
    <logger level="INFO" message="Combined Payload: #[message.payload]" doc:name="Logger"/>
    <set-payload value="Soap XML Response" doc:name="Set Payload"/>
</sub-flow>
EXCEPTION:
[try1].ScatterGatherWorkManager.02] org.mule.api.processor.LoggerMessageProcessor: Flow2: processing started
INFO 2014-08-18 13:34:24,361 [[try1].ScatterGatherWorkManager.01] org.mule.api.processor.LoggerMessageProcessor: Flow1: processing started
INFO 2014-08-18 13:34:24,364 [[try1].ScatterGatherWorkManager.01] org.mule.api.processor.LoggerMessageProcessor: Flow1: processing finished
WARN 2014-08-18 13:34:24,366 [[try1].ScatterGatherWorkManager.01] org.mule.routing.correlation.CollectionCorrelatorCallback: Correlation Group Size not set, but correlation aggregator is being used. Message is being forwarded as is
INFO 2014-08-18 13:34:24,401 [[try1].ScatterGatherWorkManager.01] org.mule.api.processor.LoggerMessageProcessor: Combined Payload: [Flow1 Payload]
INFO 2014-08-18 13:34:26,615 [[try1].ScatterGatherWorkManager.02] org.mule.api.processor.LoggerMessageProcessor: Flow2: processing finished
ERROR 2014-08-18 13:34:26,625 [[try1].connector.http.mule.default.receiver.03] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : null (java.lang.NullPointerException). Message payload is of type: String
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
org.mule.api.routing.AggregationContext$1:41 (null)
2. null (java.lang.NullPointerException). Message payload is of type: String (org.mule.api.MessagingException)
org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor:32 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
at org.mule.api.routing.AggregationContext$1.evaluate(AggregationContext.java:41)
at org.apache.commons.collections.CollectionUtils.select(CollectionUtils.java:517)
at org.apache.commons.collections.CollectionUtils.select(CollectionUtils.java:498)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
Now, as you can see, after removing the set-property elements it no longer combines the payloads of both flows. It only gets the payload of the first flow: Combined Payload: [Flow1 Payload]
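One possible reading of the log above, offered as an assumption rather than a confirmed diagnosis: the WARN line says the correlation aggregator forwards each message as is when no correlation group size is set, so Join-Flow never sees both payloads together. The sketch below assumes scatter-gather's own default aggregation is enough, so the routes simply return their payloads and Join-Flow with its collection-aggregator is dropped. The enclosing mule element and namespace declarations are omitted, and the shape of the payload after the scatter-gather is an assumption to verify against your Mule version.

```xml
<!-- Sketch only: routes return their results and scatter-gather is assumed
     to collect them; no async scope, no correlation properties, no Join-Flow. -->
<flow name="fork" doc:name="fork">
    <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response" doc:name="HTTP"/>
    <scatter-gather timeout="6000">
        <flow-ref name="Flow1" doc:name="Flow Reference"/>
        <flow-ref name="Flow2" doc:name="Flow Reference"/>
    </scatter-gather>
    <!-- Assumption: the payload here is a collection holding one result per route -->
    <logger level="INFO" message="Combined Payload: #[message.payload]" doc:name="Logger"/>
    <set-payload value="Soap XML Response" doc:name="Set Payload"/>
</flow>
<sub-flow name="Flow1" doc:name="Flow1">
    <logger level="INFO" message="Flow1: processing started" doc:name="Logger"/>
    <set-payload value="Flow1 Payload" doc:name="Set Payload"/>
    <logger level="INFO" message="Flow1: processing finished" doc:name="Logger"/>
</sub-flow>
<sub-flow name="Flow2" doc:name="Flow2">
    <logger level="INFO" message="Flow2: processing started" doc:name="Logger"/>
    <set-payload value="Flow2 Payload" doc:name="Set Payload"/>
    <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy"><![CDATA[sleep(2000); return message.payload;]]></scripting:script>
    </scripting:component>
    <logger level="INFO" message="Flow2: processing finished" doc:name="Logger"/>
</sub-flow>
```

If this assumption holds, the logger after the scatter-gather should only run once both routes have finished or the 6000 ms timeout has elapsed.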
logger after the scatter-gather: what's the point of using it? If you don't really do much with the aggregated result, you don't need it nor all, just async execute the different message processors/chains... – David Dossot
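The comment's suggestion can be sketched roughly as follows. This is a minimal fire-and-forget variant, assuming the HTTP response does not need the combined result and that Flow1 and Flow2 no longer reference Join-Flow. The enclosing mule element and namespace declarations are omitted.

```xml
<!-- Sketch only: each route runs in its own async scope; nothing is aggregated,
     so neither scatter-gather nor the all router is involved. -->
<flow name="fork" doc:name="fork">
    <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response" doc:name="HTTP"/>
    <async doc:name="Async">
        <flow-ref name="Flow1" doc:name="Flow Reference"/>
    </async>
    <async doc:name="Async">
        <flow-ref name="Flow2" doc:name="Flow Reference"/>
    </async>
    <!-- The main flow continues without waiting for Flow1 or Flow2 -->
    <logger message="Main Flow" level="INFO" doc:name="Logger"/>
</flow>
```

The trade-off is that the main flow cannot see what the two routes produced, so this only fits when the aggregated payload is not needed downstream.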