
I have a main Celery task (in Django) that calls a subtask, prefetch:

@shared_task
def main():
    ...
    tasks.prefetch.s().delay(x)

Prefetch itself calls some subtasks:

@shared_task
def prefetch(x):
    ...
    do_prefetch.s().delay(x)

Later in main, after prefetch is called, I start a number of other subtasks that do some processing. These subtasks must run only after prefetch has completed. Prefetch takes some time, so I want the process tasks not only to run after prefetch, but also with a 60-second delay. The process tasks don't need prefetch's output; they just need to run after it has completed. The process tasks can run in parallel with each other.

@shared_task
def main():
    ...
    tasks.prefetch.s().delay(x)
    ...
    for i in range(10):
        tasks.process.s().delay(i)

I see that Celery's canvas commands should get me what I need, but I don't know how to set up the tasks. Is the code below correct? Do chains execute simply by being created, or do they need to be executed explicitly? Is my countdown argument in the right place, and does it mean the right thing (run this task any time 60 seconds after prefetch)?

prefetch = tasks.prefetch.s(x)
g = group(tasks.process.s(i, countdown=60) for i in range(10))
c = (prefetch | g)
c()

1 Answer


You can use a chord instead of a chain; it fits your situation best.

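from celery import chord, group
prefetch_task = tasks.prefetch.s(x)
# .si(): process() won't get prefetch's result prepended; countdown is an execution option, set via .set()
group_task = group(tasks.process.si(i).set(countdown=60) for i in range(10))
work_flow = chord([prefetch_task])(group_task)  # calling the chord sends it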

Chord:

A chord consists of a header and a body. The header is a group of tasks that must all complete before the body (the callback) is called. A chord is essentially a callback for a group of tasks.

Example:

>>> from celery import chord
>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())

Here both add tasks run first, and once both have completed, sum_task is called with the list of their results ([4, 8]).
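As a rough illustration of that ordering (this is not Celery, just a local model of it), the plain-Python sketch below uses `concurrent.futures` threads in place of workers; `run_chord` is a hypothetical helper. It runs the header calls in parallel, waits for all of them, then passes the list of their results to the body. That last step is why, in the real chord, body tasks that don't want the header results should be built with `.si()`.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def sum_task(results):
    return sum(results)


def run_chord(header, body):
    """Hypothetical local model of chord semantics: run every header call in
    parallel, wait for all of them, then call body with the list of results."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn, *args) for fn, args in header]
        # f.result() blocks until that header call has finished;
        # the list keeps the header order.
        results = [f.result() for f in futures]
    return body(results)


result = run_chord([(add, (2, 2)), (add, (4, 4))], sum_task)
print(result)  # 12
```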