13
votes

I am following the Celery Django tutorial, and the example tasks (add, mul) work perfectly for me. I get the correct response when I run res = add.delay(1,2); res.get().

But I get *** NotRegistered: u'pipeline.tasks.sayhello' when I try to execute another task of mine: res = sayhello.delay('trex').

If I do res = sayhello('trex'), I can get the result by just typing res. But this way I execute the function ordinarily, without using Celery.

The task works only if I run it in the Django shell (./manage.py shell):

>>> res = sayhello.delay('trex')
>>> res.get()
u'Hello trex'

So the problem is that I can't execute the sayhello task from pipeline/views.py, although I can execute the add and mul tasks from there.

Why is that? How do I run tasks correctly from views.py?

The full error message:

[2016-11-11 10:56:09,870: ERROR/MainProcess] Received unregistered task of type u'pipeline.tasks.sayhello'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[["tiger"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b)
Traceback (most recent call last):
  File "/home/trex/Development/Sirius/new/rocket/rocket-venv/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received
    strategy = strategies[type_]
KeyError: u'pipeline.tasks.sayhello'

Django version:

1.9.7

Celery version:

celery==4.0.0
django-celery==3.1.17

Django project dir tree:

rocket
├── etl
│   ├── etl
│   │   ├── celery.py
│   │   ├── __init__.py
│   │   ├── settings
│   │   │   ├── base.py
│   │   │   ├── dev.py
│   │   │   ├── __init__.py
│   │   │   ├── production.py
│   │   │   └── test.py
│   │   ├── urls.py
│   │   ├── wsgi.py
│   ├── manage.py
│   ├── pipeline
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── tasks.py
│   │   ├── tests.py
│   │   ├── urls.py
│   │   ├── views.py

etl/pipeline/views.py

from .tasks import *

def get_response(request):
    result = add.delay(1, 2)
    result.get()
    result = sayhello.delay('tiger')
    result.get()

etl/pipeline/tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def sayhello(name):
    return "Hello %s" % name

Also I tried this:

from celery.decorators import task

@task(name="sayhello")
def sayhello(name):
    return "Hello {0}".format(name)

etl/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'etl.settings.base')
app = Celery('etl')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

etl/__init__.py

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']

etl/settings/base.py

...
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
CELERY_IMPORTS = ('pipeline.tasks', )

4 Answers

8
votes

This might hopefully help someone. I had modified my code and neglected to restart the celery worker.

Try restarting the celery workers.
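
One way to confirm that a running worker is stale is to compare the task name from the error against what each worker reports as registered. The helper below is a minimal sketch, not part of Celery: workers_missing_task and the snapshot data are hypothetical, and the input is assumed to have the worker-to-task-list shape that app.control.inspect().registered() returns.

```python
def workers_missing_task(registered, task_name):
    """Return the sorted names of workers whose registry lacks task_name.

    `registered` maps worker name -> list of registered task names, the
    shape returned by Celery's app.control.inspect().registered().
    That call returns None when no worker replies, hence the `or {}`.
    """
    return sorted(
        worker
        for worker, tasks in (registered or {}).items()
        if task_name not in tasks
    )


# Hypothetical snapshot of a worker that was started before sayhello
# was added to pipeline/tasks.py and has not been restarted since.
snapshot = {
    "celery@devbox": ["pipeline.tasks.add", "pipeline.tasks.mul"],
}

stale = workers_missing_task(snapshot, "pipeline.tasks.sayhello")
print(stale)  # ['celery@devbox']
```

Any worker listed in the result needs a restart (or is importing a different copy of the code) before it can run the task.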

6
votes

The error occurs because the CELERY_IMPORTS setting in your etl/settings/base.py file is not taking effect. My suggestion:

Remove the comma from

CELERY_IMPORTS = ('pipeline.tasks' , )

If the issue still persists, run this command:

celery -A pipeline.tasks worker --loglevel=DEBUG

One more thing: your tasks.py file needs to be in a Django app that is registered in the settings file in order to be imported, so check that as well.
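
The last point, that tasks.py must live in a registered Django app, could look like the settings fragment below. This is only a sketch that assumes the pipeline app from the question; the project's other INSTALLED_APPS entries are not shown in the question, so they are omitted here rather than guessed.

```python
# Fragment for etl/settings/base.py (a sketch, not the full setting).
# Keep the project's existing entries and make sure 'pipeline' is among them,
# so that app.autodiscover_tasks() looks for pipeline/tasks.py.
INSTALLED_APPS = [
    'pipeline',  # app whose tasks.py defines add, mul and sayhello
]
```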

2
votes

Relative imports and automatic name generation don’t go well together, so if you’re using relative imports you should set the name explicitly.

For example, if the client imports the module "myapp.tasks" as ".tasks", and the worker imports the module as "myapp.tasks", the generated names won’t match and a NotRegistered error will be raised by the worker.

http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-naming-relative-imports
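
The mismatch described above can be illustrated with a simplified model of automatic naming. The function generated_task_name below is hypothetical and is not Celery's code; it only assumes that an automatic name is the module path joined to the function name, and the two module paths are made up for the illustration.

```python
def generated_task_name(module_name, func_name):
    """Simplified model: an automatic task name is '<module>.<function>'."""
    return "{0}.{1}".format(module_name, func_name)


# Name the worker registers when it imports the module as "myapp.tasks".
worker_name = generated_task_name("myapp.tasks", "sayhello")

# Name the client would send if it saw the same file under another path
# (hypothetical path, e.g. because of a different sys.path entry).
client_name = generated_task_name("project.myapp.tasks", "sayhello")

names_match = worker_name == client_name
print(worker_name, client_name, names_match)

# Setting the name explicitly removes the dependency on the import path.
# With Celery this is done through the decorator's name argument, e.g.
#     @shared_task(name="myapp.tasks.sayhello")
EXPLICIT_NAME = "myapp.tasks.sayhello"
```

With an explicit name, both sides use the same string regardless of how the module was imported.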

1
votes

I had this issue recently. An import error was preventing the task from being registered correctly. It was failing silently, so I had no way of knowing that the registration failure was caused by the import error. Hopefully this helps someone who is stuck with tasks not registering in Celery.
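
To surface such a hidden import error, it can help to import the task modules by hand and print any traceback. The sketch below uses only the standard library; find_import_errors is a hypothetical helper, and the module names passed to it are placeholders (in the question's project the name to check would be 'pipeline.tasks', run from ./manage.py shell so Django is configured).

```python
import importlib
import traceback


def find_import_errors(module_names):
    """Try to import each module; map failing names to their traceback text."""
    failures = {}
    for name in module_names:
        try:
            importlib.import_module(name)
        except Exception:
            # Keep the full traceback so the real cause is visible.
            failures[name] = traceback.format_exc()
    return failures


# "json" stands in for a module that imports cleanly;
# "no_such_tasks_module" is a deliberately missing placeholder.
failures = find_import_errors(["json", "no_such_tasks_module"])
for name, tb in failures.items():
    print("Could not import {0}:".format(name))
    print(tb)
```

A module that shows up in the result would never have had its tasks registered, which matches the silent failure described above.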