The message grouping supported by ActiveMQ Artemis allows to get all message of the same group processed serially by the same consumer. If all messages of a queue have the same group id, they will be processed serially by the same consumer.
However grouped messages can impact the concurrent processing. For example, if there is a chunk of 100 messages of the groups associated with a client at the head of a queue followed by other messages of the groups associated with another clients then all the first 100 messages will need to be sent to the appropriate client (which is consuming those grouped messages serially) before other messages can be consumed.
The consumer window size only affects the consumer buffer messages from the server. If the consumer window size is set to 0 then the consumer does't buffer any message, so the messages can be delivered to another consumer.
In your case the producer could use the queue name to set the group id so the consumers of EXAMPLE.* would process sequentially the messages of each queue but I would not set the consumer window size to 0 because it could limit the consumers parallelism.
The following demo of the message grouping in topic hierarchy with the consumer window size equal to 0 shows the messages consumed sequentially from each queue and a limited parallelism among consumers.
public static void main(final String[] args) throws Exception {
Connection connection = null;
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setConsumerWindowSize(1);
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topicSubscribe = ActiveMQJMSClient.createTopic("EXAMPLE.*");
MessageConsumer[] messageConsumers = new MessageConsumer[] {
session.createSharedConsumer(topicSubscribe, "EXAMPLE"),
session.createSharedConsumer(topicSubscribe, "EXAMPLE"),
session.createSharedConsumer(topicSubscribe, "EXAMPLE")
};
MessageProducer producer = session.createProducer(null);
for (int i = 0; i < 10; i++) {
for (int t = 0; t < 5; t++) {
TextMessage groupMessage = session.createTextMessage("Group-" + t + " message " + i);
groupMessage.setStringProperty("JMSXGroupID", "Group-" + t);
producer.send(ActiveMQJMSClient.createTopic("EXAMPLE." + t), groupMessage);
}
}
connection.start();
TextMessage messageReceived;
for (int i = 0; i < 100; i++) {
for (int c = 0; c < 3; c++) {
while ((messageReceived = (TextMessage) messageConsumers[c].receive(500)) != null) {
System.out.println("Consumer" + c + " received message: " + messageReceived.getText());
}
System.out.println("Consumer" + c + " received message: null");
}
}
} finally {
// Step 12. Be sure to close our resources!
if (connection != null) {
connection.close();
}
}
}
The output of the demo is:
Consumer0 received message: Group-0 message 0
Consumer0 received message: Group-3 message 0
Consumer0 received message: Group-4 message 0
Consumer0 received message: Group-0 message 1
Consumer0 received message: null
Consumer1 received message: Group-1 message 0
Consumer1 received message: Group-1 message 1
Consumer1 received message: null
Consumer2 received message: Group-2 message 0
Consumer2 received message: Group-2 message 1
Consumer2 received message: null
Consumer0 received message: Group-3 message 1
Consumer0 received message: Group-4 message 1
Consumer0 received message: Group-0 message 2
Consumer0 received message: Group-3 message 2
Consumer0 received message: Group-4 message 2
Consumer0 received message: Group-0 message 3
Consumer0 received message: null
Consumer1 received message: Group-1 message 2
Consumer1 received message: Group-1 message 3
Consumer1 received message: null
Consumer2 received message: Group-2 message 2
Consumer2 received message: Group-2 message 3
Consumer2 received message: null
...