0
votes

I would like to sort messages coming from a jdbc query before aggregating them using a collection aggregator, I think the best way to do this is the resequencer. When I add this line

<resequencer timeout="6000" failOnTimeout="false" />

the workflow produces this error:

********************************************** Message : Execution of the expression "message.payload.INVOICE_NUMBER" failed. (org.mule.api.expression.ExpressionRuntimeException). Message payload is of type: MuleEvent[] Code : MULE_ERROR--2 -------------------------------------------------------------------------------- Exception stack is: 1. [Lorg.mule.api.MuleEvent; cannot be cast to java.util.Map (java.lang.ClassCastException)
org.mvel2.optimizers.impl.refl.nodes.MapAccessor:42 (null) 2. [Error: cannot invoke getter: getPayload [declr.class: org.mule.el.context.MessageContext; act.class: org.mule.el.context.MessageContext] (see trace)] [Near : {... Unknown ....}] ^ [Line: 1, Column: 0] (org.mvel2.CompileException) org.mvel2.optimizers.impl.refl.nodes.GetterAccessor:75 (null) 3. Execution of the expression "message.payload.INVOICE_NUMBER" failed. (org.mule.api.expression.ExpressionRuntimeException)
org.mule.el.mvel.MVELExpressionLanguage:211 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/expression/ExpressionRuntimeException.html) 4. Execution of the expression "message.payload.INVOICE_NUMBER" failed. (org.mule.api.expression.ExpressionRuntimeException). Message payload is of type: MuleEvent[] (org.mule.api.transformer.TransformerMessagingException)
org.mule.transformer.AbstractTransformer:123 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/transformer/TransformerMessagingException.html) -------------------------------------------------------------------------------- Root Exception stack trace: java.lang.ClassCastException: [Lorg.mule.api.MuleEvent; cannot be cast to java.util.Map at org.mvel2.optimizers.impl.refl.nodes.MapAccessor.getValue(MapAccessor.java:42) at org.mvel2.optimizers.impl.refl.nodes.GetterAccessor.getValue(GetterAccessor.java:41) at org.mvel2.optimizers.impl.refl.nodes.VariableAccessor.getValue(VariableAccessor.java:38) + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)


<when expression="#[message.payload.LHF_INVOICE_METHOD == 'EDI']"><processor-chain>
        <message-properties-transformer doc:name="Set Correlation ID">
            <add-message-property key="MULE_CORRELATION_SEQUENCE" value="#[message.payload.INVOICE_NUMBER]" />
        </message-properties-transformer>
<resequencer timeout="6000" failOnTimeout="false" />
<set-payload value="#[message.payload.INVOICE_NUMBER]" doc:name="EDI"/>                                
<collection-aggregator  timeout="3200" failOnTimeout="false" doc:name="EDI"          storePrefix="EDI" />
<smtp:outbound-endpoint host="mail.example.com" to="[email protected]" from="[email protected]" subject="[Invoice Workflow] EDI" responseTimeout="10000" doc:name="EDI"/>
<collection-splitter doc:name="Collection Splitter"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="gMarkProcessedInRoss"     queryTimeout="1000" connector-ref="DatabaseMuleLogin" doc:name="Mark Processed in Ross">
</jdbc:outbound-endpoint>
</processor-chain>
</when>

I added a splitter after the resequencer (below) which works, except that the messages are still coming out unsorted on the email.

<set-property propertyName="MULE_CORRELATION_SEQUENCE" value="#[message.payload.INVOICE_NUMBER]" />
<resequencer timeout="6000" failOnTimeout="false" />
<collection-splitter doc:name="Collection Splitter"/>
<set-payload value="#[message.payload.message.payload.INVOICE_NUMBER]" doc:name="EDI"/>
<echo-component doc:name="Echo"/>                                
<collection-aggregator  timeout="3200" failOnTimeout="false" doc:name="EDI"  storePrefix="EDI"       />
<smtp:outbound-endpoint host="mail.example.com" to="[email protected]" from="[email protected]" subject="[Invoice Workflow] EDI" responseTimeout="10000" doc:name="EDI"/>
<collection-splitter doc:name="Collection Splitter"/>
<jdbc:outbound-endpoint exchange-pattern="one-way" queryKey="RouteEDI" queryTimeout="1000" connector-ref="DatabaseMuleLogin" doc:name="Mark Processed in Ross">
</jdbc:outbound-endpoint>

I added an echo component and they are coming out sorted at that point so I'm pretty sure the aggregator is unsorting them.

1
Copy/paste issue here: "# [message.payload.INVOICE_NUMBER]"? (spaces between # and [).David Dossot
Sort of, I had to add 4 spaces all over the place to get the code block working here... I double checked my original xml and there are no extra spaces after the # there.Benjamin Bryan
Are the correlation group size and correlation ID correctly set on the messages? The resequencer only work if these two properties have relevant values. BTW message-properties-transformer is old style, use set-property instead.David Dossot
Yes, both the group size and correlation id are set earlier in the flow. The aggregator after the re-sequencer works fine (if I remove the resequencer). I actually don't know the size of the group so I set it to 1000 and rely on the timeout, I'm hoping the timeout works with the resequencer? I'll change to the set-property... thanks.Benjamin Bryan
Oh boy now I understand. If the resequencer times-out, it spits a single message whose payload is an array of MuleEvents. You need then to split this array with a <collection-splitter /> and extract the original payload with #[message.payload.message.payload].David Dossot

1 Answers

3
votes

You are correct, the correlation sequence property is named MULE_CORRELATION_SEQUENCE. You can set it as shown hereafter:

<set-property propertyName="MULE_CORRELATION_SEQUENCE"
              value="1" />

The resequencer should then order messages based on that sequence. If not, please update your question with the failing configuration.

Be sure to have the MULE_CORRELATION_GROUP_SIZE property set to the right size and that all the messages to resequence share the same MULE_CORRELATION_ID property.