Message grouping doesn't appear to be working
- My producer application sends a message to the queue via a JMS MessageProducer after setting the string property JMSXGroupID to 'product=paper'.
- My producer application sends another message the same way, also with 'product=paper'.
- I can see both messages in the queue. When I browse each message's headers in the Artemis UI, _AMQ_GROUP_ID has a value of 'product=paper' in both, and JMSXGroupID is absent.
- When I debug my listener application, which uses Spring JMS with a concurrency of 15-15 (15 min, 15 max), I can see both messages come through, logged under different listener containers. When I look at the map of headers for each, _AMQ_GROUP_ID is absent and JMSXGroupID has a value of null instead of 'product=paper'.
Why isn't message grouping with group ID working? Does it have to do with the fact that Artemis didn't translate _AMQ_GROUP_ID back to JMSXGroupID? Or is Spring JMS not registering its multiple consumer threads as different consumers for the broker to see multiple consumers?
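For reference, the behavior I expect from grouping is that every message carrying the same group ID goes to one consumer, however many consumers are attached. The toy model below sketches that expectation in plain Java. The class GroupDispatchSketch and its round-robin choice of a group's first consumer are assumptions made for illustration only; this is not how Artemis implements grouping internally.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of group-aware dispatch (hypothetical, not Artemis code). */
public class GroupDispatchSketch {
    private final int consumerCount;
    // Remembers which consumer index owns each group ID.
    private final Map<String, Integer> groupOwner = new HashMap<>();
    private int nextConsumer = 0;

    public GroupDispatchSketch(int consumerCount) {
        this.consumerCount = consumerCount;
    }

    /** Returns the index of the consumer that receives a message with this group ID. */
    public int dispatch(String groupId) {
        if (groupId == null) {
            // Ungrouped messages are simply spread round-robin.
            return advance();
        }
        // The first message of a group picks a consumer; later ones reuse it.
        return groupOwner.computeIfAbsent(groupId, id -> advance());
    }

    // Picks the next consumer in round-robin order.
    private int advance() {
        int chosen = nextConsumer;
        nextConsumer = (nextConsumer + 1) % consumerCount;
        return chosen;
    }

    public static void main(String[] args) {
        // 15 consumers, mirroring the listener concurrency of 15-15.
        GroupDispatchSketch broker = new GroupDispatchSketch(15);
        for (int i = 1; i <= 6; i++) {
            int consumer = broker.dispatch("product=paper");
            System.out.println("Message" + i + " -> consumer " + consumer);
        }
    }
}
```

In this model all six messages land on the same consumer index, which is what I expected to see in the listener logs instead of six different container threads.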
Edit: I was able to get message grouping to work in my application by commenting out the lines in my container factory bean method that enable transacted sessions. The problem seems to be related to using transacted sessions.
Edit2:
Here's a self-contained application running against a local standalone Artemis broker (version 2.10.1) and using Spring Boot 2.2.0:
GroupidApplication (Spring Boot application and beans):
package com.reproduce.groupid;
import java.util.HashMap;
import java.util.Map;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {

    private static final Logger LOG = LoggerFactory.getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);
        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) throws JMSException {
        LOG.info("EXECUTING : command line runner");
        jmsTemplate.setPubSubDomain(true);
        createAndSendTextMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");
    }

    // Every message is sent with the same group ID.
    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);
            message.setStringProperty("JMSXGroupID", "product=paper");
            return message;
        });
    }

    // BEANS

    @Bean
    public JmsListenerContainerFactory<?> containerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer, JmsTransactionManager jmsTransactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        factory.setSessionTransacted(Boolean.TRUE);
        factory.setTransactionManager(jmsTransactionManager);
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Bean
    @Primary
    public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory);
        // Lazily retrieve existing JMS Connection from given ConnectionFactory
        jmsTransactionManager.setLazyResourceRetrieval(true);
        return jmsTransactionManager;
    }

    @Bean
    @Primary
    public ConnectionFactory connectionFactory() throws JMSException {
        // Create ConnectionFactory which enables failover between primary and backup brokers
        ActiveMQConnectionFactory activeMqConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA(
                JMSFactoryType.CF, transportConfigurations());
        activeMqConnectionFactory.setBrokerURL("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
        activeMqConnectionFactory.setUser("admin");
        activeMqConnectionFactory.setPassword("admin");
        activeMqConnectionFactory.setInitialConnectAttempts(1);
        activeMqConnectionFactory.setReconnectAttempts(5);
        activeMqConnectionFactory.setConsumerWindowSize(0);
        activeMqConnectionFactory.setBlockOnAcknowledge(true);
        activeMqConnectionFactory.setCacheDestinations(true);
        activeMqConnectionFactory.setRetryInterval(1000);
        return activeMqConnectionFactory;
    }

    private static TransportConfiguration[] transportConfigurations() {
        String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
        Map<String, Object> primaryTransportParameters = new HashMap<>(2);
        primaryTransportParameters.put("host", "localhost");
        primaryTransportParameters.put("port", "61616");
        TransportConfiguration primaryTransportConfiguration = new TransportConfiguration(connectorFactoryFqcn,
                primaryTransportParameters);
        return new TransportConfiguration[] { primaryTransportConfiguration,
                new TransportConfiguration(connectorFactoryFqcn) };
    }
}
CustomSpringJmsListener:
package com.reproduce.groupid;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class CustomSpringJmsListener {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @JmsListener(destination = "local-queue", subscription = "groupid-example", containerFactory = "containerFactory", concurrency = "15-15")
    public void receive(TextMessage message) throws JMSException {
        LOG.info("Received message: " + message);
    }
}
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.reproduce</groupId>
    <artifactId>groupid</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>groupid</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-artemis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
You can see that even though all of these messages have the same group id, they get logged by different listener container threads. If you comment out the transaction manager from the bean definition it starts working again.
[...] MessageConsumer? Eliminating Spring JMS will indicate whether the issue is with Spring JMS or with Artemis itself. – Justin Bertram
[...] main() method which subscribes to the topic and just run that application multiple times. – Justin Bertram