I'm writing a chain which transforms some JSON into a URI based on its contents and then POSTs data to that URI. I'd like to do this asynchronously with Celery and know that grouping the chains will allow me to do this easily.
I have the following tasks written:
import time

from celery import group, chain
from celery.utils.log import get_task_logger

from app import celery

logger = get_task_logger(__name__)


@celery.task
def create_uri(obj_json, endpoint):
    uri = "{0}:{1}/{2}".format(
        obj_json["host"],
        obj_json["port"],
        endpoint
    )
    logger.debug("Created host {0} from {1}".format(uri, obj_json))
    return uri


@celery.task
def send_post(uri, data):
    logger.debug("Posting {0} to {1}...".format(data, uri))
    return uri


def send_messages(objs, endpoint, data):
    chains = [
        # The next line is causing problems.
        (create_uri.s(obj, endpoint) | send_post.s(data))
        for obj in objs
    ]
    g = group(*chains)
    res = g.apply_async(queue="default")
    while not res.ready():
        time.sleep(1)
    uris = res.get()
    print("Posted to {0}".format(uris))
    return uris
I find, however, that when I run this, the create_uri part of each chain finishes, but send_post is never called. This is strange, because I'm following the docs on chains and, indeed, am nearly following the example shown here about avoiding synchronous jobs.
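For reference, the data flow I expect from each chain can be sketched in plain Python, without Celery. This is only a sketch of my understanding: the `_sync` functions and `run_chain_sync` are hypothetical stand-ins for the tasks above, not part of my app, and the sample values are the ones that appear in the log output further down.

```python
def create_uri_sync(obj_json, endpoint):
    # Same string formatting as the create_uri task, minus the logging.
    return "{0}:{1}/{2}".format(obj_json["host"], obj_json["port"], endpoint)


def send_post_sync(uri, data):
    # Stand-in for send_post: no request is made, the URI is just returned.
    return uri


def run_chain_sync(obj, endpoint, data):
    # In a chain, the return value of the first task is passed as the
    # first positional argument of the next one, ahead of its own args.
    uri = create_uri_sync(obj, endpoint)
    return send_post_sync(uri, data)


if __name__ == "__main__":
    objs = [{"host": "localhost", "port": 8079}]
    uris = [run_chain_sync(obj, "start", {"hello": "world"}) for obj in objs]
    print("Posted to {0}".format(uris))
```

Run synchronously like this, the logic behaves as I expect, so the problem seems to lie in how the second task is dispatched rather than in the task bodies themselves.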
I'm running my workers with
celery worker -A celery_worker.celery -l debug -c 5 -Q default
where celery_worker simply pushes the app context and imports app.celery.
My config looks like this:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
A line from my logs is:
[2017-01-09 12:35:59,298: DEBUG/MainProcess] TaskPool: Apply (args:('app.tasks.create_uri', '197f4836-1cd8-4f7f-adaf-b8cebdb304ef', {'timelimit': [None, None], 'group': None, 'parent_id': None, 'retries': 0, 'argsrepr': "({'port': 8079, 'host': 'localhost'}, 'start')", 'lang': 'py', 'eta': None, 'expires': None, 'delivery_info': {'routing_key': 'default', 'priority': 0, 'redelivered': None, 'exchange': ''}, 'kwargsrepr': '{}', 'task': 'app.tasks.create_uri', 'root_id': '197f4836-1cd8-4f7f-adaf-b8cebdb304ef', 'correlation_id': '197f4836-1cd8-4f7f-adaf-b8cebdb304ef', 'origin': 'foobar', 'reply_to': '6559d43e-6cae-3b6f-89be-7b80e2a43098', 'id': '197f4836-1cd8-4f7f-adaf-b8cebdb304ef'}, b'[[{"port": 8079, "host": "localhost"}, "start"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": [{"task": "app.tasks.send_post", "subtask_type": null, "options": {"group_id": "60c2c9b2-eb51-457d-b248-b8e5552e0fd8", "task_id":... kwargs:{})
When I print chains[0].tasks, I see this:
(app.tasks.create_uri({'host': 'localhost', 'port': 8079}, 'start'),
app.tasks.send_post({'hello': 'world'}))
So Celery recognizes that send_post is the next task in the chain, but that task is never accepted by a worker.
Why is my group hanging after finishing the first task in each chain?