1
votes

Cheers,

I have a celery setup running in a production environment (on Linux) where I need to consume two different task types from two dedicated queues (one for each). The problem is that all workers are always bound to both queues, even when I tell them to consume from only one of them.

TL;DR

  • Celery running with 2 queues
  • Messages are published in correct queue as designed
  • Workers keep consuming both queues
  • Leads to deadlock

General Information

Think of my two different task types as a hierarchical setup:

  • A task is a regular celery task that may take quite some time, because it dynamically dispatches other celery tasks and may be required to chain through their respective results

  • A node is a dynamically dispatched sub-task, which also is a regular celery task but itself can be considered an atomic unit.

A task can thus be a more complex arrangement of nodes, where the results of one or more nodes serve as input for one or more subsequent nodes, and so on. Since my tasks can take a long time and only finish once all of their nodes have been deployed, it is essential that they are handled by dedicated workers, so that enough workers stay free to consume the nodes. Otherwise the system can get stuck: when many tasks are dispatched and each one is picked up by a different worker, their nodes are queued but never consumed, because every worker is blocked by a task.
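To make the starvation scenario above concrete, here is a minimal toy simulation. It does not use celery at all; the function name `all_tasks_finish` and the notion of "slots" (one slot = one unit of worker capacity) are assumptions made only for illustration. It assumes the worst case, where free slots pick up parent tasks before nodes, and that each task has at least one node.

```python
def all_tasks_finish(num_tasks, nodes_per_task, task_slots, node_slots, shared_slots=0):
    """Toy model: return True if every parent task finishes, False if stuck.

    task_slots   -- capacity that may only run parent tasks
    node_slots   -- capacity that may only run nodes
    shared_slots -- capacity that may run either (workers bound to both queues)
    A parent task holds its slot until all of its nodes have been consumed.
    Assumes nodes_per_task >= 1.
    """
    pending = num_tasks
    running = []  # remaining node count for each running parent task
    finished = 0
    while finished < num_tasks:
        # Worst case: every free slot that may run a parent task takes one.
        free_for_tasks = task_slots + shared_slots - len(running)
        started = min(pending, free_for_tasks)
        running.extend([nodes_per_task] * started)
        pending -= started

        # Parent tasks fill dedicated task slots first, then shared slots.
        shared_held_by_tasks = max(0, len(running) - task_slots)
        node_capacity = node_slots + shared_slots - shared_held_by_tasks
        if not running or node_capacity == 0:
            return False  # nothing can run, or nodes are queued with no consumer

        # Consume up to node_capacity queued nodes in this step.
        budget = node_capacity
        for i in range(len(running)):
            done = min(budget, running[i])
            running[i] -= done
            budget -= done

        finished += sum(1 for remaining in running if remaining == 0)
        running = [remaining for remaining in running if remaining > 0]
    return True


# Four workers bound to both queues, four tasks: every worker holds a task.
stuck = all_tasks_finish(4, 3, task_slots=0, node_slots=0, shared_slots=4)
# One dedicated task worker, two dedicated node workers: always progresses.
fine = all_tasks_finish(4, 3, task_slots=1, node_slots=2)
```

In this model, the shared pool only survives as long as fewer tasks than workers arrive at once, whereas the dedicated split keeps node capacity available regardless of how many tasks are queued.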

If this is a bad design in general, I would appreciate suggestions on how to improve it. So far I have not managed to build one of these processes using celery's built-in canvas primitives, so any help there is welcome too.

Configuration/Setup

I run celery with amqp and have set up the following queues and routes in the celery configuration:

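from kombu import Exchange, Queue

# Both queues are bound to the same exchange with distinct routing keys.
CELERY_QUEUES = (
    Queue('prod_nodes', Exchange('prod'), routing_key='prod.node'),
    Queue('prod_tasks', Exchange('prod'), routing_key='prod.task'),
)

# Map each task name to its dedicated queue.
CELERY_ROUTES = {
    'deploy_node': {'queue': 'prod_nodes', 'routing_key': 'prod.node'},
    'deploy_task': {'queue': 'prod_tasks', 'routing_key': 'prod.task'},
}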

When I launch my workers, I issue a call similar to the following:

celery multi start w_task_01 w_node_01 w_node_02 -A my.deployment.system \
  -E -l INFO -P gevent -Q:1 prod_tasks -Q:2-3 prod_nodes -c 4 --autoreload \
  --logfile=/my/path/to/log/%N.log --pidfile=/my/path/to/pid/%N.pid
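To spell out what the per-worker selectors in this call are meant to achieve, the sketch below expands them by hand. It is only an illustration of the intended mapping, not celery multi's actual parser; `queues_per_worker` is a hypothetical helper, and it assumes that `-Q:1` and `-Q:2-3` refer to the 1-based position of each worker name on the command line.

```python
def queues_per_worker(names, selectors):
    """Expand index selectors such as "1" or "2-3" into a per-worker queue list.

    names     -- worker names in command-line order
    selectors -- mapping of selector string to comma-separated queue names
    Indexes are assumed to be 1-based positions in `names`.
    """
    assigned = {}
    for selector, queues in selectors.items():
        for part in selector.split(","):
            if "-" in part:
                first, last = (int(n) for n in part.split("-"))
            else:
                first = last = int(part)
            for index in range(first, last + 1):
                assigned[names[index - 1]] = queues.split(",")
    return assigned


intended = queues_per_worker(
    ["w_task_01", "w_node_01", "w_node_02"],
    {"1": "prod_tasks", "2-3": "prod_nodes"},
)
# intended == {"w_task_01": ["prod_tasks"],
#              "w_node_01": ["prod_nodes"],
#              "w_node_02": ["prod_nodes"]}
```

So the expectation is one worker consuming only prod_tasks and two workers consuming only prod_nodes.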

The Problem

My queue and routing setup seems to work properly, as I can see messages being correctly queued in the RabbitMQ Management web UI.

However, all workers always consume celery tasks from both queues. I can see this when I start and open up the flower web UI and inspect one of the deployed tasks, where e.g. w_node_01 starts consuming messages from the prod_tasks queue, even though it shouldn't.

The RabbitMQ Management web UI also shows that every started worker is registered as a consumer on both queues.
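The same check can be scripted. The helper below is a sketch: `unexpected_bindings` is a hypothetical name, and it assumes its input has the shape returned by `app.control.inspect().active_queues()`, that is, a dict mapping each node name (`name@host`) to a list of queue descriptions that each carry a `name` key. The sample reply is made up to mirror the situation described here.

```python
def unexpected_bindings(active_queues, expected):
    """Report queues a worker consumes from although it should not.

    active_queues -- dict like {"w_node_01@host": [{"name": "prod_nodes"}, ...]}
    expected      -- dict like {"w_node_01": {"prod_nodes"}}, keyed without host
    Workers missing from `expected` have all of their queues reported.
    """
    problems = {}
    for node, queues in (active_queues or {}).items():
        short_name = node.split("@", 1)[0]
        actual = {queue["name"] for queue in queues}
        extra = actual - expected.get(short_name, set())
        if extra:
            problems[node] = sorted(extra)
    return problems


# Made-up reply in which every worker is bound to both queues.
sample_reply = {
    "w_task_01@host": [{"name": "prod_nodes"}, {"name": "prod_tasks"}],
    "w_node_01@host": [{"name": "prod_nodes"}, {"name": "prod_tasks"}],
}
expected = {"w_task_01": {"prod_tasks"}, "w_node_01": {"prod_nodes"}}
problems = unexpected_bindings(sample_reply, expected)
# problems == {"w_task_01@host": ["prod_nodes"], "w_node_01@host": ["prod_tasks"]}
```

Against a live broker, the first argument would come from `app.control.inspect().active_queues()`, where `app` is the Celery application instance; the command `celery -A my.deployment.system inspect active_queues` prints the same information.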

Thus, I ask you...

... what did I do wrong?

Where is the issue in my setup or in the worker start call? How can I stop the workers from always consuming from both queues? Do I really have to apply additional settings at runtime (which I would very much like to avoid)?

Thanks for your time and answers!

2
Have you tried not using multi and just create 2 worker daemons? - woot
Not yet, I'll see whether it helps. Thanks for the hint. - jbndlr

2 Answers

1
votes

You can create two separate workers, one for each queue, and use the -Q command line argument to define which queue each of them should consume tasks from.

By default, each worker starts one process per CPU core. If you want to keep the total number of processes the same, you can use the --concurrency flag (see the Celery docs for more info).

1
votes

Celery allows configuring a worker with a specific queue.

1) Specify the name of the queue with the 'queue' argument for the different types of jobs

celery.send_task('job_type1', args=[], kwargs={}, queue='queue_name_1')
celery.send_task('job_type2', args=[], kwargs={}, queue='queue_name_2')

2) Add the following entry to the configuration file

CELERY_CREATE_MISSING_QUEUES = True

3) When starting a worker, pass -Q 'queue_name' as an argument so that it consumes only from the desired queue.

celery -A proj worker -l info -Q queue_name_1 -n worker1
celery -A proj worker -l info -Q queue_name_2 -n worker2