So the project I am working on requires a distributed task system to process CPU-intensive tasks. This is relatively straightforward: spin up Celery, throw all the tasks in a queue, and let Celery do the rest.

The issue I have is that every user needs their own queue, and items within each user's queue must be processed sequentially. So if a task in a user's queue is already processing, wait until it is finished before allowing a worker to pick up the next.

The closest I've come to something like this is having a fixed set of queues and assigning them to users, then having each user's tasks picked up by Celery workers pinned to a specific queue with a concurrency of 1.

The problem with this system is that I can't scale my workers to process a backlog of user tasks.

Is there a way I can configure Celery to do what I want, or is there another task system that does?

Edit:

Currently I use the following command to spawn my Celery workers, each with a concurrency of one, on a fixed set of queues:

celery multi start 4 -A app.celery -Q:1 queue_1 -Q:2 queue_2 -Q:3 queue_3 -Q:4 queue_4 --logfile=celery.log --concurrency=1

I then store a queue name on the user object, and when the user starts a process I send a task to the queue stored on the user object. This gives me my sequential task processing.
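For reference, the routing step looks something like the sketch below. This assumes the app instance from -A app.celery is importable, and process_task, user.id, and user.queue_name are illustrative names rather than my real ones:

from app.celery import celery  # assumed location of the app instance

@celery.task
def process_task(user_id):
    # CPU-intensive work for a single user goes here
    ...

# `user` stands in for the user object carrying the stored queue name
process_task.apply_async(args=[user.id], queue=user.queue_name)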

The downside is that when I have multiple users sharing a queue, tasks build up behind each other and never get processed.

I'd like to have, say, 5 workers and a queue per user object, then have the workers hop between the queues, but never have more than one worker on a single queue at a time.

Do you have any code from your attempts you can share? – Lucas Hendren
There isn't much to it, but I've added some more information to the original question. – James Foley

1 Answer

I use a chain (docs here) to execute tasks in a specific order:

# .si() creates immutable signatures, so each task receives only account_pk;
# the | operator links them so they run strictly one after another
chain = task1_task.si(account_pk) | task2_task.si(account_pk) | task3_task.si(account_pk)
chain()

So, for a specific user I execute task1; when it is finished, task2 executes, and when that is finished, task3. Each task will be picked up by any available worker :)
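Applied to the per-user case in the question, you could build one chain per user out of whatever work is pending. A minimal sketch, where process_task is assumed to take the account and one payload item (both names are mine, not from the question):

from celery import chain

def run_user_tasks(account_pk, payloads):
    # One immutable signature per pending item; chaining them guarantees
    # they run strictly one after another, on whichever worker is free
    workflow = chain(*[process_task.si(account_pk, p) for p in payloads])
    workflow.apply_async()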

For stopping a chain midway:

# Inside a bound task: clearing the callbacks drops the rest of the chain
# (on Celery 4+, setting self.request.chain = None does the same job)
self.request.callbacks = None
return

And don't forget to bind your task:

@app.task(bind=True)
def task2_task(self, account_pk):
    # bind=True exposes the task instance as `self` (and thus self.request)
    ...
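Putting it together, a minimal sketch of a bound task that cuts a chain short; the account_is_active check is an invented placeholder for whatever your real stop condition would be:

def account_is_active(account_pk):
    # Placeholder stop condition; real code would check the database
    return True

@app.task(bind=True)
def task2_task(self, account_pk):
    # If the stop condition is hit, drop the remaining tasks in the
    # chain and end this one early
    if not account_is_active(account_pk):
        self.request.callbacks = None
        return
    # ... normal processing for this step ...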