I have a very simple Mule flow: it listens to an ActiveMQ queue and throws an exception if the message contains the string "fail". Without an XA transaction, everything works as expected. The message is redelivered after 5 seconds, then again after progressively longer delays, and after 5 redeliveries it is moved to the dead letter queue.

With XA transactions, the message is redelivered 5 times immediately and then moved to the dead letter queue. How can I make Mule or ActiveMQ honor the configured redeliveryDelay?

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
    xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts" xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
    xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.6.1"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd
http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">

    <spring:beans>

        <!-- Redelivery Policy -->
        <spring:bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
            <spring:property name="maximumRedeliveries" value="5" />
            <spring:property name="initialRedeliveryDelay" value="5000" />
            <spring:property name="redeliveryDelay" value="5000" />
            <spring:property name="useExponentialBackOff" value="true" />
            <spring:property name="backOffMultiplier" value="1.5" />
        </spring:bean>

        <!-- ActiveMQ Connection factory -->
        <spring:bean id="amqXAFactory"
            class="org.apache.activemq.ActiveMQXAConnectionFactory" lazy-init="true"
            name="amqXAFactory">
            <spring:property name="brokerURL"
                value="tcp://localhost:61616" />
            <spring:property name="redeliveryPolicy" ref="redeliveryPolicy" />
        </spring:bean>

    </spring:beans>

    <jbossts:transaction-manager doc:name="JBoss Transaction Manager">
        <property key="com.arjuna.ats.arjuna.coordinator.defaultTimeout"
            value="47" /><!-- this is in seconds -->
        <property key="com.arjuna.ats.arjuna.coordinator.txReaperTimeout"
            value="108000" /><!-- this is in milliseconds -->
    </jbossts:transaction-manager>

    <jms:activemq-connector name="Active_MQconnectorMessages"
        specification="1.1" validateConnections="true" doc:name="Active MQ for XA"
        persistentDelivery="true" connectionFactory-ref="amqXAFactory"
        numberOfConsumers="1" maxRedelivery="5">
        <reconnect-forever frequency="10000" />
    </jms:activemq-connector>

    <flow name="xatestFlow">
        <jms:inbound-endpoint queue="xa.test" doc:name="JMS"
            connector-ref="Active_MQconnectorMessages">
            <xa-transaction action="ALWAYS_BEGIN" />
        </jms:inbound-endpoint>
        <logger message="Message from queue: #[payload]" level="INFO"
            doc:name="Logger" />
        <scripting:component doc:name="fail if payload contains fail">
            <scripting:script engine="Groovy"><![CDATA[if (payload.contains("fail")) {
  throw new IllegalArgumentException("Failed...")
}
]]></scripting:script>
        </scripting:component>
        <logger message="Message is ready" level="INFO" doc:name="Logger" />
    </flow>
</mule>

1 Answer

It took some time, but I found the solution. Redelivery can be configured in two places: on the server side, in ActiveMQ, and on the client side.

In the client redeliveryPolicy I changed maximumRedeliveries to 0. On the server I added a redeliveryPlugin to activemq.xml:

    <broker ....>
    ......
    <plugins>
        <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
            <redeliveryPolicyMap>
                <redeliveryPolicyMap>
                    <redeliveryPolicyEntries>
                        <!-- a destination specific policy -->
                        <redeliveryPolicy 
                            queue="amm.input.to.router" maximumRedeliveries="3" 
                            redeliveryDelay="7000"
                            initialRedeliveryDelay="5000"
                            useCollisionAvoidance="true"
                            />
                    </redeliveryPolicyEntries>

                    <!-- the fallback policy for all other destinations -->
                    <defaultEntry>
                        <redeliveryPolicy
                            maximumRedeliveries="4"
                            useExponentialBackOff="true"
                            initialRedeliveryDelay="10000"
                            redeliveryDelay="5000"
                            useCollisionAvoidance="true"
                            backOffMultiplier="1.5"
                            maximumRedeliveryDelay="93600000" />
                    </defaultEntry>
                </redeliveryPolicyMap>
            </redeliveryPolicyMap>
        </redeliveryPlugin>
    </plugins>

    </broker>

Now the ActiveMQ server takes care of the redeliveries, not the client.
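
For reference, the client-side half of this change, applied to the redeliveryPolicy bean from the question, might look like the sketch below. Only maximumRedeliveries is changed, as described above. Dropping the delay and back-off properties is an assumption here, made on the reasoning that the client no longer performs any redelivery.

```xml
<!-- Client-side policy in the Mule config: no client redelivery,
     the broker's redeliveryPlugin handles retries instead -->
<spring:bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <spring:property name="maximumRedeliveries" value="0" />
    <!-- Assumption: initialRedeliveryDelay, redeliveryDelay, useExponentialBackOff
         and backOffMultiplier are omitted because they no longer apply -->
</spring:bean>
```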

See http://activemq.apache.org/message-redelivery-and-dlq-handling.html
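
One detail worth checking against that page: its broker redelivery example enables schedulerSupport on the broker element, since the plugin relies on the broker's scheduler to delay messages. A minimal sketch follows. The xmlns is the standard ActiveMQ core namespace, brokerName is a placeholder, and neither is taken from the configuration above.

```xml
<!-- Sketch only: brokerName is a placeholder value -->
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost"
        schedulerSupport="true">
    <!-- other broker configuration unchanged -->
    <plugins>
        <!-- redeliveryPlugin as configured above -->
    </plugins>
</broker>
```

Note also that the destination-specific entry above targets the queue amm.input.to.router. For the flow in the question, the queue attribute would presumably be xa.test; otherwise the defaultEntry policy applies.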