
I can't send tasks to Celery when trying to run two separate dedicated workers. I have gone through the docs and this question, but that did not improve my situation.

My configuration is the following:

CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}

Tasks are defined in the following way in the tasks.py file:

import logging
import time

from celery import shared_task


from books.models import Author, Book
from books.commands import resize_book_photo as resize_book_photo_command


logger = logging.getLogger(__name__)


@shared_task
def list_test_books_per_author():
    time.sleep(5)
    queryset = Author.objects.all()
    for author in queryset:
        for book in author.testing_books:
            logger.info(book.title)


@shared_task
def resize_book_photo(book_id: int):
    resize_book_photo_command(Book.objects.get(id=book_id))

And they are called using apply_async:

list_test_books_per_author.apply_async()
resize_book_photo.apply_async((book.id,))

When I run Celery Flower, I see that no tasks appear in the queues. [Image: Celery Flower panel showing two active workers without tasks.]

The workers are started using:

celery -A blacksheep worker -l info --autoscale=10,1 -Q media --host=media@%h
celery -A blacksheep worker -l info --autoscale=10,1 -Q default --host=default@%h

Using redis-cli, I can confirm with the command 127.0.0.1:6379> LRANGE celery 1 100 that the tasks end up under the celery key (which is the default queue name for Celery). No worker seems to consume them.

EDIT: After taking a closer look at this part of the documentation, I noticed that my setting names were wrong. I changed the settings to:

CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = f'redis://{env("REDIS_HOST")}:{env("REDIS_PORT")}/{env("REDIS_CELERY_DB")}'
CELERY_TASK_DEFAULT_QUEUE = 'default'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)
CELERY_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}

The situation improved: tasks are now consumed from the default queue, but the task I want to go to the media queue also goes to the default queue.

EDIT2: I have tried explicitly sending the task to the other queue by changing its call to resize_book_photo.apply_async((book.id,), queue='media'). The task was correctly dispatched to the proper queue and consumed. I would, however, prefer this to be handled automatically, so that I don't have to specify the queue every time I call apply_async.

Have you tried changing celery -A blacksheep worker -l info --autoscale=10,1 -Q default --host=media@%h to celery -A blacksheep worker -l info --autoscale=10,1 -Q media --host=media@%h? The -Q parameter takes the queue name. - Paulo Henrique
Yes. And this isn't going to help, since the problem is with routing tasks to the proper queue from the backend, not with consuming them once they have been stored by the broker. - gonczor

1 Answer


Try CELERY_TASK_ROUTES instead of CELERY_ROUTES. This worked for me recently with the Django integration.
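As a rough sketch of what that rename could look like in settings.py: this assumes the Celery app loads Django settings with app.config_from_object('django.conf:settings', namespace='CELERY'), which the question does not show. CELERY_QUEUES is renamed to CELERY_TASK_QUEUES on the same reasoning (the lowercase setting is task_queues); I have not run this against the asker's project. The broker and result backend lines from the question stay as they are and are left out here.

```python
# settings.py, routing section only. A sketch that assumes namespace='CELERY'.
from kombu import Exchange, Queue

CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'

# Lowercase setting task_queues becomes CELERY_TASK_QUEUES.
CELERY_TASK_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('media', Exchange('media'), routing_key='media'),
)

# Lowercase setting task_routes becomes CELERY_TASK_ROUTES.
CELERY_TASK_ROUTES = {
    'books.tasks.resize_book_photo': {
        'queue': 'media',
        'routing_key': 'media',
    },
}
```

With this in place, resize_book_photo.apply_async((book.id,)) should no longer need an explicit queue='media' argument.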

The explanation is buried in this comment: How to route tasks to different queues with Celery and Django
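In short, when a CELERY namespace is used, each Django setting is expected to be CELERY_ followed by the upper-cased new-style setting name, so the old-style names from the question do not match anything. The snippet below is only a simplified model of that naming rule, not Celery's actual loader code; the helper unrecognized and the KNOWN_SETTINGS subset are made up for illustration.

```python
# Illustration only: a simplified model of how a 'CELERY' namespace maps
# Django setting names to Celery's lowercase setting names.

# A few real new-style Celery setting names relevant to the question.
KNOWN_SETTINGS = {
    'broker_url',
    'result_backend',
    'task_default_queue',
    'task_default_exchange_type',
    'task_default_routing_key',
    'task_queues',
    'task_routes',
}


def unrecognized(django_settings, namespace='CELERY'):
    """Return namespaced keys that do not map to a known setting name."""
    prefix = namespace + '_'
    unknown = []
    for key in django_settings:
        if not key.startswith(prefix):
            continue  # not a Celery setting at all
        name = key[len(prefix):].lower()
        if name not in KNOWN_SETTINGS:
            unknown.append(key)
    return sorted(unknown)


# Setting names from the first EDIT in the question (values shortened).
question_settings = {
    'CELERY_RESULT_BACKEND': 'django-db',
    'CELERY_TASK_DEFAULT_QUEUE': 'default',
    'CELERY_TASK_DEFAULT_ROUTING_KEY': 'default',
    'CELERY_QUEUES': (),
    'CELERY_ROUTES': {},
}

print(unrecognized(question_settings))
# prints ['CELERY_QUEUES', 'CELERY_ROUTES']
```

The two names it flags are exactly the ones that still need the TASK_ part, which matches the behaviour described in the question: default-queue settings started working once renamed, while routing did not.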