1
votes

I am running my workers with the following command:

celery -A myapp multi start 4 -l debug -Q1:3 queue1,queue2 -Q:4 queue3

The workers start without problems, and when I run

celery inspect active_queues

the queues appear assigned.

Then I start tasks from my Django app with the following code:

result = chain(task1.s(**kwargs).set(queue='queue1'),task2.s(**kwargs).set(queue='queue2'))()

I walk the result variable via result.parent to collect all task IDs and record them in the database for later inspection. When I issue

task = AsyncResult(task.id)
task.status

I get 'PENDING' for every task started by my chain, and the Celery logs show no tasks being received. However, when I issue a

celery purge

command and confirm with

yes

I get a message that my tasks have actually been removed from 1 queue. From then on, AsyncResult.status for the deleted tasks still shows 'PENDING', and the tasks never start. I use rabbitmq-server as the broker with its default configuration, and my Celery config is also the default.

Strangely, in another environment the same code and commands behave differently: the workers start as well, but they do receive the very same tasks and execute them without any issues. What might be the issue here?
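For reference, the ID collection described above can be sketched roughly as follows. The helper name collect_chain_ids is hypothetical; it only assumes that each result object exposes .id and .parent, as the object returned by calling a chain does, with .parent pointing at the previous task in the chain.

```python
def collect_chain_ids(result):
    """Return the task IDs of a chain in execution order.

    `result` is assumed to be the object returned by calling a chain:
    it refers to the LAST task, and each .parent points one step back.
    """
    ids = []
    node = result
    while node is not None:
        ids.append(node.id)
        node = node.parent
    # The walk went from the last task to the first, so flip it.
    ids.reverse()
    return ids


# Hypothetical usage with the chain from the question:
# result = chain(task1.s(**kwargs).set(queue='queue1'),
#                task2.s(**kwargs).set(queue='queue2'))()
# task_ids = collect_chain_ids(result)
```

The IDs come back with the task1 ID first and the task2 ID last, which is the order in which they are then stored and polled with AsyncResult.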

P.S. When I start a worker the other way:

celery -A myapp worker -Q queue1,queue2,queue3 -l debug

I still can't get my tasks to execute. The problem started to show up when I modified my chain to launch tasks and added

.set(queue='queue1')

(or queue2 or queue3).

P.P.S.

All my tasks are written with the

@shared_task

decorator

Is there at least a way to see which tasks (the ones I can remove with celery purge) are waiting on a queue, and the name of the queue they are waiting on?

1 Answer

1
votes

Celery's default settings should cover your case, so the only likely cause is that you have defined some of the following options in a way that interferes with your queue routing. If so, consider commenting them out (more in the docs):

  • CELERY_QUEUES

  • CELERY_ROUTES

  • CELERY_DEFAULT_EXCHANGE

  • CELERY_DEFAULT_ROUTING_KEY

As for your last question, this is probably not a full answer, but you can list the tasks that workers are running from Celery, and the queues with their message counts from RabbitMQ.

Using Celery, from the docs (this lists the tasks that workers are currently executing):

celery -A proj inspect active

Using RabbitMQ, from the docs:

rabbitmqadmin list queues vhost name node messages message_stats.publish_details.rate
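The same information can also be read from Python, since rabbitmqadmin talks to the HTTP API of the RabbitMQ management plugin. Below is a minimal sketch, not a definitive implementation. It assumes the management plugin is enabled and reachable at http://localhost:15672 with the default guest/guest credentials; the function names fetch_queues and backlog are made up for this example.

```python
import base64
import json
import urllib.request


def fetch_queues(base_url="http://localhost:15672", user="guest", password="guest"):
    """Fetch the queue list from the management plugin's /api/queues endpoint.

    The URL and credentials are assumptions (RabbitMQ defaults on localhost).
    """
    request = urllib.request.Request(base_url + "/api/queues")
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request.add_header("Authorization", "Basic " + token)
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.load(response)


def backlog(queues):
    """Map queue name -> message count for every queue that holds messages."""
    return {
        q["name"]: q.get("messages", 0)
        for q in queues
        if q.get("messages", 0) > 0
    }


# Hypothetical usage against a running broker:
# for name, count in sorted(backlog(fetch_queues()).items()):
#     print(name, count)
```

If the tasks from the chain are sitting in a queue that no worker consumes from, that queue should show up here with a non-zero count. Note that the messages field counts both ready and unacknowledged messages.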