6
votes

May I ask you a question about Celery?

I have several writers, each of which submits a task every X minutes. Each task requires that the previous task from the same writer has completed. The system works well as-is, since X minutes is much longer than the few seconds a task takes.

But now it may happen that a writer sends two or three tasks at the same time. Obviously, Celery + RabbitMQ will distribute these tasks to different workers, which causes trouble.

I've searched and found answers about blocking a worker with a lock (using Redis, for example) until another worker finishes, but this is not possible for me, as I have fewer workers than writers.

I need something like N queues for N writers, with Celery respecting the order within each queue. I will have literally thousands of writers, so I can't create that many workers.

Example: writers A, B, C; tasks A1, A2, ...; and only one worker.

I receive, at (roughly) the same time: A1, A2, B1, C1, B2, C2, A3, B3, C3.

Celery should create the queues A (1-2-3), B (1-2-3), C (1-2-3).

It should then send task A1; the next task could be A2, B1, or C1, but it must not be A3, B2, B3, C2, or C3.

I hope I explained that well.

Thanks!

Does A2 know if A1 is done? - ant31
No, absolutely not. The writers can only add tasks; they don't receive a response. - Alby87
First, your tasks can't be distributed the way you describe, so the solution is to have one and only one worker per task group. But a worker can handle many queues. So, for example, if you have 1000 task groups (one per writer), you create 1000 queues (one dedicated per writer), then start 10 workers, each consuming from 100 different queues. - ant31
To let each worker consume from every queue, you have to either break the relation between tasks in a group (A1/A2...), or formalize and store that relation: "I'm A2 and I know a certain task should complete before me." You can use a db/cache to store the task order; in practice the writer would queue the job and, at the same time, record in the cache "I have queued Ax". - ant31
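ant31's cache idea could be sketched roughly like this. A plain dict stands in for the shared cache (Redis or a db in practice), and the function names (`enqueue`, `can_run`, `mark_done`) are hypothetical, not from any Celery API:

```python
# Sketch: per-writer sequence numbers stored in a shared cache, so a task
# can check that its predecessor from the same writer has completed.
cache = {}  # stand-in for Redis/db

def enqueue(writer_id):
    # The writer assigns the next sequence number for its own stream.
    seq = cache.get(('queued', writer_id), 0) + 1
    cache[('queued', writer_id)] = seq
    return seq

def can_run(writer_id, seq):
    # True only if every earlier task from this writer has completed.
    return cache.get(('done', writer_id), 0) == seq - 1

def mark_done(writer_id, seq):
    cache[('done', writer_id)] = seq
```

A task that finds `can_run(...)` false would re-queue itself (e.g. via Celery's retry mechanism) instead of running out of order.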

1 Answer

3
votes

I think you need to create one worker per queue to enforce ordering like that. Otherwise, a worker will just use a first-in, first-out approach to handling tasks. You can create as many queues as you want and configure which of those queues each worker consumes from. You can pass the -Q parameter when starting a worker to set its queues, as discussed in the Workers Guide.

celery -A my_project worker -l info -Q A

Then you can set up global mappings that define which queue each task goes to, using the Routing Guide.

CELERY_ROUTES = {
    'my_app.tasks.task_a1': {'queue': 'A'},
    'my_app.tasks.task_a2': {'queue': 'A'},
    'my_app.tasks.task_b1': {'queue': 'B'},
    'my_app.tasks.task_c1': {'queue': 'C'},
}
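Since you have thousands of writers, you can't list every task statically like that. Celery also accepts router objects in CELERY_ROUTES, so you could derive the queue from the task's arguments instead. This is a sketch assuming each task receives the writer id as its first positional argument, and the `writer_%s` queue naming is my own invention:

```python
# Hypothetical dynamic router: map each task to a per-writer queue
# based on the writer id passed as the task's first argument.
class WriterRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith('my_app.tasks.') and args:
            return {'queue': 'writer_%s' % args[0]}
        return None  # let other routers / the default queue handle it

CELERY_ROUTES = (WriterRouter(),)
```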

Alternatively, you can specify the queue at the time you submit each task, as described in the Calling Tasks Guide.

task_a1.apply_async(queue='A')
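To keep the worker count fixed while still giving each writer its own queue, you can shard the per-writer queues across a worker pool, as ant31's comment suggests. A minimal sketch, assuming 10 workers and a `writer_%s` naming scheme (both my assumptions):

```python
# Sketch: one queue per writer, sharded over a fixed pool of workers so
# each queue is consumed by exactly one worker.
import zlib

NUM_WORKERS = 10  # assumption; size the pool to your load

def queue_for(writer_id):
    return 'writer_%s' % writer_id

def worker_for(queue_name):
    # Stable hash: the same queue always maps to the same worker index.
    return zlib.crc32(queue_name.encode()) % NUM_WORKERS
```

You would then start each worker with -Q set to the comma-separated list of queues whose `worker_for(...)` index matches it, and submit with `apply_async(queue=queue_for(writer_id))`.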