0
votes

I would like to do cron jobs from Flask using Celery but I have an issue regarding celery beat schedule, because it seems that my task is not loaded and I don't know how to check where the issue is.

This is where I define my Flask app in views.py :

from celery.schedules import crontab

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379',
    CELERY_BEAT_SCHEDULE={
        'task-number-one': {
            'task': 'app.tasks.test',
            'schedule': crontab(minute="*"),
        }
    },
    CELERY_IMPORTS = ('app.tasks'),
    CELERY_TASK_RESULT_EXPIRES = 30,
    CELERY_TIMEZONE = 'UTC',
)

and this where my celery object is created in tasks.py:

from celery import Celery
from app.views import app
from celery import shared_task


def make_celery(app):
    celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

celery_app = make_celery(app)

@celery_app.task()
def test():
    logger = test.get_logger()
    logger.info("Hello")

views.py and tasks.py are under the same directories which is called app

This is what I have when launching celery worker (everything seems normal here):

But this is what I have when launching celery beat, it seems that my task is never sent by my schedule but I don't know where to check: enter image description here

Can you help me on this? Best

1

1 Answers

0
votes

I believe Celery-Beat Tasks need to be configured at least after the @app.on_after_configure.connect signal is sent. You should be able to do the following in your tasks.py file.

celery_app.conf.CELERYBEAT_SCHEDULE = {
    "test-every-minue": {
    "task": "tasks.test",
    "schedule": crontab(minute="*"),
     },
}

Alternatively you can use this decorator syntax if your task is defined in the same file as your celery application instance.

@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(5 , test_two.s(), name='test-every-5')

If your tasks are defined in a separate module you can use the @app.on_after_finalize.connect decorator after importing your tasks.

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    from app.tasks import test
    sender.add_periodic_task(10.0, test.s(), name='test-every-10')

Celery Beat Entry Docs