1
votes

Edit: Mule 3.4.1

We have a Mule flow that alternates reads from one database with inserts to another, all wrapped inside a transactional scope. In this particular case, one of the later insertions fails and we expect everything to roll back.

When we look in the logs, we see an exception (e.g., duplicate PRIMARY KEY) for the second insertion (i.e., BulkInsertInstanceToCache in the example below). When we look in the database, we see the data from the first insert (BulkInsertActivityToCache in the example below). We expected all data to be gone.

Are we configuring this scope incorrectly for the behavior we want?

Here's a sample of the code where I've cut it down to just two insertions to show the type of processing being done.

<!--
  ProcessBulkUpdateCache: synchronous flow that, inside a single transactional
  scope, alternates reads through the SumTotalDatabase connector with bulk
  inserts through the EAIServiceDatabase connector.
-->
<flow name="ProcessBulkUpdateCache" processingStrategy="synchronous" doc:name="ProcessBulkUpdateCache">
    <!-- The scope is configured to always begin a new transaction for the processors it wraps. -->
    <transactional action="ALWAYS_BEGIN" doc:name="Transactional">
        <!-- Read 1: GetActivitiesForCache on SumTotalDatabase, explicitly marked with transaction action NONE. -->
        <jdbc-ee:outbound-endpoint exchange-pattern="request-response" 
                queryKey="GetActivitiesForCache" queryTimeout="-1" connector-ref="SumTotalDatabase">
            <jdbc-ee:transaction action="NONE" />
        </jdbc-ee:outbound-endpoint>

        <!--
          Insert 1: BulkInsertActivityToCache on EAIServiceDatabase.
          No transaction element is declared here; presumably the endpoint is
          meant to take part in the scope's transaction (TODO confirm).
        -->
        <jdbc-ee:outbound-endpoint exchange-pattern="request-response" 
                queryKey="BulkInsertActivityToCache" queryTimeout="-1" connector-ref="EAIServiceDatabase">
        </jdbc-ee:outbound-endpoint>

        <!--
          Read 2: GetInstancesForCache on SumTotalDatabase, again with action NONE.
          NOTE(review): confirm how action NONE treats a transaction that is
          already in progress in this Mule version. If it resolves (commits) it,
          the rows written by BulkInsertActivityToCache would be committed at
          this point, before the second insert runs; NOT_SUPPORTED may be the
          intended action. Verify against the Mule transaction documentation.
        -->
        <jdbc-ee:outbound-endpoint exchange-pattern="request-response" 
                queryKey="GetInstancesForCache" queryTimeout="-1" connector-ref="SumTotalDatabase">
            <jdbc-ee:transaction action="NONE" />
        </jdbc-ee:outbound-endpoint>      

        <!--
          Insert 2: BulkInsertInstanceToCache on EAIServiceDatabase, also without
          an explicit transaction element. This is the step reported as failing.
        -->
        <jdbc-ee:outbound-endpoint exchange-pattern="request-response" 
                queryKey="BulkInsertInstanceToCache" queryTimeout="-1" connector-ref="EAIServiceDatabase">
        </jdbc-ee:outbound-endpoint>        
    </transactional>

    <!-- Flow-level catch exception strategy; its body is elided in this sample. -->
    <catch-exception-strategy doc:name="Unexpected">
        ...etc.
    </catch-exception-strategy>                
</flow>

Edit: I tried adding a BEGIN_OR_JOIN transaction element to the first INSERT and an ALWAYS_JOIN transaction element to the second, but the code then throws an exception when it reaches the second, saying that there is no transaction open to join.

2

2 Answers

2
votes

Using ALWAYS_BEGIN and ALWAYS_JOIN respectively is the way to go.

But, if it's two different DBs, you need to use XA transactions. A local transaction can't enrol resources from two different databases.

-1
votes

As far as I know, the first endpoint's transaction action should be ALWAYS_BEGIN, and every endpoint after that should be ALWAYS_JOIN, as in the example below.

 <!-- MySQL connection settings for database ib_trade on localhost:3306, with XA transactions switched on. -->
 <db:mysql-config doc:name="MySQL Configuration"
                  name="MySQL_Configuration"
                  driverClassName="com.mysql.jdbc.Driver"
                  host="localhost"
                  port="3306"
                  database="ib_trade"
                  user="root"
                  useXaTransactions="true"/>
    <!--
      transactonmanagerFlow: receives HTTP requests on /ram, replaces the payload
      with the expression #['hi'], and forwards it request-response to the VM
      path "temp", logging a marker line before and after.
    -->
    <flow name="transactonmanagerFlow">
        <http:listener doc:name="HTTP" path="/ram" config-ref="HTTP_Listener_Configuration"/>
        <!-- Marker written on entry to the flow. -->
        <logger doc:name="Logger"
                level="INFO"
                message="%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%"/>
        <set-payload doc:name="Set Payload" value="#['hi']"/>
        <!--
          NOTE(review): nothing visible in this flow begins a transaction before
          the ALWAYS_JOIN below; confirm one is actually open at runtime.
        -->
        <vm:outbound-endpoint doc:name="VM"
                              connector-ref="VM"
                              path="temp"
                              exchange-pattern="request-response">
            <xa-transaction timeout="10000" action="ALWAYS_JOIN"/>
        </vm:outbound-endpoint>
        <!-- Marker written once the VM call has returned. -->
        <logger doc:name="Logger"
                level="INFO"
                message="^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"/>
    </flow>
    <flow name="transactonmanagerFlow1">
        <vm:inbound-endpoint exchange-pattern="request-response" path="temp" connector-ref="VM" doc:name="VM">
            <xa-transaction action="ALWAYS_BEGIN" timeout="100000"/>
        </vm:inbound-endpoint>
        <logger level="INFO" doc:name="Logger" message="**************************************************************************"/>
        <db:insert config-ref="MySQL_Configuration" transactionalAction="ALWAYS_JOIN" doc:name="Database">
            <db:dynamic-query><![CDATA[INSERT INTO `ib_trade`.`swt_symbol`(`idswt_symbol`,`symbol_name`,`symbol_exchange`,`symbol_id`) VALUES ("5","1","1","1");]]></db:dynamic-query>
        </db:insert>
        <logger message="^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$" level="INFO" doc:name="Logger"/>
        <db:insert config-ref="MySQL_Configuration" transactionalAction="ALWAYS_JOIN" doc:name="Copy_of_Database">
            <db:dynamic-query><![CDATA[INSERT INTO `ib_trade`.`swt_symbol`(`idswt_symbol`,`symbol_name`,`symbol_exchange`,`symbol_id`) VALUES ("5","temp","1","1");]]></db:dynamic-query>
        </db:insert>