9
votes

I have a test queue in celery and I have defined a task for it:

@celery_app.task(queue='test', ignore_result=True)
def priority_test(priority):
    print(priority)

which just print the argument.I want to set the priority attribute which is defined here for appy_async. So, I wrote a for loop like this:

for i in range(100):
    priority_test.apply_async((i%10,), queue="test", priority=i%10)

I excpected to see some result like this:

[2017-12-26 17:21:37,309: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,311: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,314: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,317: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,319: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,321: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,323: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,326: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,329: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,332: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,334: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,336: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,341: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,344: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,346: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,349: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,351: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,353: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,355: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,358: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,360: WARNING/ForkPoolWorker-1] 4

means execute the same priorities after each other but it executed them in the normal way:

[2017-12-26 17:21:37,309: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,311: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,314: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,317: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,319: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,321: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,323: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,326: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,329: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,332: WARNING/ForkPoolWorker-1] 1
[2017-12-26 17:21:37,334: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,336: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,341: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,344: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,346: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,349: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,351: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,353: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,355: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,358: WARNING/ForkPoolWorker-1] 1
[2017-12-26 17:21:37,360: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,362: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,364: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,365: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,367: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,369: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,371: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,373: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,374: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,376: WARNING/ForkPoolWorker-1] 1

How should I apply priority in celery with rabbitmq and what is the priority attribute in the doc above?

1

1 Answers

7
votes

In order to have priority working properly you need to properly configure a couple of settings and you need at least version 3.5.0 of RabbitMQ.

First set the x-max-priority of your queue to 10. From the docs :

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10},
]

A default value for all queues can be set using the task_queue_max_priority setting:

app.conf.task_queue_max_priority = 10

Then configure the following settings:

CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

By default the prefetch multiplier is 4, which in your case will cause the first 4 tasks with priority 10, 9, 8 and 7 to be fetched before the other tasks are present in the queue. The CELERY_ACKS_LATE setting will cause the tasks to be acknowledged after they have been executed. You can experiment with this setting to see what behaviour you prefer.