1
votes

Is there a way to restrict a RabbitMQ queue so that it dispatches only a fixed number of messages to its consumers at a time?

I have 2 queues, Q1 and Q2, and 10 consumers. Every consumer can process messages from both Q1 and Q2. At any given time, only 2 consumers should process messages from Q2. All 10 consumers can process messages from Q1 simultaneously.

Is there any configuration in RabbitMQ that makes it push only 2 messages from Q2 to any free consumers, and push the next 2 only after those are acknowledged, even though other consumers are free and ready to consume?

More background on the issue:

Why only process 2 messages at a time? : Q2 messages make a web service call, and the (third-party) web service endpoint can only service 2 requests concurrently.

Can't we use concurrency? : If we use a ListenerContainer (Spring AMQP), the container is per consumer. We can restrict how many messages one consumer takes at a time, but with 10 consumers, each consumer will get its share whenever there are messages in the queue.

Can we configure only 2 consumers listening to Q2? : I understand we can achieve this by configuring only 2 consumers for Q2, but I am trying to avoid that. If for some reason these 2 consumers go down, processing of Q2 will halt. If 10 consumers are configured, processing is guaranteed to continue until the last consumer is down.

Looking to see if there is some config in RabbitMQ which we can make use of or any suggested solution.

Thanks in advance !

3
It's usually up to the consumer to not "over consume" messages through some kind of internal rate limit. Depending on your library you can just block in the subscribe callback until you're ready for a new message to arrive. – tadman

3 Answers

1
votes

I'm pretty sure that consumer prefetch will accomplish what you want. But, Q2 can only have one consumer for this to work. There is no way to coordinate among multiple consumers - you would have to do that yourself, and could use RabbitMQ to do the coordination.
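As a rough illustration of coordinating "yourself", here is a minimal sketch that caps concurrent Q2 handling with a semaphore. It assumes all Q2 consumers run as threads inside one JVM; consumers in separate processes would need an external coordinator instead. `Q2Gate` and its methods are hypothetical names, not part of RabbitMQ or Spring AMQP.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: caps how many Q2 handlers run at once inside one JVM. */
final class Q2Gate {
    private final Semaphore permits;
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    Q2Gate(int maxConcurrent) {
        // Fair semaphore: waiting consumers are served in arrival order.
        this.permits = new Semaphore(maxConcurrent, true);
    }

    /** Blocks the calling consumer thread until a permit is free, then runs the handler. */
    void handle(Runnable handler) throws InterruptedException {
        permits.acquire();
        try {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max); // record the highest concurrency seen
            handler.run();                         // e.g. the third-party web service call
        } finally {
            running.decrementAndGet();
            permits.release();
        }
    }

    /** Highest number of handlers observed running at the same time. */
    int peakConcurrency() {
        return peak.get();
    }
}
```

Each consumer's Q2 callback would wrap its work in `gate.handle(...)` on a shared `Q2Gate` created with a limit of 2, and acknowledge the message only after the handler returns. Q1 callbacks would bypass the gate entirely.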


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

0
votes

I think you're getting wrapped up in the problem definition. What you really need is trivial, so let's break this down a bit.

Given two queues, Q1 and Q2

  • 10 consumers
  • Every consumer can process the messages from Q1 and Q2.
  • At any given time, only 2 consumers should process messages from Q2.
  • All the 10 consumers can process message from Q1 simultaneously.

Comments on problem statement

First, queues are assumed to be independent. An independent process P will have queue Q, thus Q1 serves process P1. This is a strict mathematical requirement - you cannot define two queues for a single process P.

Thus, the second constraint is mathematically incorrect, for the same reason that you could not write a valid function that accepts a parameter of type string and bool interchangeably. It must accept one or the other, as they are not compatible types, or it must accept a single common ancestor of the types without regard to the subtypes. This is a variant of the Liskov Substitution Principle.

Redefining the problem

There are a total of 12 consumers in the system:

  • Q1 has 10 consumers
  • Q2 has 2 consumers
  • [Important] Consumers are not shared between queues

Is there any configuration in RabbitMQ which we can specify, so that RabbitMQ pushes only 2 messages from Q2 to any free consumer and push the next 2 only after they are acknowledged, even though other consumers are free and ready to consume.

Based on the new definition of the problem, you have two options:

  1. Use Basic.Get - pull the next message from the queue as soon as the consumer finishes processing the last message.
  2. Use consumer prefetch with a limit of 1. This will deliver one message to each of the two consumers immediately, then deliver additional messages one at a time as each consumer acknowledges its current message. This is a bit more complicated, but might make sense if your latency margins are less than 10 milliseconds.
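To see why two consumers with a prefetch of 1 keep at most two Q2 messages in flight, here is a toy model of per-consumer prefetch. It is a simulation for illustration only, not RabbitMQ client code; `PrefetchModel` and its methods are hypothetical names. In the real Java client, the limit would be set with `channel.basicQos(1)` on a consumer that uses manual acknowledgements.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of per-consumer prefetch; a simulation, not RabbitMQ client code. */
final class PrefetchModel {
    private final Deque<String> ready = new ArrayDeque<>(); // messages waiting in the queue
    private final int[] unacked;                            // in-flight count per consumer
    private final int prefetch;

    PrefetchModel(int consumers, int prefetch) {
        this.unacked = new int[consumers];
        this.prefetch = prefetch;
    }

    /** Enqueues a message and delivers it if some consumer has spare capacity. */
    void publish(String message) {
        ready.addLast(message);
        dispatch();
    }

    /** Acknowledges one message for the given consumer, freeing a prefetch slot. */
    void ack(int consumer) {
        if (unacked[consumer] == 0) {
            throw new IllegalStateException("nothing to ack");
        }
        unacked[consumer]--;
        dispatch();
    }

    /** Total number of delivered but unacknowledged messages. */
    int inFlight() {
        int total = 0;
        for (int count : unacked) {
            total += count;
        }
        return total;
    }

    int readyCount() {
        return ready.size();
    }

    /** Hands out ready messages until every consumer is at its prefetch limit. */
    private void dispatch() {
        boolean delivered = true;
        while (!ready.isEmpty() && delivered) {
            delivered = false;
            for (int c = 0; c < unacked.length && !ready.isEmpty(); c++) {
                if (unacked[c] < prefetch) {
                    ready.removeFirst();
                    unacked[c]++;
                    delivered = true;
                }
            }
        }
    }
}
```

With `new PrefetchModel(2, 1)` and five published messages, two are in flight and three stay queued; each `ack` releases exactly one more message to the consumer that acknowledged.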

Note that by properly defining the problem space, we have eliminated the fundamental problem of trying to figure out how to ensure only two consumers are processing Q2 messages at any time.

0
votes

Try the Single Active Consumer feature, available from RabbitMQ 3.8 onward.

Single active consumer allows to have only one consumer at a time consuming from a queue and to fail over to another registered consumer in case the active one is cancelled or dies. Consuming with only one consumer is useful when messages must be consumed and processed in the same order they arrive in the queue. Single active consumer can be enabled when declaring a queue, with the x-single-active-consumer argument set to true.

https://www.rabbitmq.com/consumers.html#single-active-consumer

e.g. with the Java client:
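A minimal sketch of such a declaration, kept to the standard library so it compiles on its own. The class name `SingleActiveConsumerArgs` and the queue name `"Q2"` are assumptions for illustration; the actual `queueDeclare` call is shown only in a comment because it needs the RabbitMQ Java client and a live channel.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds the queue arguments for single active consumer. */
final class SingleActiveConsumerArgs {

    /** Returns the optional queue arguments that enable single active consumer. */
    static Map<String, Object> queueArguments() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-single-active-consumer", true);
        return arguments;
    }

    // With the RabbitMQ Java client, the map is passed as the last parameter of
    // Channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments), e.g.:
    //
    //   channel.queueDeclare("Q2", true, false, false,
    //                        SingleActiveConsumerArgs.queueArguments());
}
```

Note that this leaves one active consumer on the queue, not two. Reaching a concurrency of exactly 2 would presumably also need a prefetch of 2 and two worker threads inside whichever consumer is currently active.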