8
votes

Using Celery ver.3.1.23, I am trying to dynamically add a scheduled task to celery beat. I have one celery worker and one celery beat instance running.

Triggering a standard celery task y running task.delay() works ok. When I define a scheduled periodic task as a setting in configuration, celery beat runs it.

However what I need is to be able to add a task that runs at specified crontab at runtime. After adding a task to persistent scheduler, celery beat doesn't seem to detect the newly added new task. I can see that the celery-schedule file does have an entry with new task.

Code:

scheduler = PersistentScheduler(app=current_app, schedule_filename='celerybeat-schedule')
scheduler.add(name="adder",
          task="app.tasks.add",
          schedule=crontab(minute='*/1'),
          args=(1,2))
scheduler.close()

When I run:

print(scheduler.schedule)

I get:

{'celery.backend_cleanup': <Entry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>,
'adder': <Entry: adder app.tasks.add(1, 2) <crontab: */1 * * * * (m/h/d/dM/MY)>}

Note that app.tasks.add has the @celery.task decorator.

4

4 Answers

0
votes

You may solve your problem by enabling autoreloading.

However I'm not 100% sure it will work for your config file but it should if is in the CELERY_IMPORTS paths.

Hoverer note that this feature is experimental and to don't be used in production.

If you really want to have dynamic celerybeat scheduling you can always use another scheduler like the django-celery one to manage periodic tasks on db via a django admin.

0
votes

I'm having a similar problem and a solution I thought about is to pre-define some generic periodic tasks (every 1s, every 5mins, etc) and then have them getting, from DB, a list of function to be executed. Every time you want to add a new task you just add an entry in your DB.

0
votes

Celery beat stores all the periodically scheduled tasks in the model PeriodicTask . As a beat task can be scheduled in different ways including crontab, interval or solar. All these fields are a foreign key in the PeriodicTask model.

In order to dynamically add a scheduled task, just populate the relevant models in celery beat, the scheduler will detect changes. The changes are detected when either the count of tuple changes or save() function is called.

from django_celery_beat.models import PeriodicTask, CrontabSchedule

# -- Inside the function you want to add task dynamically 

schedule = CrontabSchedule.objects.create(minute='*/1')
task = PeriodicTask.objects.create(name='adder',
                                   task='apps.task.add', crontab=schedule)
task.save()
0
votes

Instead of trying to find a good workaround, I suggest you switch to the Celery Redbeat.