1
votes

I was exploring Mule's scatter-gather router to process flows in parallel, following a fork-and-join pattern. My Mule config is as follows:

 <!-- Entry flow (first attempt): accepts an HTTP request, fans the message out to
      Flow1 and Flow2 through a scatter-gather, then logs "Main Flow". -->
 <flow name="fork" doc:name="fork">
    <!-- Request-response HTTP endpoint at localhost:8090/mainPath. -->
    <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response" doc:name="HTTP"/>
    <!-- Manually declares a correlation group size of 2 (presumably one per route below). -->
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE"  value="2" doc:name="Property"/>
     <!--  <all enableCorrelation="IF_NOT_SET" doc:name="All"> -->
            <!-- NOTE(review): each route below wraps its work in an async scope and sets
                 MULE_CORRELATION_SEQUENCE by hand. The answer further down recommends
                 removing both; this is the configuration that produced the
                 UnsupportedOperationException quoted in the question. -->
            <scatter-gather timeout="6000">
              <!-- Route 1: async scope inside a processor-chain; tags sequence 1 and calls Flow1. -->
              <processor-chain>
               <async doc:name="Async">
               <set-property propertyName="MULE_CORRELATION_SEQUENCE" value="1" doc:name="Property"/>
               <flow-ref name="Flow1" doc:name="Flow Reference"/>
               </async>
              </processor-chain>

              <!-- Route 2: bare async scope; tags sequence 2 and calls Flow2. -->
              <async doc:name="Async">
              <set-property propertyName="MULE_CORRELATION_SEQUENCE" value="2" doc:name="Property"/>
              <flow-ref name="Flow2" doc:name="Flow Reference"/>
               </async>
               <!-- </all> -->
               </scatter-gather>
               <!-- Runs after the scatter-gather. -->
               <logger message="Main Flow" level="INFO" doc:name="Logger"/>
    </flow>

    <!-- Flow1: logs start, replaces the payload with the literal "Flow1 Payload",
         logs completion, then hands the message to Join-Flow. -->
    <sub-flow doc:name="Flow1" name="Flow1">
        <logger doc:name="Logger" level="INFO" message="Flow1: processing started"/>
        <set-payload doc:name="Set Payload" value="Flow1 Payload"/>
        <!-- Placeholder for the payload transformation step. -->
        <logger doc:name="Logger" level="INFO" message="Flow1: processing finished"/>
        <flow-ref doc:name="Flow Reference" name="Join-Flow"/>
    </sub-flow>

    <!-- Flow2: logs start, sets the payload to the literal "Flow2 Payload", runs a
         Groovy script that sleeps and returns the payload, logs completion, then
         hands the message to Join-Flow. -->
    <sub-flow doc:name="Flow2" name="Flow2">
        <logger doc:name="Logger" level="INFO" message="Flow2: processing started"/>
        <set-payload doc:name="Set Payload" value="Flow2 Payload"/>
        <!-- The script calls sleep(2000) and then returns message.payload unchanged. -->
        <scripting:component doc:name="Groovy">
            <scripting:script engine="Groovy"><![CDATA[sleep(2000); return message.payload;]]></scripting:script>
        </scripting:component>
        <!-- Placeholder for the payload transformation step. -->
        <logger doc:name="Logger" level="INFO" message="Flow2: processing finished"/>
        <flow-ref doc:name="Flow Reference" name="Join-Flow"/>
    </sub-flow>

    <!-- Join-Flow: shared target of the flow-refs at the end of Flow1 and Flow2.
         Aggregates incoming messages, combines the collections, logs the combined
         payload and finally replaces it with the literal "Soap XML Response". -->
    <sub-flow doc:name="Join-Flow" name="Join-Flow">
        <!-- Aggregator is configured with timeout 6000 and fails when that timeout is hit. -->
        <collection-aggregator doc:name="Collection Aggregator" failOnTimeout="true" timeout="6000"/>
        <combine-collections-transformer doc:name="Combine Collections"/>
        <logger doc:name="Logger" level="INFO" message="Combined Payload: #[message.payload]"/>
        <set-payload doc:name="Set Payload" value="Soap XML Response"/>
    </sub-flow>

The scatter-gather throws the following exception, even though I do get the aggregated payload in the logger:

Exception stack is:
1. null (java.lang.UnsupportedOperationException)
  org.mule.VoidMuleEvent:50 (null)
2. null (java.lang.UnsupportedOperationException). Message payload is of type: String (org.mule.api.MessagingException)
  org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor:32 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.UnsupportedOperationException
    at org.mule.VoidMuleEvent.getMessage(VoidMuleEvent.java:50)
    at org.mule.api.routing.AggregationContext$1.evaluate(AggregationContext.java:41)
    at org.apache.commons.collections.CollectionUtils.select(CollectionUtils.java:517)
    + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

INFO  2014-08-17 18:04:52,008 [[try1].fork.2.02] org.mule.api.processor.LoggerMessageProcessor: Flow2: processing started
INFO  2014-08-17 18:04:52,016 [[try1].fork.1.02] org.mule.api.processor.LoggerMessageProcessor: Flow1: processing started
INFO  2014-08-17 18:04:52,017 [[try1].fork.1.02] org.mule.api.processor.LoggerMessageProcessor: Flow1: processing finished
INFO  2014-08-17 18:04:54,309 [[try1].fork.2.02] org.mule.api.processor.LoggerMessageProcessor: Flow2: processing finished
INFO  2014-08-17 18:04:54,347 [[try1].fork.2.02] org.mule.api.processor.LoggerMessageProcessor: Combined Payload: [Flow1 Payload, Flow2 Payload]

One interesting fact is that if I use the <all> router instead of scatter-gather, I don't get this exception. However, as I understand the Mule documentation, the <all> router processes the flows sequentially rather than in parallel, which is why I chose scatter-gather over the <all> router. But I am not sure how to handle the above exception thrown by the scatter-gather. Is there any way around it?

UPDATED FLOW :-

<!-- Entry flow (second attempt): the async scopes and manual MULE_CORRELATION_*
     properties are gone; the scatter-gather now calls Flow1 and Flow2 directly,
     then the flow logs "Main Flow". -->
<flow name="fork" doc:name="fork">
<!-- Request-response HTTP endpoint at localhost:8090/mainPath. -->
<http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response" doc:name="HTTP"/>


         <!-- NOTE(review): Flow1 and Flow2 still forward to Join-Flow themselves (see their
              final flow-ref). This is the configuration that produced the
              NullPointerException quoted below; the answer recommends dropping Join-Flow
              and putting its processing logic after the scatter-gather instead. -->
         <scatter-gather timeout="6000">
            <!-- Route 1: processor-chain holding a single flow-ref to Flow1. -->
            <processor-chain>
              <flow-ref name="Flow1" doc:name="Flow Reference"/>
                 </processor-chain>
               <!-- Route 2: direct flow-ref to Flow2. -->
               <flow-ref name="Flow2" doc:name="Flow Reference"/>
          </scatter-gather>
            <!-- Runs after the scatter-gather. -->
            <logger message="Main Flow" level="INFO" doc:name="Logger"/>
        </flow>

        <!-- Flow1: logs start, replaces the payload with the literal "Flow1 Payload",
             logs completion, then hands the message to Join-Flow. -->
        <sub-flow doc:name="Flow1" name="Flow1">
            <logger doc:name="Logger" level="INFO" message="Flow1: processing started"/>
            <set-payload doc:name="Set Payload" value="Flow1 Payload"/>
            <!-- Placeholder for the payload transformation step. -->
            <logger doc:name="Logger" level="INFO" message="Flow1: processing finished"/>
            <flow-ref doc:name="Flow Reference" name="Join-Flow"/>
        </sub-flow>

        <!-- Flow2: logs start, sets the payload to the literal "Flow2 Payload", runs a
             Groovy script that sleeps and returns the payload, logs completion, then
             hands the message to Join-Flow. -->
        <sub-flow name="Flow2" doc:name="Flow2">
            <logger level="INFO" message="Flow2: processing started" doc:name="Logger"/>
            <set-payload value="Flow2 Payload" doc:name="Set Payload"/>
            <!-- The script calls sleep(2000) and then returns message.payload unchanged.
                 The closing tag sits directly after the CDATA section so that no stray
                 whitespace becomes part of the script text. -->
            <scripting:component doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[sleep(2000); return message.payload;]]></scripting:script>
            </scripting:component>
            <!-- Placeholder for the payload transformation step. -->
            <logger level="INFO" message="Flow2: processing finished" doc:name="Logger"/>
            <flow-ref name="Join-Flow" doc:name="Flow Reference"/>
        </sub-flow>

        <!-- Join-Flow: shared target of the flow-refs at the end of Flow1 and Flow2.
             Aggregates incoming messages, combines the collections, logs the combined
             payload and finally replaces it with the literal "Soap XML Response". -->
        <sub-flow doc:name="Join-Flow" name="Join-Flow">
            <!-- Aggregator is configured with timeout 6000 and fails when that timeout is hit. -->
            <collection-aggregator doc:name="Collection Aggregator" failOnTimeout="true" timeout="6000"/>
            <combine-collections-transformer doc:name="Combine Collections"/>
            <logger doc:name="Logger" level="INFO" message="Combined Payload: #[message.payload]"/>
            <set-payload doc:name="Set Payload" value="Soap XML Response"/>
        </sub-flow>

EXCEPTION :-

[try1].ScatterGatherWorkManager.02] org.mule.api.processor.LoggerMessageProcessor: Flow2: processing started
INFO  2014-08-18 13:34:24,361 [[try1].ScatterGatherWorkManager.01] org.mule.api.processor.LoggerMessageProcessor: Flow1: processing started
INFO  2014-08-18 13:34:24,364 [[try1].ScatterGatherWorkManager.01] org.mule.api.processor.LoggerMessageProcessor: Flow1: processing finished
WARN  2014-08-18 13:34:24,366 [[try1].ScatterGatherWorkManager.01] org.mule.routing.correlation.CollectionCorrelatorCallback: Correlation Group Size not set, but correlation aggregator is being used. Message is being forwarded as is
INFO  2014-08-18 13:34:24,401 [[try1].ScatterGatherWorkManager.01] org.mule.api.processor.LoggerMessageProcessor: Combined Payload: [Flow1 Payload]
INFO  2014-08-18 13:34:26,615 [[try1].ScatterGatherWorkManager.02] org.mule.api.processor.LoggerMessageProcessor: Flow2: processing finished
ERROR 2014-08-18 13:34:26,625 [[try1].connector.http.mule.default.receiver.03] org.mule.exception.DefaultMessagingExceptionStrategy: 
********************************************************************************
Message               : null (java.lang.NullPointerException). Message payload is of type: String
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. null (java.lang.NullPointerException)
  org.mule.api.routing.AggregationContext$1:41 (null)
2. null (java.lang.NullPointerException). Message payload is of type: String (org.mule.api.MessagingException)
  org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor:32 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/MessagingException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.lang.NullPointerException
    at org.mule.api.routing.AggregationContext$1.evaluate(AggregationContext.java:41)
    at org.apache.commons.collections.CollectionUtils.select(CollectionUtils.java:517)
    at org.apache.commons.collections.CollectionUtils.select(CollectionUtils.java:498)
    + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

Now, as you can see, after removing the set-property elements it no longer combines the payloads of both flows. It only gets the payload of the first flow: Combined Payload: [Flow1 Payload]

2
You only have a logger after the scatter-gather: what's the point of using it? If you don't really do much with the aggregated result, you need neither it nor <all>; just execute the different message processors/chains asynchronously... – David Dossot
For now I need the aggregated result in the logger. I will use this aggregated result later. – Anirban Sen Chowdhary

2 Answers

3
votes

Remove the async scopes around the message processors in the scatter-gather: the scatter-gather already does the parallelization for you, so you don't have to.

EDIT: Also remove the set-property message processors that deal with the MULE_CORRELATION properties. The scatter-gather is supposed to do that for you.

EDIT2: You can remove the processor-chain around the single flow-ref: it's useless.

Also, there seems to be a deep misunderstanding about what scatter-gather does: you force its sub-flow responses to converge on a single Join-Flow where you aggregate them, effectively bypassing and re-implementing everything the scatter-gather is meant to do for you.

Remove the flow-refs towards Join-Flow, remove Join-Flow and just put its processing logic (not the aggregator) after the scatter-gather.

1
votes

So, as per David's suggestion, the final working solution is:

      <!-- Entry flow (final version): accepts an HTTP request, lets the scatter-gather
           call Flow1 and Flow2, then logs the combined result and both individual
           entries before replacing the payload with a fixed completion message. -->
      <flow doc:name="fork" name="fork">
          <!-- Request-response HTTP endpoint at localhost:8090/scattergather. -->
          <http:inbound-endpoint doc:name="HTTP"
                                 exchange-pattern="request-response"
                                 host="localhost"
                                 port="8090"
                                 path="scattergather"/>

          <!-- One route per sub-flow; no async scopes or manual correlation properties. -->
          <scatter-gather timeout="6000">
              <flow-ref doc:name="Flow Reference" name="Flow1"/>
              <flow-ref doc:name="Flow Reference" name="Flow2"/>
          </scatter-gather>

          <!-- collection-aggregator and combine-collections-transformer are intentionally
               not used here: the loggers below read the scatter-gather output directly
               and index it as payload[0] and payload[1]. -->
          <logger doc:name="Logger"
                  level="INFO"
                  message="Combined Payload: #[message.payload]"/>
          <logger doc:name="Logger"
                  level="INFO"
                  message="Payload1: #[message.payload[0]] and Payload2: #[message.payload[1]] "/>
          <set-payload doc:name="Set Payload" value="Done Merging ...!!!"/>

          <logger doc:name="Logger" level="INFO" message="Back to Main Flow"/>
      </flow>



        <!-- Flow1: logs start, replaces the payload with the literal "Flow1 Payload"
             and logs completion. It no longer forwards to a join flow. -->
        <sub-flow doc:name="Flow1" name="Flow1">
            <logger doc:name="Logger" level="INFO" message="Flow1: processing started"/>
            <set-payload doc:name="Set Payload" value="Flow1 Payload"/>
            <logger doc:name="Logger" level="INFO" message="Flow1: processing finished"/>
        </sub-flow>


        <!-- Flow2: logs start, sets the payload to the literal "Flow2 Payload", delays
             via a Groovy script and logs completion. It no longer forwards to a join flow. -->
        <sub-flow doc:name="Flow2" name="Flow2">
            <logger doc:name="Logger" level="INFO" message="Flow2: processing started"/>
            <set-payload doc:name="Set Payload" value="Flow2 Payload"/>
            <!-- Delays Flow2: the script calls sleep(3000) and then returns
                 message.payload unchanged. -->
            <scripting:component doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[sleep(3000); return message.payload;]]></scripting:script>
            </scripting:component>
            <logger doc:name="Logger" level="INFO" message="Flow2: processing finished"/>
        </sub-flow>

</mule>