I have a Spring configured web app that receives JMS messages via the following listener:

public class EntityPersister implements MessageListener {

    @Resource
    private EntityManager entityManager;

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            Object entity = createEntity(textMessage);
            entityManager.persist(entity);
            entityManager.flush(); //for debugging only
        }
    }
}

When I execute this listener in my application I get a NoTransactionException from the line entityManager.flush().

What do I need to configure so that the entity manager takes part in the already existing JTA transaction?

I already tried @Transactional on the above implementation but with no success.

ActiveMQ is used as the JMS provider. The Spring configuration for JMS is:

<bean id="jmsConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="atomikos-activemq" />
    <property name="xaConnectionFactory">
        <!-- ActiveMQ is used as the JMS provider -->
        <bean id="activeMQXAConnectionFactory"
            class="org.apache.activemq.spring.ActiveMQXAConnectionFactory">
            <property name="brokerURL">
                <value>tcp://localhost:61616</value>
            </property>
        </bean>
    </property>
    <property name="maxPoolSize" value="2" />
    <property name="localTransactionMode" value="false" />
</bean>

<bean id="entityPersister" class="EntityPersister" />

<bean id="jmsContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destinationName" ref="entityDestinationName" />
    <property name="messageListener" ref="entityPersister" />
    <property name="sessionTransacted" value="true" />
    <property name="transactionManager" ref="txManager" />
</bean>

OpenJPA is used as the JPA Provider. The persistence unit is:

<persistence-unit name="somePU" transaction-type="JTA">
    <jta-data-source>managedDataSource</jta-data-source>
    <non-jta-data-source>nonManagedDataSource</non-jta-data-source>
    <!-- some entity class listed here -->
    <properties>
        <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema(ForeignKeys=true)" />
    </properties>
</persistence-unit>

The Spring configuration for transactions and JPA is:

<!-- Construct Atomikos UserTransactionManager, needed to configure Spring -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close">
    <!-- when close is called, should we force transactions to terminate or 
        not? -->
    <property name="forceShutdown" value="true" />
</bean>

<!-- Also use Atomikos UserTransactionImp, needed to configure Spring -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>

<!-- Configure the Spring framework to use JTA transactions from Atomikos -->
<bean id="txManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

<!-- enable the configuration of transactional behavior based on annotations -->
<tx:annotation-driven transaction-manager="txManager" />

<bean id="entityManagerFactory"
    class="org.springframework.orm.jpa.LocalEntityManagerFactoryBean">
    <property name="persistenceUnitName" value="somePU" />
    <property name="jpaPropertyMap">
        <map>
            <entry key="openjpa.ManagedRuntime" value="jndi" />
        </map>
    </property>
</bean>

<bean id="entityManager" factory-bean="entityManagerFactory"
    factory-method="createEntityManager" />

OpenJPA looks up the XA and non-XA datasource from the persistence unit via JNDI.

If you are using setMessageListener(), the reception in onMessage happens in a separate thread. Normally a transaction is thread-bound, so this might be the problem: your entityManager may be bound to a different thread. You possibly have two different threads here. - Beryllium
@Beryllium I verified that. Spring's DefaultMessageListenerContainer uses a thread pool. The creation of the XA JMS Session and the onMessage call are made within the same thread. So the EntityManager is used within a thread that has a current transaction. - SpaceTrucker
What about the injection of the entityManager: does this happen in the same thread as well? You could verify it by not using setMessageListener or by creating/getting an entityManager in onMessage. - Beryllium
@Beryllium That was exactly the problem. If you put that in an answer, I will accept it. - SpaceTrucker

1 Answer

If you are using setMessageListener(), the message reception in onMessage happens asynchronously in a separate thread.

The injection of the entityManager happens in a different thread. Normally a transaction is thread-bound. So the injecting thread has probably already finished its work, and any transaction associated with it was closed, before message reception starts. Even if that transaction still existed, it would not have been enlisted in the same global transaction used by the message-receiving thread.

You could verify it by creating/getting an entityManager in onMessage explicitly. This way all XA transaction-aware resources should enlist within the same global transaction, because all of these are opened from the same thread.
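
To make the thread-binding argument concrete, here is a minimal, self-contained sketch that uses only the JDK. It is a simulation, not Spring, Atomikos or OpenJPA code: `CURRENT_TX`, `FakeEntityManager` and `ThreadBoundDemo` are hypothetical stand-ins, and the assumption that a resource only picks up the transaction visible to the thread that creates it is a simplification of what a real JTA setup does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadBoundDemo {

    // Hypothetical stand-in for a transaction manager that binds the
    // current transaction to the calling thread.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Hypothetical stand-in for an entity manager: it remembers whichever
    // transaction was visible to the thread that created it.
    static final class FakeEntityManager {
        private final String joinedTx;

        FakeEntityManager() {
            this.joinedTx = CURRENT_TX.get();
        }

        String joinedTx() {
            return joinedTx;
        }
    }

    // Returns { tx seen by the instance created at startup,
    //           tx seen by the instance created in the listener thread }.
    static String[] run() {
        // Created on the "startup" thread, where no transaction is active.
        FakeEntityManager injected = new FakeEntityManager();

        ExecutorService listenerPool = Executors.newSingleThreadExecutor();
        try {
            Future<String[]> result = listenerPool.submit(() -> {
                // The listener container begins the transaction in its own thread.
                CURRENT_TX.set("tx-1");
                try {
                    // Created inside the listener thread, so it sees tx-1.
                    FakeEntityManager local = new FakeEntityManager();
                    return new String[] { injected.joinedTx(), local.joinedTx() };
                } finally {
                    CURRENT_TX.remove();
                }
            });
            return result.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            listenerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("injected at startup: " + seen[0]); // null
        System.out.println("created in listener: " + seen[1]); // tx-1
    }
}
```

In the real application the analogous change would be to obtain the entity manager inside onMessage (for example from the entityManagerFactory bean) instead of injecting an instance created at startup. Whether it then joins the Atomikos transaction automatically depends on the OpenJPA and JTA configuration, so treat this only as an illustration of the threading issue.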