2
votes

I need to handle certain error conditions within my spring integration flow. My flow is using a message store and setting the error channel on the poller. I had thought that if I handled the message in the error handler that the rollback would not occur, but the messageStore remove (delete) is being rolled back before the error flow is even executed.

Here is a pseudo-flow that duplicates my issue.

<int:channel id="rollbackTestInput" >
    <int:queue message-store="messageStore"/>
</int:channel>

<int:bridge input-channel="rollbackTestInput" output-channel="createException" >
    <int:poller fixed-rate="50" 
        error-channel="myErrorChannel">
        <int:transactional />
    </int:poller>
</int:bridge>

<int:transformer input-channel="createException" output-channel="infoLogger"
    expression="T(GarbageToForceException).doesNotExist()" />

<int:channel id="myErrorChannel">
    <int:queue/> 
</int:channel>

<!-- JDBC Message Store -->
<bean id="messageStore" class="org.springframework.integration.jdbc.JdbcMessageStore">
    <property name="dataSource">
        <ref bean="dataSource" />
    </property>
</bean>

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager" >
<property name="dataSource" ref="dataSource"/>

This flow will result in an infinite rollback/poll loop. How can I handle the error and not rollback?

2

2 Answers

0
votes

That's correct behavior. The error-channel logic is accepted around the TX advice for the polling task.

The code looks like:

@Override
public void run() {
    taskExecutor.execute(new Runnable() {
        @Override
        public void run() {
            int count = 0;
            while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) {
                try {
                    if (!pollingTask.call()) {
                        break;
                    }
                    count++;
                }
                catch (Exception e) {
                    if (e instanceof RuntimeException) {
                        throw (RuntimeException) e;
                    }
                    else {
                        throw new MessageHandlingException(new ErrorMessage(e), e);
                    }
                }
            }
        }
    });
}

Where TX Advice is on the pollingTask.call(), but error handling is done from the taskExecutor:

this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, this.errorHandler);

Where your error-channel is configured on that errorHandler as MessagePublishingErrorHandler.

To reach your requirements you can try to follow with synchronization-factory on the <poller>:

<int:transaction-synchronization-factory id="txSyncFactory">
    <int:after-rollback channel="myErrorChannel" />
</int:transaction-synchronization-factory>

Or supply your <int:transformer> with <request-handler-advice-chain>:

<int:request-handler-advice-chain>
    <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
        <property name="onFailureExpression" value="#exception" />
        <property name="failureChannel" value="myErrorChannel" />
        <property name="trapException" value="true" />
    </bean>
</int:request-handler-advice-chain>
0
votes

I have found a different solution that meets my requirements and is less invasive.

Instead of using transactional on the poller, I can use an advice chain that that specifies which exceptions I should rollback and which I shouldn't. For my case, I do not want to rollback most exceptions. So I rollback Throwable and list any specific exceptions I want to rollback.

<int:bridge input-channel="rollbackTestInput"
    output-channel="createException">
    <int:poller fixed-rate="50"
        error-channel="myErrorChannel">
        <int:advice-chain>
            <int:ref bean="txAdvice"/>
        </int:advice-chain>
    </int:poller>
</int:bridge>

<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="*" rollback-for="javax.jms.JMSException"
            no-rollback-for="java.lang.Throwable" />
    </tx:attributes>
</tx:advice>