2
votes

I'm trying to use priority queues with Celery 3.1.23 and RabbitMQ 3.6.11 in my Flask app, but the workers do not pick up tasks in priority order. Here is my code:

Celery config:

import os
from kombu import Exchange, Queue

CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', RABBITMQ_URL)
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', REDIS_URL)
BROKER_POOL_LIMIT = int(os.environ.get('BROKER_POOL_LIMIT', 10))
CELERY_DEFAULT_QUEUE = 'non_web'
CELERY_IGNORE_RESULT = True
CELERY_ACKS_LATE = True
CELERY_SEND_EVENTS = False
CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_RESULT_PERSISTENT = False

CELERY_TASK_QUEUES = (
    Queue('urgent', Exchange('urgent', type='direct'), routing_key='urgent', queue_arguments={'x-max-priority': 8}),
    Queue('web', Exchange('web', type='direct'), routing_key='web', queue_arguments={'x-max-priority': 6}),
    Queue('non_web', Exchange('non_web', type='direct'), routing_key='non_web', queue_arguments={'x-max-priority': 4}),
    Queue('background', Exchange('background', type='direct'), routing_key='background', queue_arguments={'x-max-priority': 2}),
)

Here I want tasks in the urgent queue to get the highest priority, followed by web, then non_web, and finally background. But the tasks queued in non_web are always executed first. Here is the code for the jobs.

Jobs:

from time import sleep

@celery.task(queue='web', priority=8)
def high_priority_job():
    print 'High priority job'
    for i in range(1, 31):
        print i
        sleep(1)

@celery.task(queue='non_web', priority=6)
def medium_priority_job():
    print 'Medium priority job'
    for i in range(1, 31):
        print i
        sleep(1)

I tried queuing jobs in a queue like this:

def queue_jobs():
    for i in range(50):
        medium_priority_job.delay()
    for i in range(20):
        high_priority_job.delay()

With this, the high priority jobs are executed only after all the medium priority jobs have completed. I have also tried removing the priority from the task declarations and setting it when queuing the job, like this:

def queue_jobs():
    for i in range(50):
        medium_priority_job.apply_async(priority=6)
    for i in range(20):
        high_priority_job.apply_async(priority=8)

This didn't work either. I tried upgrading Celery to the latest version. That didn't help either. What is wrong here? Do I need to change any settings?

2
As far as I know, priority applies within a single queue, not across different queues. So the setting would be CELERY_TASK_QUEUES = (Queue('urgent', Exchange('urgent', type='direct'), routing_key='urgent', queue_arguments={'x-max-priority': 8}),), where 8 is the highest priority. You then publish tasks to the "urgent" queue with different priorities; a higher number is more urgent. - Richard Mao
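
To illustrate the per-queue point, here is a toy model of a single priority queue in plain Python. It is only a sketch of the ordering rule (larger number first, FIFO within one level, values above the maximum capped), not RabbitMQ or Celery code; the class name ToyPriorityQueue and the message names are made up for the example.

```python
import heapq
from itertools import count


class ToyPriorityQueue(object):
    """Toy stand-in for ONE priority-enabled queue (not a real broker)."""

    def __init__(self, max_priority):
        self.max_priority = max_priority
        self._heap = []
        self._seq = count()  # keeps FIFO order within one priority level

    def publish(self, body, priority=0):
        # Priorities above the queue maximum are treated as the maximum.
        priority = min(priority, self.max_priority)
        # Negate so that the largest priority is popped first.
        heapq.heappush(self._heap, (-priority, next(self._seq), body))

    def get(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


# Everything goes to the same queue, as suggested above.
single = ToyPriorityQueue(max_priority=10)
for i in range(3):
    single.publish('medium-%d' % i, priority=3)
for i in range(2):
    single.publish('high-%d' % i, priority=9)

order = [single.get() for _ in range(len(single))]
print(order)
# ['high-0', 'high-1', 'medium-0', 'medium-1', 'medium-2']
```

Two separate instances of this class would each sort only their own messages, which mirrors why priorities set on tasks in different queues are never compared with each other.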

2 Answers

0
votes

It is no surprise that this did not work: you are using a different queue in each case, so the tasks are consumed in the usual order with no prioritization between them. To test priorities, use a single queue and send tasks with different priority levels to it.
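
A minimal sketch of such a test setup, assuming the two tasks from the question are declared without a queue or priority argument. The queue name 'tasks' and the priority values 3 and 9 are arbitrary choices for illustration, and the setting names are copied from the question.

```python
from kombu import Exchange, Queue

# One priority-enabled queue shared by all tasks ('tasks' is a made-up name).
CELERY_DEFAULT_QUEUE = 'tasks'
# Setting name as used in the question; Celery 3.1 documents it as CELERY_QUEUES.
CELERY_TASK_QUEUES = (
    Queue('tasks', Exchange('tasks', type='direct'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
)
# Keep prefetching low so queued messages stay in the broker,
# where they can still be reordered by priority.
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_ACKS_LATE = True


def queue_jobs():
    # medium_priority_job and high_priority_job are the tasks from the question.
    for _ in range(50):
        medium_priority_job.apply_async(queue='tasks', priority=3)
    for _ in range(20):
        high_priority_job.apply_async(queue='tasks', priority=9)
```

With RabbitMQ, a queue that already exists without x-max-priority cannot be redeclared with it, so an existing queue of the same name would have to be deleted first.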

Also, there has been some confusion about priority levels and what they mean with different brokers - link.

0
votes

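Note that the official Celery documentation describes priority values as sorted in reverse order, 0 being the highest priority, but it says this about the Redis transport: https://docs.celeryproject.org/en/v4.3.0/userguide/routing.html

With RabbitMQ, which is the broker in your case, a larger number means a higher priority, so priority 8 ranks above priority 6. That ordering only applies between messages in the same queue. Here medium_priority_job and high_priority_job are sent to different queues, so their priorities are never compared with each other.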