2
votes

Preconditions: There is a small Celery cluster processing tasks. Each Celery instance runs a few workers. Everything runs under Flask.

Task: I need to be able to pause and resume task consumption on a particular node from code. That is, a task should be able to decide whether the current Celery instance and all of its workers should pause or resume consuming tasks.

I didn't find any straightforward way to do this. Any suggestions?

Thanks in advance!


2 Answers

0
votes

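`Control.cancel_consumer(queue, **kwargs)` is probably all you need for your use case. Pass `destination=[...]` with the node name to limit it to one instance; the counterpart `Control.add_consumer(queue, ...)` makes the node start consuming from the queue again.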
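A minimal sketch of how this could look, assuming the node consumes from the default queue named `celery`. The helper names `pause_node` and `resume_node` are made up for illustration; `control` is expected to be your app's `app.control` object.

```python
def pause_node(control, hostname, queues):
    """Ask a single worker node to stop consuming from the given queues."""
    for queue in queues:
        # destination restricts the broadcast to the named node only
        control.cancel_consumer(queue, destination=[hostname])


def resume_node(control, hostname, queues):
    """Ask the same node to start consuming from the given queues again."""
    for queue in queues:
        control.add_consumer(queue, destination=[hostname])


# Hypothetical usage inside a bound task (bind=True), where
# self.request.hostname is the node name of the worker running the task:
#
#     pause_node(self.app.control, self.request.hostname, ["celery"])
#     ...
#     resume_node(self.app.control, self.request.hostname, ["celery"])
```

Tasks that the node has already reserved will probably still run after the consumer is cancelled, so the pause applies to new deliveries rather than work in flight.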

-1
votes

Perhaps a better strategy would be to divide the work across several queues.

Have a default queue where all tasks start. The workers watching the default queue can, according to your logic, add subtasks to the other active queues. You may not need the default queue at all if you can add tasks to the active queues directly from Flask.

That way, each node does not have to worry about whether it's paused or active. It just consumes everything that's been added to its queue. These location-specific queues will be empty (and thus paused) unless the default workers have added subtasks.
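A rough sketch of the routing idea, assuming one queue per node. The queue naming scheme, `node_queue`, `route`, and the `active_nodes` set are hypothetical; `task` is any Celery task, and `apply_async(..., queue=...)` sends it to the named queue.

```python
DEFAULT_QUEUE = "default"  # assumed name of the queue where all tasks start


def node_queue(node_name):
    """Name of the queue dedicated to one node (hypothetical naming scheme)."""
    return f"node.{node_name}"


def route(task, args, node_name, active_nodes):
    """Send the task to the node's own queue if the node is active,
    otherwise leave it on the default queue."""
    if node_name in active_nodes:
        queue = node_queue(node_name)
    else:
        queue = DEFAULT_QUEUE
    return task.apply_async(args=args, queue=queue)
```

Each node's workers would then be started so that they consume only their own queue, for example with `celery -A proj worker -Q node.alpha`, where `proj` and `alpha` are placeholders.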