In continuity with the question: celery periodic tasks not executing
I have set up django with celery like:
drf_project/drf_project/celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'drf_project.settings')
app = Celery('drf_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
In drf_project/drf_project/init.py
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
drf_project/user_management/tasks.py
from drf_project.celery import app
from time import strftime
@app.task
def print_test():
print strftime('%Y-%m-%d %H:%M:%S')
with open('abc.txt', 'ab+') as test_file:
test_file.writeline(strftime('%Y-%m-%d %H:%M:%S'))
In drf_project/drf_project/settings.py
INSTALLED_APPS += ('django_celery_beat',)
CELERYBEAT_SCHEDULE = {
"test_1": {
"task": "tasks.print_test",
"schedule": timedelta(seconds=2),
},
}
I ran both celery worker and beat in different terminals with commands:
celery -A drf_project worker -l info -E
and
celery -A drf_project beat -l info -S django
The beat is sending tasks to worker every 2 seconds like:
[2016-11-28 12:25:19,314: INFO/MainProcess] Scheduler: Sending due task Importing contacts (print_test)
But the worker throws the error like:
[2016-11-28 12:24:57,551: ERROR/MainProcess] Received unregistered task of type 'print_test'. The message has been ignored and discarded.
Did you remember to import the module containing this task? Or maybe you're using relative imports?
Please see http://docs.celeryq.org/en/latest/internals/protocol.html for more information.
The full contents of the message body was: u'[[], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (77b) Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received strategy = strategies[type_] KeyError: 'print_test'
I can see the task in my worker app like:
[tasks]
. user_management.tasks.print_test
How to solve this error?