I'm trying to ensure that a consumer on a queue (with message grouping) receives only one message at a time from each queue it's handling, until the consumer acknowledges that message.

For a test I've set up ActiveMQ Artemis with 3 consumers on the wildcard EXAMPLE.* and one publisher posting 10 messages to each of 5 queues: EXAMPLE.1 - EXAMPLE.5. What I'm seeing is that each of the consumers receives messages from the queues immediately. I've tried setting the consumer window size to 0, as I thought that would make the broker deliver only one message at a time from each queue, but that doesn't seem to work.

Have I misunderstood that setting? If so, are there any other settings I should be looking at to help me get this working?

The particular use case I'm trying to achieve is that I'll possibly have many queues and a couple of consumers. It's important that the messages in each queue are handled sequentially, but all queues can be handled in parallel.

Thanks!

Can you add the code you're using for your producer and consumers? - Justin Bertram
Are messages from the same group being received by different consumers? - Justin Bertram

1 Answer

Message grouping in ActiveMQ Artemis ensures that all messages of the same group are processed serially by the same consumer. If all messages in a queue have the same group ID, they will be processed serially by the same consumer.
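
To make the idea concrete, here is a minimal sketch of sticky group-to-consumer assignment in plain Java. It is a toy model, not Artemis code: the class GroupDispatchSketch, the method dispatch, and the round-robin choice of owner for a newly seen group are all assumptions made for illustration, and the broker's real assignment policy may differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of sticky group-to-consumer assignment; hypothetical, not Artemis code. */
final class GroupDispatchSketch {

    /**
     * Each message is a {groupId, body} pair. Returns, per consumer index,
     * the bodies that consumer would receive, in delivery order.
     */
    static List<List<String>> dispatch(List<String[]> messages, int consumerCount) {
        List<List<String>> delivered = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            delivered.add(new ArrayList<>());
        }
        Map<String, Integer> groupOwner = new LinkedHashMap<>();
        int next = 0;
        for (String[] message : messages) {
            String groupId = message[0];
            Integer owner = groupOwner.get(groupId);
            if (owner == null) {
                // Assumption: a new group is pinned to the next consumer, round-robin.
                owner = next;
                next = (next + 1) % consumerCount;
                groupOwner.put(groupId, owner);
            }
            // Once pinned, every later message of the group goes to the same consumer.
            delivered.get(owner).add(message[1]);
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String[]> messages = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            for (int t = 0; t < 5; t++) {
                messages.add(new String[] {"Group-" + t, "Group-" + t + " message " + i});
            }
        }
        List<List<String>> delivered = dispatch(messages, 3);
        for (int c = 0; c < delivered.size(); c++) {
            System.out.println("Consumer" + c + ": " + delivered.get(c));
        }
    }
}
```

In this model, with five groups and three consumers, each group's messages stay in order on a single consumer, while one consumer may own several groups.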

However, grouped messages can limit concurrent processing. For example, if a run of 100 messages whose groups are assigned to one client sits at the head of a queue, followed by messages whose groups are assigned to other clients, then all of the first 100 messages must be sent to the appropriate client (which consumes those grouped messages serially) before the other messages can be consumed.

The consumer window size only controls how many messages the consumer buffers from the server. If it is set to 0, the consumer doesn't buffer any messages, so messages can be delivered to another consumer.
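
The effect of client-side buffering can be illustrated with a small simulation. This is a rough sketch under simplifying assumptions, not a model of the Artemis protocol: WindowSketch and simulate are hypothetical names, the window is counted in messages (Artemis counts bytes), messages are ungrouped, and there are exactly two consumers, one fast and one slow.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of client-side buffering with two consumers; hypothetical, not Artemis code. */
final class WindowSketch {

    /**
     * Returns {handledByFast, handledBySlow} after the given number of ticks.
     * The fast consumer handles one message per tick, the slow one handles
     * one message every slowCost ticks.
     */
    static int[] simulate(int messageCount, int windowSize, int slowCost, int ticks) {
        Deque<Integer> serverQueue = new ArrayDeque<>();
        for (int i = 0; i < messageCount; i++) {
            serverQueue.add(i);
        }
        Deque<Integer> fastBuffer = new ArrayDeque<>();
        Deque<Integer> slowBuffer = new ArrayDeque<>();

        // Prefetch: the server fills both client buffers round-robin up to windowSize.
        while (!serverQueue.isEmpty()
                && (fastBuffer.size() < windowSize || slowBuffer.size() < windowSize)) {
            if (fastBuffer.size() < windowSize && !serverQueue.isEmpty()) {
                fastBuffer.add(serverQueue.poll());
            }
            if (slowBuffer.size() < windowSize && !serverQueue.isEmpty()) {
                slowBuffer.add(serverQueue.poll());
            }
        }

        int fastDone = 0;
        int slowDone = 0;
        for (int tick = 1; tick <= ticks; tick++) {
            // A consumer drains its own buffer first, then asks the server directly.
            Integer fastMessage = fastBuffer.isEmpty() ? serverQueue.poll() : fastBuffer.poll();
            if (fastMessage != null) {
                fastDone++;
            }
            if (tick % slowCost == 0) {
                Integer slowMessage = slowBuffer.isEmpty() ? serverQueue.poll() : slowBuffer.poll();
                if (slowMessage != null) {
                    slowDone++;
                }
            }
        }
        return new int[] {fastDone, slowDone};
    }

    public static void main(String[] args) {
        int[] buffered = simulate(6, 3, 5, 6);
        int[] unbuffered = simulate(6, 0, 5, 6);
        System.out.println("window 3: fast=" + buffered[0] + " slow=" + buffered[1]);
        System.out.println("window 0: fast=" + unbuffered[0] + " slow=" + unbuffered[1]);
    }
}
```

In this model, with a window of 3 the slow consumer holds messages in its buffer that the idle fast consumer cannot take; with a window of 0 those messages stay on the server and the fast consumer picks them up.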

In your case, the producer could use the queue name as the group ID, so that the consumers of EXAMPLE.* process the messages of each queue sequentially. However, I would not set the consumer window size to 0, because it could limit parallelism among the consumers.

The following demo of message grouping in a topic hierarchy, run with a very small consumer window size, shows messages being consumed sequentially from each queue, with limited parallelism among the consumers.

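// The imports and the class name are assumptions added so the snippet compiles;
// they target the javax.jms-based Artemis JMS client (Jakarta clients use jakarta.jms).
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

public class GroupedTopicHierarchyDemo {

   public static void main(final String[] args) throws Exception {
      Connection connection = null;
      try {
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
         // The consumer window size is expressed in bytes.
         cf.setConsumerWindowSize(1);
         connection = cf.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

         // Wildcard subscription covering every EXAMPLE.<n> address.
         Topic topicSubscribe = ActiveMQJMSClient.createTopic("EXAMPLE.*");

         // Three consumers sharing the same subscription, so they compete for messages.
         MessageConsumer[] messageConsumers = new MessageConsumer[] {
            session.createSharedConsumer(topicSubscribe, "EXAMPLE"),
            session.createSharedConsumer(topicSubscribe, "EXAMPLE"),
            session.createSharedConsumer(topicSubscribe, "EXAMPLE")
         };

         // Anonymous producer: the destination is supplied on each send.
         MessageProducer producer = session.createProducer(null);

         // Send 10 messages to each of 5 addresses, one group per address.
         for (int i = 0; i < 10; i++) {
            for (int t = 0; t < 5; t++) {
               TextMessage groupMessage = session.createTextMessage("Group-" + t + " message " + i);
               groupMessage.setStringProperty("JMSXGroupID", "Group-" + t);
               producer.send(ActiveMQJMSClient.createTopic("EXAMPLE." + t), groupMessage);
            }
         }

         // Start delivery only after all messages have been sent.
         connection.start();

         // Poll the consumers in turn, draining each until receive() times out.
         TextMessage messageReceived;
         for (int i = 0; i < 100; i++) {
            for (int c = 0; c < 3; c++) {
               while ((messageReceived = (TextMessage) messageConsumers[c].receive(500)) != null) {
                  System.out.println("Consumer" + c + " received message: " + messageReceived.getText());
               }
               System.out.println("Consumer" + c + " received message: null");
            }
         }
      } finally {
         // Closing the connection also closes its session, producer and consumers.
         if (connection != null) {
            connection.close();
         }
      }
   }
}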

The output of the demo is:

Consumer0 received message: Group-0 message 0
Consumer0 received message: Group-3 message 0
Consumer0 received message: Group-4 message 0
Consumer0 received message: Group-0 message 1
Consumer0 received message: null
Consumer1 received message: Group-1 message 0
Consumer1 received message: Group-1 message 1
Consumer1 received message: null
Consumer2 received message: Group-2 message 0
Consumer2 received message: Group-2 message 1
Consumer2 received message: null
Consumer0 received message: Group-3 message 1
Consumer0 received message: Group-4 message 1
Consumer0 received message: Group-0 message 2
Consumer0 received message: Group-3 message 2
Consumer0 received message: Group-4 message 2
Consumer0 received message: Group-0 message 3
Consumer0 received message: null
Consumer1 received message: Group-1 message 2
Consumer1 received message: Group-1 message 3
Consumer1 received message: null
Consumer2 received message: Group-2 message 2
Consumer2 received message: Group-2 message 3
Consumer2 received message: null
...