18
votes

in my python app I'm using Celery as task producer and consumer and RabbitMQ as a broker. Now, I'm implementing prioritization. At first, it looks like it's not working at all, because, according to documentation, I've just added x-max-priority argument to queues. I looked up deeper, and I've found another prioritizations - consumer prioritization and task prioritizations. So, now, it looks like there are three different prioritizations and I'm totally confused. Could you please explain me the difference?

Queue max priority: viz https://www.rabbitmq.com/priority.html

Queue('my_queue', exchange=Exchange('my_queue'), routing_key='my_queue', queue_arguments={'maxPriority': 10})

Consumer priority: viz https://www.rabbitmq.com/consumer-priority.html

Queue('my_queue', exchange=Exchange('my_queue'), routing_key='my_queue', consumer_arguments={'priority': 10})

Task priority: viz https://github.com/celery/celery/issues/2635#issuecomment-173597053

my_task.apply_async(args=(arg1, arg2), priority=6)

Thank you.

Edited after more study:

As I understood after more reading:

Queue max priority is a type of limitation, and tells that this queue is listening only tasks with the priority set to value max up to this argument. But what about higher priority tasks? Does the queue lower priority to itself maximum defined? Ignores them?

Consumer priority looks like prioritization of consumers. If there are two consumers with different priority and both of them are free, the first one who consumes the messages is the one with higher priority. But why is it defined with the Queue and not with the consumer itself?

Task priority should be the prioritization, which is most important to my needs. It tells that this message should be read with given priority.

So, it looks like the best prioritization will be achieved with the combination of all priorities with multiple workers and concurrency set to 1, rather than one worker with higher concurrency and with the worker_prefetch_multiplier and task_acks_late configured.

What do you think? Is that right?

1

1 Answers

1
votes
  • x-max-priority is the RabbitMQ parameter to define a priority queue. The value provided is the highest supported priority of the queue.
  • Consumer priority favours a given consumer if multiple are available for work.
  • Task priority is how you tell celery what priority the task is. Your broker (and queue) needs to support priorities for this to do anything.

If you provide celery a task with a priority higher than your queue's x-max-priority, celery will place the message on the queue with the x-max-priority value.


I am not sure what you mean by "best prioritization". When you start a celery worker, it starts a RabbitMQ consumer and N other processes (controlled by the concurrency parameter) to handle the tasks. The main worker process will consume the task's message and place the task in it's pool to be processed by one of the pool workers.

As you say, you could consume from the queue in a different way by having a queue consumer for each process that actually executes the tasks. Whether this has greater task throughput or not depends on your environment and will depend on the number of tasks going through the queue; however, I don't expect it to affect the prioritisation of the tasks.

Choosing a low prefetch value is wise if you have priority queues (especially if your tasks are long running) as any tasks which have already been consumed (fetched) from the queue prior to the high priority task being placed on the queue will be run first.