We're trying to set up ActiveMQ 5.9.0 as a message broker using JMS topics, but we're having some issues with the consumption of the messages.

For testing purposes, we have a simple configuration of 1 topic, 1 event producer, and 1 consumer. We send 10 messages one after the other, but every time we run the application, 1-3 of these messages are not consumed! The other messages are consumed and processed fine. In the ActiveMQ management console we can see that all the messages were published to the topic, but the missing ones never reach the consumer, even if we restart the application (the numbers in the "Enqueue" and "Dequeue" columns are different).

EDIT: I should also mention that when using queues instead of topics, this problem does not occur.

Why is this happening? Could it have something to do with Atomikos (which is the transaction manager)? Or maybe something else in the configuration? Any ideas/suggestions are welcome. :)

This is the ActiveMQ/JMS Spring configuration:

<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="amq" />
    <property name="xaConnectionFactory">
        <bean class="org.apache.activemq.spring.ActiveMQXAConnectionFactory"
            p:brokerURL="${activemq_url}" />
    </property>
    <property name="maxPoolSize" value="10" />
    <property name="localTransactionMode" value="false" />
</bean>

<bean id="cachedConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory" />
</bean>

<!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachedConnectionFactory" />
    <property name="sessionTransacted" value="true" />
    <property name="pubSubDomain" value="true"/>
</bean>

<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="test.topic" />
</bean>

<!-- The Spring message listener container configuration -->
<jms:listener-container destination-type="topic"
    connection-factory="connectionFactory" transaction-manager="transactionManager"
    acknowledge="transacted" concurrency="1">
    <jms:listener destination="test.topic" ref="testReceiver"
        method="receive" />
</jms:listener-container>

The producer:

@Component("producer")
public class EventProducer {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Transactional
    public void produceEvent(String message) {
        this.jmsTemplate.convertAndSend("test.topic", message);
    }
}

The consumer:

@Component("testReceiver")
public class EventListener {

    @Transactional
    public void receive(String message) {
        System.out.println(message);
    }  
}

The test:

@Autowired
private EventProducer eventProducer;

public void testMessages() {
    for (int i = 1; i <= 10; i++) {
        this.eventProducer.produceEvent("message" + i);
    }
}

1 Answer

That's the nature of JMS topics: by default, only current subscribers receive messages. You have a race condition: the messages are sent after the container has started but before the consumer has established its subscription. This is a common mistake in unit/integration tests with topics, where you send and receive in the same application.
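To make the race concrete, here is a toy in-memory model of non-durable topic delivery. It is only a sketch: `ToyTopic` and its `run` method are made-up names for illustration and have nothing to do with the ActiveMQ or JMS APIs. The point is just that anything published before `subscribe` is called is never seen by that subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of a non-durable topic (hypothetical class, not the ActiveMQ API).
final class ToyTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers only to subscribers registered at publish time; nothing is stored.
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    // Publishes message1..message10; the consumer subscribes only after
    // `sentBeforeSubscribe` messages have already been published.
    static List<String> run(int sentBeforeSubscribe) {
        ToyTopic topic = new ToyTopic();
        List<String> received = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            if (i == sentBeforeSubscribe + 1) {
                topic.subscribe(received::add);
            }
            topic.publish("message" + i);
        }
        return received;
    }
}
```

With `ToyTopic.run(3)` the subscriber sees only message4 through message10, which mirrors the 1-3 missing messages in the question. With `ToyTopic.run(0)` it sees all ten.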

Newer versions of Spring (since 3.1, I think) provide a method you can poll to wait until the subscriber is established. Alternatively, you can wait a short while before you start sending, or you can make your subscriptions durable.
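A minimal sketch of the polling approach, using plain Java only. `SubscriptionWait` and `awaitReady` are hypothetical helper names. I believe the Spring method in question is `DefaultMessageListenerContainer.isRegisteredWithDestination()`, but check that against the Spring version you are using.

```java
import java.util.function.BooleanSupplier;

// Hypothetical test helper: waits for a readiness condition before sending.
final class SubscriptionWait {

    // Polls `ready` every pollMillis until it returns true or timeoutMillis elapses.
    // Returns true if the condition became true in time, false on timeout or interrupt.
    static boolean awaitReady(BooleanSupplier ready, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!ready.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before the condition held
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }
}
```

In the test you would then do something like `SubscriptionWait.awaitReady(container::isRegisteredWithDestination, 5000, 50)` before the send loop, where `container` is the listener container bean (assuming you can get hold of it, e.g. by injecting it by type), and fail the test if it returns false.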