I'm writing a chain which transforms some JSON into a URI based on its contents and then POSTs data to that URI. I'd like to do this asynchronously with Celery and know that grouping the chains will allow me to do this easily.

I have the following tasks written:

import time

from celery import group, chain
from celery.utils.log import get_task_logger

from app import celery

logger = get_task_logger(__name__)

@celery.task
def create_uri(obj_json, endpoint):
    uri = "{0}:{1}/{2}".format(
        obj_json["host"],
        obj_json["port"],
        endpoint
    )
    logger.debug("Created host {0} from {1}".format(uri, obj_json))
    return uri

@celery.task
def send_post(uri, data):
    logger.debug("Posting {0} to {1}...".format(data, uri))
    return uri

def send_messages(objs, endpoint, data):
    chains = [
        # The next line is causing problems.
        (create_uri.s(obj, endpoint) | send_post.s(data))
        for obj in objs
    ]
    g = group(*chains)
    res = g.apply_async(queue="default")
    while not res.ready():
        time.sleep(1)
    uris = res.get()
    print("Posted to {0}".format(uris))
    return uris

When I run this, however, the create_uri part of each chain finishes, but send_post is never called. This is strange, because I'm following the docs section on chains and am nearly following the example shown here about avoiding synchronous subtasks.

I'm running my workers with

celery worker -A celery_worker.celery -l debug -c 5 -Q default

where celery_worker simply pushes the app context and imports app.celery.
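
For reference, celery_worker.py is roughly the usual Flask pattern (create_app here is an assumed app-factory name; the rest matches the description above):

# celery_worker.py
from app import celery, create_app

app = create_app()

# Push an application context so tasks can use the app's config.
app.app_context().push()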

My config looks like this:

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

A line from my logs is:

[2017-01-09 12:35:59,298: DEBUG/MainProcess] TaskPool: Apply (args:('app.tasks.create_uri', '197f4836-1cd8-4f7f-adaf-b8cebdb304ef', {'timelimit': [None, None], 'group': None, 'parent_id': None, 'retries': 0, 'argsrepr': "({'port': 8079, 'host': 'localhost'}, 'start')", 'lang': 'py', 'eta': None, 'expires': None, 'delivery_info': {'routing_key': 'default', 'priority': 0, 'redelivered': None, 'exchange': ''}, 'kwargsrepr': '{}', 'task': 'app.tasks.create_uri', 'root_id': '197f4836-1cd8-4f7f-adaf-b8cebdb304ef', 'correlation_id': '197f4836-1cd8-4f7f-adaf-b8cebdb304ef', 'origin': 'foobar', 'reply_to': '6559d43e-6cae-3b6f-89be-7b80e2a43098', 'id': '197f4836-1cd8-4f7f-adaf-b8cebdb304ef'}, b'[[{"port": 8079, "host": "localhost"}, "start"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": [{"task": "app.tasks.send_post", "subtask_type": null, "options": {"group_id": "60c2c9b2-eb51-457d-b248-b8e5552e0fd8", "task_id":... kwargs:{})

When I print chains[0].tasks, I see this:

(app.tasks.create_uri({'host': 'localhost', 'port': 8079}, 'start'), 
 app.tasks.send_post({'hello': 'world'}))

It is recognizing that send_post is the next task in the chain, but the task is never being accepted.

Why is my group hanging after finishing the first tasks in the chain?

1 Answer

You are creating the chains and the group correctly. However, a task sent to a queue that no worker is consuming from is never picked up, and since it never produces a result, calling .get() on it blocks forever. That is what happens here: the queue="default" option passed to apply_async is only applied to the first task in each chain, so the send_post callbacks fall back to Celery's default celery queue, which your worker (started with -Q default) is not listening to.

The simplest fix is to use the default celery queue, and start the worker without -Q default so that it actually consumes that queue:

res = g.apply_async().get()

# or name the default queue explicitly
res = g.apply_async(queue="celery").get()

Alternatively, configure routing so that every task in the chain is routed to your custom queue, and then send the group there:

res = g.apply_async(queue='foo').get()
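
Here is a sketch of that routing, using the same old-style CELERY_ setting names as the config in the question (the queue name foo is just the placeholder from the example above):

CELERY_ROUTES = {
    # Route both tasks to the custom queue, so the send_post
    # callback does not fall back to the default celery queue.
    'app.tasks.create_uri': {'queue': 'foo'},
    'app.tasks.send_post': {'queue': 'foo'},
}

Then start the worker consuming that queue:

celery worker -A celery_worker.celery -l debug -c 5 -Q foo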