4
votes

I am working on a system where we need to implement a prioritized queue. We have messages with different priorities and we need to process messages based on priority. Right now, we are looking to use ActiveMQ as our queuing technology for many reasons, one of which is that is supports priority queues.

With a priority queue in ActiveMQ, what is the best way of dealing with starvation? To be specific, we need to ensure that even a low priority message eventually gets processed even if higher priority messages continue to flood the queue. Does ActiveMQ have something built-in? Or do we need to build something of our own to increase the priority as the message ages?

1
just thinking this through, isn't it possible that a flood of low-priority messages, if bumped up eventually in priority, will starve out the legitimately high-priority messages? and if high-priority messages are coming that fast, isn't it time to shutdown the system for repair?jcomeau_ictx

1 Answers

4
votes

a basic way to do this is to bump up the priority when the message gets older

this way a low priority message from say an hour ago is higher priority then a new high priority message

public class Message implements Comparable<Message>{

    private final long time;//timestamp from creation (can be altered to insertion in queue) in millis the lower this value the older the message (and more important that it needs to be handled)
    private final int pr;//priority the higher the value the higher the priority

    /**
     * the offset that the priority brings currently set for 3 hours 
     *
     * meaning a message with pr==1 has equal priority than a message with pr==0 from 3 hours ago
     */
    private static final long SHIFT=3*60*60*1000; 

    public Message(int priority){
        this.pr=priority;
        this.time = System.currentTimeMillis();
    }

    //I'm assuming here the priority sorting is done with natural ordering
    public boolean compareTo(Message other){
        long th = this.time-this.pr*SHIFT;
        long ot = other.time-other.pr*SHIFT;
        if(th<ot)return 1;
        if(th>ot)return -1;
        return 0;
    }

}

as noted in the comments however a flood from low prio messages from several hours ago will temporarily starve the new high prio messages and to space those properly out will require a more sophisticated method


another method is using multiple queues, one for each priority and taking several out of the higher priority queue for each taken out of the low priority queue

this last method is only really viable for a low amount of priorities while the first method I provided can handle an arbitrary amount of priorities