I need to implement a fair queuing system in which messages are processed in round-robin fashion, keyed on the value of a message header, across all values of that header among the messages currently queued.
Messages in the system are naturally grouped by some property, which can take many thousands of possible values, and the set of values present among queued messages changes over time. As an analogy, imagine each message carrying a header that holds the milliseconds component of its creation time. The header then has a value between 0 and 999, with some distribution of values across all messages currently queued.
I need to consume messages in an order that doesn't prioritise any specific value over any other. If the header values of queued messages are distributed thus:
value | count
------|-------
A | 3
B | 3
C | 2
Then the consumption order would be A, B, C, A, B, C, A, B.
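The ordering described above can be sketched as an in-memory scheduler, independent of any broker: keep a FIFO queue per header value plus a rotation of the values that currently have messages, and on each poll take one message from the value at the front, then move that value to the back (or drop it if its queue is now empty). `RoundRobinQueue` is a hypothetical name; this is a sketch of the desired semantics, not something HornetQ provides. Keys and values must be non-null, since `ArrayDeque` rejects nulls.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * In-memory round-robin over per-key FIFO queues. Only keys with queued
 * messages are tracked, so state grows with active keys, not possible keys.
 */
final class RoundRobinQueue<K, V> {
    private final Map<K, Deque<V>> queues = new HashMap<>();
    // Keys that currently have queued messages, in the order they will be served.
    private final Deque<K> rotation = new ArrayDeque<>();

    void enqueue(K key, V value) {
        Deque<V> queue = queues.get(key);
        if (queue == null) {
            // First message for this key: it joins the back of the rotation.
            queue = new ArrayDeque<>();
            queues.put(key, queue);
            rotation.addLast(key);
        }
        queue.addLast(value);
    }

    /** Returns the next message in round-robin order, or null if nothing is queued. */
    V poll() {
        K key = rotation.pollFirst();
        if (key == null) {
            return null;
        }
        Deque<V> queue = queues.get(key);
        V value = queue.pollFirst();
        if (queue.isEmpty()) {
            queues.remove(key); // nothing left: forget the key entirely
        } else {
            rotation.addLast(key); // still has work: go to the back of the line
        }
        return value;
    }
}
```

Enqueuing three messages for A, three for B and two for C and then polling until empty yields A, B, C, A, B, C, A, B. A value first seen mid-stream simply joins the back of the rotation, which covers newly arriving values without any extra bookkeeping.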
If messages with a new value are added to the queue, that value should automatically join the round-robin sequence.
This implies some knowledge of the currently queued messages, but doesn't require that knowledge to be held by the consumer; the broker might have mechanisms to order delivery in some way.
It's acceptable for fair queuing to kick in only beyond some threshold. That is, if the threshold were 10, it's acceptable to process 10 messages with the same value in a row, but the 11th message processed should have the next value in the sequence. The next value might be the same one, if all queued messages have that value.
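A minimal in-memory sketch of that threshold rule, again independent of HornetQ: the value at the head of the rotation may be served up to `threshold` messages in a row before it moves to the back. `ThresholdRoundRobinQueue` is a hypothetical name, and this is one way to express the rule (a weighted round robin with a fixed per-turn quota), not a definitive design.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Round-robin over per-key FIFO queues where the key at the head of the
 * rotation may be served up to `threshold` messages in a row before rotating.
 */
final class ThresholdRoundRobinQueue<K, V> {
    private final int threshold;
    private final Map<K, Deque<V>> queues = new HashMap<>();
    private final Deque<K> rotation = new ArrayDeque<>(); // head = key currently being served
    private int servedThisTurn = 0;

    ThresholdRoundRobinQueue(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
    }

    void enqueue(K key, V value) {
        Deque<V> queue = queues.get(key);
        if (queue == null) {
            queue = new ArrayDeque<>();
            queues.put(key, queue);
            rotation.addLast(key); // new keys wait behind every key already queued
        }
        queue.addLast(value);
    }

    /** Returns the next message, or null if nothing is queued. */
    V poll() {
        K key = rotation.peekFirst();
        if (key == null) {
            return null;
        }
        Deque<V> queue = queues.get(key);
        V value = queue.pollFirst();
        servedThisTurn++;
        if (queue.isEmpty()) {
            // Key exhausted: remove it and start the next key's turn.
            queues.remove(key);
            rotation.pollFirst();
            servedThisTurn = 0;
        } else if (servedThisTurn >= threshold) {
            // Turn used up: move the key to the back. If it is the only key,
            // it comes straight back round ("next might be the same value").
            rotation.addLast(rotation.pollFirst());
            servedThisTurn = 0;
        }
        return value;
    }
}
```

With a threshold of 10, eleven messages for A followed by one for B come out as ten A's, then B, then the last A; a threshold of 1 reduces to plain round robin. In this scheme a value's first queued message waits behind at most threshold × (number of other values with queued messages) messages, however many messages any single value has queued.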
The number of possible values probably rules out simply creating a queue per value and iterating over the queues, though this hasn't been tested yet.
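To make the queue-per-value trade-off concrete, here is a minimal consumer-side sketch that iterates a fixed list of per-value sources. Each source is a hypothetical `Supplier<V>` standing in for a consumer on one per-value queue and returns null when empty, the way JMS `MessageConsumer.receiveNoWait()` does. The round-robin order falls out naturally, but in this sketch every pass costs one poll per queue whether or not it holds messages; the hypothetical `emptyPolls` counter exposes that cost. It does not model broker-side overhead of thousands of queues and consumers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
 * Round-robin over a fixed list of sources, one per header value. Each source
 * stands in for a per-value queue and returns null when nothing is ready.
 */
final class QueuePerValuePoller<V> {
    private final List<Supplier<V>> sources;
    private int next = 0;        // index of the source to try first on the next poll
    private long emptyPolls = 0; // polls that hit an empty source

    QueuePerValuePoller(List<? extends Supplier<V>> sources) {
        this.sources = new ArrayList<>(sources);
    }

    /** Returns the next message, or null after one full pass finds every source empty. */
    V poll() {
        int n = sources.size();
        for (int i = 0; i < n; i++) {
            int idx = (next + i) % n;
            V value = sources.get(idx).get();
            if (value != null) {
                next = (idx + 1) % n; // resume after this source next time
                return value;
            }
            emptyPolls++;
        }
        return null;
    }

    long emptyPolls() {
        return emptyPolls;
    }
}
```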
We're using HornetQ, but if there are alternatives that provide these semantics, I'd love to know.
The messages are jobs and the header values are user IDs. The goal is that, within some limits, no user's jobs unduly delay any other user's jobs: a user producing 1 million jobs shouldn't force later jobs from other users to wait for those million jobs to be processed.
Consumers on queues in HornetQ are evaluated in creation order, so adding a selective consumer to a queue won't stop any catch-all consumer from receiving messages matching the filter.
JMS message groups don't seem to help, as they tie a given group (a user?) to a given consumer.
A potential solution is to create selective consumers on a topic on demand (e.g. after 10 sequential messages from the same user), with something managing the lifecycle of all the selective consumers to ensure the catch-all consumer doesn't also process the same messages. While possible, this does seem to have some onerous synchronisation requirements.
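The demand-detection half of that idea is simple to sketch in isolation. Below, a hypothetical `SequentialRunDetector` watches the header value of each message the catch-all consumer handles and fires once when the same value arrives `threshold` times in a row; `selectorFor` builds a JMS message selector matching that value, which could then be passed to `Session.createConsumer(destination, selector)`. The header name `userId` in the usage is an assumption, and the hard part, stopping the catch-all consumer from also receiving those messages, is deliberately not addressed here.

```java
import java.util.Objects;

/**
 * Hypothetical helper for creating selective consumers on demand: reports
 * when the same header value has been seen `threshold` times in a row.
 */
final class SequentialRunDetector {
    private final int threshold;
    private String lastValue;
    private int runLength;

    SequentialRunDetector(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
    }

    /** Records one delivery; returns true exactly once per run, when it reaches the threshold. */
    boolean record(String headerValue) {
        if (Objects.equals(headerValue, lastValue)) {
            runLength++;
        } else {
            lastValue = headerValue;
            runLength = 1;
        }
        return runLength == threshold;
    }

    /**
     * Builds a JMS message selector matching one header value. Selector string
     * literals are single-quoted; an embedded quote is written as two quotes.
     */
    static String selectorFor(String headerName, String value) {
        return headerName + " = '" + value.replace("'", "''") + "'";
    }
}
```

For example, with a threshold of 10, the tenth consecutive message from one user makes `record` return true, and `selectorFor("userId", "o'brien")` yields `userId = 'o''brien'`.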