I want to implement task priority in my Celery workers. I can do this by creating separate queues for high-priority and low-priority tasks. But I also need to send broadcast tasks to all workers through a broadcast queue, and that is not working. Here is my tasks.py file:

from celery import Celery
from kombu.common import Broadcast, Queue, Exchange

app = Celery('tasks')

app.conf.update(
    CELERY_RESULT_BACKEND='amqp',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    BROKER_URL='amqp://',
    CELERY_QUEUES=(Queue('default',
                         Exchange('default'),
                         routing_key='default'),
                   Queue('low_priority',
                         Exchange('low_priority'),
                         routing_key='low_priority'),
                   Broadcast('broadcast_tasks'), ),
    CELERY_ROUTES={'tasks.broadcast':
                   {'queue': 'broadcast_tasks'},
                   'tasks.low_task':
                   {'queue': 'low_priority'},
                   },
    CELERY_DEFAULT_QUEUE='default',
    CELERY_DEFAULT_EXCHANGE='default',
    CELERY_DEFAULT_ROUTING_KEY='default'
)

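@app.task
def broadcast():
    # Routed to the 'broadcast_tasks' fanout queue via CELERY_ROUTES
    print("Broadcast called")

@app.task
def low_task():
    # Routed to the 'low_priority' queue via CELERY_ROUTES
    print("Low priority called")

@app.task
def def_task():
    # No explicit route, so it goes to the 'default' queue
    print("Default called")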

When I run the Celery workers with these commands:

celery -A tasks -Q default worker --loglevel=info
celery -A tasks -Q default,low_priority worker --loglevel=info

Task priority works but broadcast tasks are not acknowledged.

When I run the same commands without the -Q argument, broadcast works but task priority does not:

celery -A tasks worker --loglevel=info
celery -A tasks worker --loglevel=info

As I understand it, this happens because each worker's broadcast queue gets a unique name, such as bcast.0b5dbce0-9bcb-48a5-8554-cbb7f32a6703.

Does anyone have a good workaround? Thanks in advance!

Try adding 'broadcast_tasks' to the queue list: celery -A tasks -Q default,low_priority,broadcast_tasks worker --loglevel=info. I am not sure, but this should work. - Vardhman Patil

1 Answer

You must explicitly consume from the broadcast queue, so add broadcast_tasks to the -Q list in your command-line invocation, as suggested in the comment above by ANDY_VAR.
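To illustrate why the -Q list matters, here is a rough, self-contained sketch. It is a toy in-memory model and not Celery or kombu code: the FanoutExchange and Worker classes are hypothetical names invented for this example. It assumes that a worker only binds its own uniquely named bcast.<uuid> queue to the fanout exchange when broadcast_tasks is among its selected queues, which matches the behaviour described in the question.

```python
import uuid


class FanoutExchange:
    """Toy fanout exchange: copies every message into each bound queue."""

    def __init__(self, name):
        self.name = name
        self.bound_queues = {}  # queue name -> list of delivered messages

    def bind(self, queue_name):
        # Create the queue on first bind and return its message list.
        return self.bound_queues.setdefault(queue_name, [])

    def publish(self, message):
        # Fanout ignores routing keys: every bound queue gets a copy.
        for messages in self.bound_queues.values():
            messages.append(message)


class Worker:
    """Toy worker that only consumes the queues it was told to select."""

    def __init__(self, hostname, selected, exchange):
        self.hostname = hostname
        self.queue_name = None
        self.inbox = None
        if "broadcast_tasks" in selected:
            # Each worker gets its own uniquely named queue (assumption:
            # this mirrors the bcast.<uuid> names seen in the question).
            self.queue_name = "bcast.%s" % uuid.uuid4()
            self.inbox = exchange.bind(self.queue_name)

    def received(self):
        return list(self.inbox) if self.inbox is not None else []


exchange = FanoutExchange("broadcast_tasks")
w1 = Worker("w1", ["default"], exchange)
w2 = Worker("w2", ["default", "low_priority", "broadcast_tasks"], exchange)
w3 = Worker("w3", ["default", "broadcast_tasks"], exchange)

exchange.publish("tasks.broadcast")

for w in (w1, w2, w3):
    print(w.hostname, w.queue_name, w.received())
```

In this model w1 never sees the broadcast because it never bound a queue to the exchange, while w2 and w3 each receive their own copy through separately named queues. The real broker and worker involve far more than this, so treat it only as a mental picture of the fix.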

A similar question was asked here: