This is a follow-up to the question "celery periodic tasks not executing".

I have set up Django with Celery as follows:

drf_project/drf_project/celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'drf_project.settings')
app = Celery('drf_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

drf_project/drf_project/__init__.py

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']

drf_project/user_management/tasks.py

from drf_project.celery import app
from time import strftime

@app.task
def print_test():
    print strftime('%Y-%m-%d %H:%M:%S')
    with open('abc.txt', 'ab+') as test_file:
        # File objects provide write() and writelines(); there is no writeline()
        test_file.write(strftime('%Y-%m-%d %H:%M:%S') + '\n')

drf_project/drf_project/settings.py

INSTALLED_APPS += ('django_celery_beat',)
CELERYBEAT_SCHEDULE = {
    "test_1": {
        "task": "tasks.print_test",
        "schedule": timedelta(seconds=2),
    },
}

I run the Celery worker and beat in separate terminals with these commands:

celery -A drf_project worker -l info -E

and

celery -A drf_project beat -l info -S django

Beat sends the task to the worker every 2 seconds:

[2016-11-28 12:25:19,314: INFO/MainProcess] Scheduler: Sending due task Importing contacts (print_test)

But the worker reports this error:

[2016-11-28 12:24:57,551: ERROR/MainProcess] Received unregistered task of type 'print_test'. The message has been ignored and discarded.

Did you remember to import the module containing this task? Or maybe you're using relative imports?

Please see http://docs.celeryq.org/en/latest/internals/protocol.html for more information.

The full contents of the message body was:
u'[[], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (77b)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received
    strategy = strategies[type_]
KeyError: 'print_test'

The worker does list the task at startup:

[tasks]
  . user_management.tasks.print_test

How can I solve this error?

1 Answer

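Try using the task's fully qualified name, "user_management.tasks.print_test", in CELERYBEAT_SCHEDULE. The worker registers the task under that name (it is the one shown in its [tasks] list), so the name beat sends has to match it exactly.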

It should look like this:

from datetime import timedelta

INSTALLED_APPS += ('django_celery_beat',)
CELERYBEAT_SCHEDULE = {
    "test_1": {
        "task": "user_management.tasks.print_test",
        "schedule": timedelta(seconds=2),
    },
}
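
To illustrate why the short name fails, here is a rough sketch in plain Python. It assumes the worker looks tasks up in a mapping keyed by the registered name, which is what the KeyError in the traceback suggests. The names registry, register, dispatch and unregistered_entries are made up for this example; this is a simplified model, not Celery's actual code.

```python
# Simplified model of a worker's task registry (hypothetical, not Celery's code).
registry = {}


def register(func, module):
    """Store func under its fully qualified name, '<module>.<function>'."""
    name = "{0}.{1}".format(module, func.__name__)
    registry[name] = func
    return name


def print_test():
    return "ran print_test"


# The worker lists the task under its full dotted path.
register(print_test, module="user_management.tasks")


def dispatch(task_name):
    """Run the task whose registered name equals task_name exactly."""
    func = registry.get(task_name)
    if func is None:
        return "unregistered task: {0}".format(task_name)
    return func()


def unregistered_entries(schedule):
    """Return the schedule keys whose "task" value is not a registered name."""
    return sorted(
        key for key, entry in schedule.items()
        if entry["task"] not in registry
    )
```

With this model, dispatch("tasks.print_test") and dispatch("print_test") both fall into the unregistered branch, while dispatch("user_management.tasks.print_test") runs the function. A helper along the lines of unregistered_entries could be used to compare a schedule dict against the names the worker prints at startup before beat is started.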