4
votes

I'm trying to wrap my head around this asynchronous task processing setup. So far I've been looking at using Celery, but I'm not locked in on anything yet. The only requirements are that I can use Redis as the broker and distribute the tasks across multiple nodes.

       ->  Task2  ->  Task3
Task1  ->  Task2  ->  Task3    [then]    Task4
       ->  Task2  ->  Task3

Explanation:

  • Task1 produces a list of items
  • Task2 receives one item from Task1 as its argument
  • Task2 and Task3 are chained, and each of these chains is executed in parallel
  • Task4 is executed when all of the Task2-Task3 chains have completed (it does not need any data passed from Task3)

The question then is: how can I do this with Celery?


1 Answer

2
votes

This can be done using the chord and chain primitives; please take a look at the example below. It should fit your needs.

from celery import Celery, chord, chain

backend = 'redis://redis:6379/'
# Redis serves as both the broker and the result backend;
# a result backend is required for chord to work.
app = Celery('tasks', broker=backend, backend=backend)


@app.task
def task1():
    argument = 123
    # chord: run the header chains in parallel,
    # then call task4 once all of them have completed
    return chord([
        chain(task2.s(argument), task3.s()),
        chain(task2.s(argument), task3.s()),
        chain(task2.s(argument), task3.s()),
    ])(task4.s())


@app.task
def task2(argument):
    # receives one item from task1
    pass


@app.task
def task3(result_task2):
    # receives the result of the preceding task2 in its chain
    pass


@app.task
def task4(results):
    # chord callback: receives the list of task3 results
    pass


task1.apply_async()  # kick off the workflow