0
votes

I have message-driven-channel-adapter configured that picks the message from queue and sends down stream for processing . service-activator down stream will process this message have some database inserts using spring-jpa and also putting message in queue. As Atomikos is used as transaction manager if any exception is thrown in service-activator the exception message is sent to messageError channel that is configured on message-driven-channel-adapter. I am not able to log this message in to database as I am getting com.atomikos.jdbc.AtomikosSQLException: Transaction is marked for rollback only or has timed out. How to handle the exception message. I am using spring integration 4.2.4.RELEASE. Below is my configuration

<context:annotation-config />

<context:component-scan
    base-package="com.home.sendermain,com.home.queueapi,com.home.processauth" />





<int-jms:message-driven-channel-adapter
    id="pollQueuetoProcess" channel="queueChannel" destination="senderQueue"
    concurrent-consumers="5" max-concurrent-consumers="10" acknowledge="transacted"
    auto-startup="false" error-channel="messageError" transaction-manager="JtaTransactionManager" />



<int:recipient-list-router id="status1router"
    input-channel="queueChannel" default-output-channel="nullChannel">
    <int:recipient channel="messageSuccessChannel"
        selector-expression="payload.getStatus1()==1" />

</int:recipient-list-router>

<int:channel id="processChannel"></int:channel>

<int:chain id="messageSuccessChain" input-channel="messageSuccessChannel">

    <int:service-activator ref="senderMessageProcess" id="check1"
        method="check1" requires-reply="true" />

    <int:service-activator ref="senderMessageProcess"
        id="processMessageStatus" method="processMessageStatus"
        requires-reply="true" />

    <int:recipient-list-router id="status2router"
        default-output-channel="nullChannel">
        <int:recipient channel="nullChannel"
            selector-expression="payload.getStatus2()==8" />
        <int:recipient channel="processChannel"
            selector-expression="payload.getStatus2()==5 || payload.getStatus2()==10" />

    </int:recipient-list-router>

</int:chain>

<int:service-activator input-channel="processChannel"
    output-channel="nullChannel" ref="senderMessageProcess" id="processMessage"
    method="processMessage" requires-reply="true" />


<int:service-activator ref="senderErrorProcess"
    id="processErrorMessage" method="processErrorMessage" input-channel="messageError"
    output-channel="nullChannel" requires-reply="true" />



<bean id="senderQueue" class="com.ibm.mq.jms.MQQueue">
    <constructor-arg value="SENDERQUEUE" />
</bean>

<bean id="testQ" class="com.ibm.mq.jms.MQQueue">
    <constructor-arg value="TESTQUEUE" />
</bean>

<!-- declaring MQ XA connection with max pool size as 10 -->
<bean id="mqConnectionFactory" class="com.ibm.mq.jms.MQXAQueueConnectionFactory">
    <property name="queueManager">
        <value>TESTQMGR</value>
    </property>
</bean>

<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="ALL_MQSeries_XA_RMI" />
    <property name="xaConnectionFactory" ref="mqConnectionFactory" />
    <property name="maxPoolSize">
        <value>10</value>
    </property>
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="explicitQosEnabled" value="true" />
    <property name="sessionTransacted" value="true" />
</bean>


<bean id="entityManagerFactory"
    class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean"
    depends-on="JtaTransactionManager, springJtaPlatformAdapter">
    <property name="packagesToScan"
        value="com.home.domain.entity.processentities" />
    <property name="dataSource" ref="datasource" />
    <property name="jpaVendorAdapter">
        <bean class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter" />
    </property>
    <property name="mappingResources">
        <value>META-INF/orm.xml</value>
    </property>
    <property name="jpaProperties">
        <props>
            <prop key="hibernate.dialect">
                org.hibernate.dialect.Oracle10gDialect
            </prop>
            <prop key="hibernate.max_fetch_depth">3</prop>
            <prop key="hibernate.jdbc.fetch_size">50</prop>
            <prop key="hibernate.jdbc.batch_size">10</prop>
            <prop key="hibernate.show_sql">true</prop>
            <prop key="jadira.usertype.autoRegisterUserTypes">true</prop>
            <prop key="jadira.usertype.databaseZone">jvm</prop>
            <prop key="jadira.usertype.javaZone">jvm</prop>
            <prop key="hibernate.transaction.jta.platform">com.home.AtomikosJtaPlatform</prop>
            <prop key="javax.persistence.transactionType">JTA</prop>
        </props>
    </property>
</bean>


<jpa:repositories base-package="com.home.domain.repository.processrepository"
    entity-manager-factory-ref="entityManagerFactory"
    transaction-manager-ref="JtaTransactionManager" />



<jpa:auditing />



<aop:config>
    <aop:pointcut id="serviceOperation"
        expression="execution(* com.home.sendermain.service.*.*(..)) || execution(* com.home.sendermain.error.*.*(..))" />
    <aop:advisor pointcut-ref="serviceOperation" advice-ref="txAdvice" />
</aop:config>

<tx:advice id="txAdvice" transaction-manager="JtaTransactionManager">
    <tx:attributes>
        <tx:method name="find*" read-only="true" rollback-for="java.lang.Throwable" />
        <tx:method name="count*" propagation="NEVER" rollback-for="java.lang.Throwable" />
        <tx:method name="*" rollback-for="java.lang.Throwable" />
    </tx:attributes>
</tx:advice>

<aop:config>
    <aop:pointcut id="serviceOperationQueue"
        expression="execution(* com.home.queueapi.*.*(..))" />
    <aop:advisor pointcut-ref="serviceOperationQueue"
        advice-ref="txAdviceQueue" />
</aop:config>

<tx:advice id="txAdviceQueue" transaction-manager="JtaTransactionManager">
    <tx:attributes>
        <tx:method name="find*" read-only="true" rollback-for="java.lang.Throwable" />
        <tx:method name="count*" propagation="NEVER" rollback-for="java.lang.Throwable" />
        <tx:method name="*" rollback-for="java.lang.Throwable" />
    </tx:attributes>
</tx:advice>



<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
    init-method="init" destroy-method="shutdownForce">
    <constructor-arg>
        <!-- IMPORTANT: specify all Atomikos properties here -->
        <props>
            <prop key="com.atomikos.icatch.service">
                com.atomikos.icatch.standalone.UserTransactionServiceFactory
            </prop>
        </props>
    </constructor-arg>
</bean>

<!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
<bean id="AtomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close" depends-on="userTransactionService">

    <!-- IMPORTANT: disable startup because the userTransactionService above 
        does this -->
    <property name="startupTransactionService" value="false" />

    <!-- when close is called, should we force transactions to terminate or 
        not? -->
    <property name="forceShutdown" value="false" />
</bean>

<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="AtomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"
    depends-on="userTransactionService">
    <property name="transactionTimeout" value="300" />
</bean>

<!-- Configure the Spring framework to use JTA transactions from Atomikos -->
<bean id="JtaTransactionManager"
    class="org.springframework.transaction.jta.JtaTransactionManager"
    depends-on="userTransactionService">
    <property name="transactionManager" ref="AtomikosTransactionManager" />
    <property name="userTransaction" ref="AtomikosUserTransaction" />
</bean>

<!-- declaring database connection for xaresource with poolsize is set to 
    10 -->
<bean id="datasource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName">
        <value>XAORADBMS</value>
    </property>
    <property name="xaDataSourceClassName">
        <value>${jdbc.driverClassName}</value>
    </property>
    <property name="xaProperties">
        <props>
            <prop key="user">${jdbc.username}</prop>
            <prop key="password">${jdbc.password}</prop>
            <prop key="URL">${jdbc.url}</prop>
        </props>
    </property>
    <property name="poolSize">
        <value>${sender.maxPoolSize}</value>
    </property>
</bean>

<bean id="springJtaPlatformAdapter" class="com.home.AtomikosJtaPlatform">
    <property name="jtaTransactionManager" ref="JtaTransactionManager" />
</bean>

Suggest how to handle exception message .

1

1 Answers

0
votes

You need to start a new transaction for the log process - possibly with a simple DB Tx Manager - not Atomikos - see Propagation.REQUIRES_NEW.