I'm trying to use priority queues with Celery 3.1.23 and RabbitMQ 3.6.11 in my Flask app, but the workers are not picking up tasks according to their priority. Here is my code:
Celery config:
import os
from kombu import Exchange, Queue

CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', RABBITMQ_URL)
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', REDIS_URL)
BROKER_POOL_LIMIT = int(os.environ.get('BROKER_POOL_LIMIT', 10))
CELERY_DEFAULT_QUEUE = 'non_web'
CELERY_IGNORE_RESULT = True
CELERY_ACKS_LATE = True
CELERY_SEND_EVENTS = False
CELERY_TASK_RESULT_EXPIRES = 3600
CELERY_RESULT_PERSISTENT = False
CELERY_TASK_QUEUES = (
    Queue('urgent', Exchange('urgent', type='direct'), routing_key='urgent', queue_arguments={'x-max-priority': 8}),
    Queue('web', Exchange('web', type='direct'), routing_key='web', queue_arguments={'x-max-priority': 6}),
    Queue('non_web', Exchange('non_web', type='direct'), routing_key='non_web', queue_arguments={'x-max-priority': 4}),
    Queue('background', Exchange('background', type='direct'), routing_key='background', queue_arguments={'x-max-priority': 2}),
)
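For reference, `x-max-priority` turns each of these into a RabbitMQ priority queue, and priorities are compared only among messages waiting in the same queue. The sketch below is a toy in-memory model of that behaviour for a single queue, not RabbitMQ's or Celery's implementation; `SingleQueueModel`, `publish` and `consume` are hypothetical names made up for illustration.

```python
import heapq
import itertools


class SingleQueueModel(object):
    """Toy model of ONE priority queue (hypothetical, for illustration only)."""

    def __init__(self):
        self._heap = []
        # Monotonic counter keeps FIFO order among messages of equal priority.
        self._sequence = itertools.count()

    def publish(self, body, priority=0):
        # heapq is a min-heap, so the priority is negated to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._sequence), body))

    def consume(self):
        neg_priority, _, body = heapq.heappop(self._heap)
        return body, -neg_priority


queue = SingleQueueModel()
queue.publish('medium-1', priority=2)
queue.publish('medium-2', priority=2)
queue.publish('high-1', priority=4)

drained = [queue.consume() for _ in range(3)]
print(drained)
```

In this model a message is only ever ordered against others in the same queue; nothing here ranks one queue above another.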
I want tasks in the urgent queue to get the highest priority, followed by web, then non_web, and finally background. But the tasks queued in non_web are always executed first. Here is the code for the jobs.
Jobs:
from time import sleep


@celery.task(queue='web', priority=8)
def high_priority_job():
    print 'High priority job'
    for i in range(1, 31):
        print i
        sleep(1)


@celery.task(queue='non_web', priority=6)
def medium_priority_job():
    print 'Medium priority job'
    for i in range(1, 31):
        print i
        sleep(1)
I tried queuing the jobs like this:
def queue_jobs():
    for i in range(50):
        medium_priority_job.delay()
    for i in range(20):
        high_priority_job.delay()
With this, the high-priority jobs are executed only after all the medium-priority jobs have completed. I also tried removing priority from the task declarations and setting it when queuing the jobs, like this:
def queue_jobs():
    for i in range(50):
        medium_priority_job.apply_async(priority=6)
    for i in range(20):
        high_priority_job.apply_async(priority=8)
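One thing that may be worth checking in this experiment is which priority levels actually end up in each queue. The sketch below is plain Python with values copied from the config and jobs above; it assumes, per the RabbitMQ documentation, that a priority above a queue's `x-max-priority` is treated as that maximum. `priority_levels_per_queue` is a hypothetical helper, not part of Celery.

```python
from collections import defaultdict

# Values copied from the post: queue name -> x-max-priority.
QUEUE_MAX_PRIORITY = {'urgent': 8, 'web': 6, 'non_web': 4, 'background': 2}

# (queue, requested priority) for each message sent by queue_jobs().
published = [('non_web', 6)] * 50 + [('web', 8)] * 20


def priority_levels_per_queue(messages, max_priority):
    """Collect the distinct effective priorities present in each queue."""
    levels = defaultdict(set)
    for queue, requested in messages:
        # Assumption: priorities above the queue maximum are capped at the maximum.
        levels[queue].add(min(requested, max_priority[queue]))
    return dict(levels)


levels = priority_levels_per_queue(published, QUEUE_MAX_PRIORITY)
print(levels)
```

Under these assumptions each queue holds a single priority level, so priority cannot reorder anything within either queue, and the order across `web` and `non_web` is not decided by message priority.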
This didn't work either. I also tried upgrading Celery to the latest version, which didn't help. What is wrong here? Do I need to change any settings?