31
votes

I want to create a group from a list returned by a Celery task, so that for each item in the task result set, one task will be added to the group.

Here's a simple code example to explain the use case. The ??? should be the result from the previous task.

@celery.task
def get_list(amount):
    # In reality, fetch a list of items from a db
    return [i for i in range(amount)]

@celery.task
def process_item(item):
    # do stuff
    pass

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???))

I'm probably not approaching this correctly, but I'm pretty sure it's not safe to call tasks from within tasks:

@celery.task
def process_list():
    for i in get_list.delay().get():
        process_item.delay(i)

I don't need the result from the second task.

1
Indeed, do not call a task from a task. This will cause deadlocks. Say you have one worker. You call your task, which ties up worker 1, then calls a second task. There's no worker to process that task and everything will hang. This nastiness gets slightly better as you add workers, but you'll always be tying up multiple workers with a single task (and losing parallelism). - mlissner

1 Answer

45
votes

You can get this kind of behavior using an intermediate task. Here's a demonstration of creating a "map"-like method that works the way you've suggested.

from celery import task, subtask, group

@task
def get_list(amount):
    return [i for i in range(amount)]

@task
def process_item(item):
    # do stuff
    pass

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s()))
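
To see the fan-out pattern without a broker, here's a plain-Python sketch of what the chain above does (the bodies of `get_list` and `process_item` are hypothetical stand-ins, since the originals do no real work):

```python
# Broker-free analogue of the chain: get_list feeds dmap, which maps
# process_item over each element of the result (no Celery involved).

def get_list(amount):
    return list(range(amount))

def process_item(item):
    return item * 2  # stand-in for real per-item work

def dmap(it, callback):
    # Synchronous analogue of the dmap task: apply callback to each item
    return [callback(arg) for arg in it]

print(dmap(get_list(5), process_item))  # → [0, 2, 4, 6, 8]
```

In the real Celery version, you would launch the whole thing with `process_list.delay()`; `get_list` runs first, and `dmap` then builds and applies the group over its return value.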

Credit to Ask Solem for giving me this suggestion when I asked him for help on a similar issue.