
I am sending a chain of three tasks to my Celery workers. The first and third are added to the queue "filestore", which is served by worker A. The second is added to the queue "cloud", which is served by worker B.

The behaviour I want is for the three tasks to execute in order, one after the other.

The behaviour I am seeing is that worker A does task 1, then task 3, and only then does worker B do task 2.

from celery import Signature

result = app.send_task(
    "workerTasks_filestore.task_upload_scan_to_s3", args=[scan.scan_name], queue='filestore',
    chain=[
        Signature('workerTasks.do_processing_task', args=[scan.scan_name, spd_name], queue=queue, immutable=True),
        Signature('workerTasks_filestore.task_download_scan_from_s3', args=[scan.scan_name], queue='filestore', immutable=True),
    ]
)

What am I doing wrong?

How do you measure the execution times? - fafl
I'm watching the two processes. A clearly executes 1 and 3 while B is idle, then B takes up 2. - Omroth
Sigh, if you can't chain across queues, that's going to be a real pain! - Omroth
Does worker A use the queue to send and receive a message between task 1 and 3? - fafl
I'm afraid I don't know exactly what you mean there; I don't do anything on purpose to "communicate" between tasks. - Omroth

1 Answer


Did you try using the chain class from Celery?

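from celery import chain, Signature

# chain() links the signatures so each one runs only after the previous one succeeds,
# whichever queue (and therefore whichever worker) it is routed to.
chained_tasks = chain([
    Signature('workerTasks_filestore.task_upload_scan_to_s3', args=(scan.scan_name,), queue='filestore'),
    # immutable=True: ignore the previous task's return value and use only these args
    Signature('workerTasks.do_processing_task', args=(scan.scan_name, spd_name), queue=queue, immutable=True),
    Signature('workerTasks_filestore.task_download_scan_from_s3', args=(scan.scan_name,), queue='filestore', immutable=True)
])

result = chained_tasks.apply_async()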
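A likely explanation for the 1, 3, 2 order, assuming Celery's task message protocol v2: the chain option that send_task forwards is treated as a stack. After each task succeeds, the worker takes the next signature from the end of the remaining list, not the front. The chain class builds that list in reverse for you, which would explain why it avoids the problem. The sketch below is a plain-Python model of that behaviour, not Celery code; execution_order and the task labels are hypothetical.

```python
# A plain-Python model (an assumption, not Celery's API) of how a worker
# consumes the `chain` message option: after each task succeeds, the worker
# takes the NEXT task from the END of the remaining list.

def execution_order(first_task, chain_option):
    """Return the order in which tasks would run under the pop-from-end model."""
    order = [first_task]
    remaining = list(chain_option)  # copy so the caller's list is untouched
    while remaining:
        order.append(remaining.pop())  # list.pop() removes the LAST element
    return order


# The list as written in the question: [task 2, task 3]
print(execution_order("upload", ["process", "download"]))
# ['upload', 'download', 'process']  -> 1, 3, 2, as observed

# The same list reversed: [task 3, task 2]
print(execution_order("upload", ["download", "process"]))
# ['upload', 'process', 'download']  -> 1, 2, 3, as intended
```

If this model holds for your Celery version, passing the question's chain list to send_task in reverse order (the download signature first, then the processing one) should also give 1, 2, 3, but letting the chain class do that bookkeeping is less fragile.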