
Message grouping doesn't appear to be working

  1. My producer application sends a message to the queue via a JMS MessageProducer after setting the string property JMSXGroupID to 'product=paper'.
  2. My producer application sends another message the same way, also with 'product=paper'.
  3. I can see both messages in the queue when I browse their headers in the Artemis UI. _AMQ_GROUP_ID has a value of 'product=paper' in both. JMSXGroupID is absent.
  4. When I debug my listener application, which uses Spring JMS with a concurrency of 15-15 (15 min, 15 max), I can see both messages come through, logged under different listener containers. In the header map for each, _AMQ_GROUP_ID is absent and JMSXGroupID is null instead of 'product=paper'.

Why isn't message grouping with group id working? Does it have to do with the fact that Artemis didn't translate _AMQ_GROUP_ID back to JMSXGroupID? Or is Spring JMS not registering its multiple consumer threads as different consumers for the broker to see multiple consumers?

Edit: I was able to get message grouping to work in my application by commenting out the lines in my container factory bean method that configure transacted sessions, so the problem seems to be related to transacted sessions.

Edit2:

Here's a self contained application running against a local standalone Artemis broker (version 2.10.1) and using Spring Boot 2.2.0:

GroupidApplication (spring boot application and beans):

package com.reproduce.groupid;

import java.util.HashMap;
import java.util.Map;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {

    private static final Logger LOG = LoggerFactory
            .getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);

        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) throws JMSException {
        LOG.info("EXECUTING : command line runner");

        jmsTemplate.setPubSubDomain(true);

        createAndSendTextMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");
    }

    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);

            message.setStringProperty("JMSXGroupID", "product=paper");

            return message;
        });
    }


    // BEANS
    @Bean
    public JmsListenerContainerFactory<?> containerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer, JmsTransactionManager jmsTransactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        configurer.configure(factory, connectionFactory);
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        factory.setSessionTransacted(Boolean.TRUE);
        factory.setTransactionManager(jmsTransactionManager);

        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Bean
    @Primary
    public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory);

        // Lazily retrieve existing JMS Connection from given ConnectionFactory
        jmsTransactionManager.setLazyResourceRetrieval(true);

        return jmsTransactionManager;
    }

    @Bean
    @Primary
    public ConnectionFactory connectionFactory() throws JMSException {
        // Create ConnectionFactory which enables failover between primary and backup brokers
        ActiveMQConnectionFactory activeMqConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA(
                JMSFactoryType.CF, transportConfigurations());

        activeMqConnectionFactory.setBrokerURL("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
        activeMqConnectionFactory.setUser("admin");
        activeMqConnectionFactory.setPassword("admin");
        activeMqConnectionFactory.setInitialConnectAttempts(1);
        activeMqConnectionFactory.setReconnectAttempts(5);
        activeMqConnectionFactory.setConsumerWindowSize(0);
        activeMqConnectionFactory.setBlockOnAcknowledge(true);
        activeMqConnectionFactory.setCacheDestinations(true);
        activeMqConnectionFactory.setRetryInterval(1000);

        return activeMqConnectionFactory;
    }

    private static TransportConfiguration[] transportConfigurations() {
        String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
        Map<String, Object> primaryTransportParameters = new HashMap<>(2);

        primaryTransportParameters.put("host", "localhost");
        primaryTransportParameters.put("port", "61616");

        TransportConfiguration primaryTransportConfiguration = new TransportConfiguration(connectorFactoryFqcn,
                primaryTransportParameters);

        return new TransportConfiguration[] { primaryTransportConfiguration,
                new TransportConfiguration(connectorFactoryFqcn) };
    }
}

CustomSpringJmsListener:

package com.reproduce.groupid;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class CustomSpringJmsListener {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @JmsListener(destination = "local-queue", subscription = "groupid-example", containerFactory = "containerFactory", concurrency = "15-15")
    public void receive(TextMessage message) throws JMSException {
        // Parameterized logging avoids building the string when INFO is disabled
        LOG.info("Received message: {}", message);
    }
}

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.reproduce</groupId>
    <artifactId>groupid</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>groupid</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-artemis</artifactId>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

You can see that even though all of these messages have the same group id, they get logged by different listener container threads. If you comment out the transaction manager from the bean definition it starts working again.

What happens if you remove Spring JMS from the use-case and just use a normal JMS MessageConsumer? Eliminating Spring JMS will indicate whether the issue is with Spring JMS or with Artemis itself. - Justin Bertram
See here. It is a bug in the Artemis client (2.6.4); it works with 2.10.1. - Gary Russell
@GaryRussell I think this is a separate issue. In the example from the question you linked, group id was actually working; it just wasn't coming through in the headers. - b15
You could write a simple Java app with a main() method which subscribes to the topic and just run that application multiple times. - Justin Bertram
Good thing you added this comment; for some reason I didn't get notified of yesterday's comments and they are not in my responses history. I'll take a look and get back to you. - Gary Russell

1 Answer


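It's all about consumer caching. By default, when an external transaction manager is configured, the listener container disables caching, so each message is received on a newly created consumer. Since the broker assigns a group to a particular consumer, a group cannot stay with one consumer if that consumer is closed after every message.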
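To make the effect concrete, here is a small toy model in plain Java. It is not Artemis or Spring code; `ToyQueue` and `distinctReceivers` are hypothetical names, and the pinning rule (a group sticks to one consumer until that consumer closes) is a simplified assumption about broker behaviour. It only illustrates why recreating the consumer for every message spreads one group across many consumers.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: NOT Artemis code, just an illustration of group pinning.
class GroupPinningSketch {

    /** Hypothetical stand-in for a broker queue that pins each group to one consumer. */
    static class ToyQueue {
        private final Deque<Integer> consumers = new ArrayDeque<>(); // round-robin order
        private final Map<String, Integer> groupOwner = new HashMap<>();

        void addConsumer(int id) {
            consumers.addLast(id);
        }

        void closeConsumer(int id) {
            consumers.remove(Integer.valueOf(id));
            // Assumption: closing a consumer releases every group pinned to it.
            groupOwner.values().removeIf(owner -> owner == id);
        }

        int dispatch(String groupId) {
            Integer owner = groupOwner.get(groupId);
            if (owner == null) {
                // No owner yet: pick the next consumer round-robin and pin the group to it.
                owner = consumers.pollFirst();
                consumers.addLast(owner);
                groupOwner.put(groupId, owner);
            }
            return owner;
        }
    }

    /** Returns how many different consumers received messages of a single group. */
    static int distinctReceivers(boolean cacheConsumers, int messages) {
        ToyQueue queue = new ToyQueue();
        int nextId = 0;
        for (; nextId < 3; nextId++) {
            queue.addConsumer(nextId);
        }
        Set<Integer> receivers = new HashSet<>();
        for (int i = 0; i < messages; i++) {
            int receiver = queue.dispatch("product=paper");
            receivers.add(receiver);
            if (!cacheConsumers) {
                // Mimics an uncached container: close the consumer, open a fresh one.
                queue.closeConsumer(receiver);
                queue.addConsumer(nextId++);
            }
        }
        return receivers.size();
    }

    public static void main(String[] args) {
        System.out.println("cached consumers:   " + distinctReceivers(true, 6));
        System.out.println("uncached consumers: " + distinctReceivers(false, 6));
    }
}
```

With cached consumers the whole group stays on a single consumer; in the uncached run every message lands on a different one, which matches the symptom in the question.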

For this app, you really don't need a transaction manager; sessionTransacted is enough, and the container will use local transactions.
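As a rough sketch of that option, the container factory from the question could look like this with the transaction manager left out. The class name `LocalTxListenerConfig` is made up for illustration; everything else reuses the beans and setters already shown in the question, and it assumes no other transaction manager is applied to the factory.

```java
import javax.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@Configuration
class LocalTxListenerConfig { // hypothetical class name

    @Bean
    public JmsListenerContainerFactory<?> containerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        configurer.configure(factory, connectionFactory);
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);

        // Local JMS transactions: the session is transacted, but no external
        // transaction manager is set, so the default cache level stays CACHE_CONSUMER.
        factory.setSessionTransacted(true);

        return factory;
    }
}
```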

If you must use an external transaction manager for some reason, consider changing the cache level.

factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);

See the DefaultMessageListenerContainer (DMLC) javadocs:

/**
 * Specify the level of caching that this listener container is allowed to apply.
 * <p>Default is {@link #CACHE_NONE} if an external transaction manager has been specified
 * (to reobtain all resources freshly within the scope of the external transaction),
 * and {@link #CACHE_CONSUMER} otherwise (operating with local JMS resources).
 * <p>Some Java EE servers only register their JMS resources with an ongoing XA
 * transaction in case of a freshly obtained JMS {@code Connection} and {@code Session},
 * which is why this listener container by default does not cache any of those.
 * However, depending on the rules of your server with respect to the caching
 * of transactional resources, consider switching this setting to at least
 * {@link #CACHE_CONNECTION} or {@link #CACHE_SESSION} even in conjunction with an
 * external transaction manager.
 * @see #CACHE_NONE
 * @see #CACHE_CONNECTION
 * @see #CACHE_SESSION
 * @see #CACHE_CONSUMER
 * @see #setCacheLevelName
 * @see #setTransactionManager
 */
public void setCacheLevel(int cacheLevel) {