I'm creating a background Celery task with Flask and Flask-SQLAlchemy to update a property of a database record. I am using the Celery configuration recommended in the documentation, and this is a simplified version of my code:
from flask import Flask, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from celery import Celery


def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context.
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


app = Flask(__name__)
db = SQLAlchemy(app)  # not shown in the simplified snippet; assumed standard setup
celery = make_celery(app)


class Stuff(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    processed = db.Column(db.Boolean)


@celery.task()
def process_stuff(stuff_id):
    stuff = Stuff.query.get(stuff_id)
    print("stuff.processed 1: {}".format(stuff.processed))
    stuff.processed = True
    print("stuff.processed 2: {}".format(stuff.processed))
    db.session.add(stuff)
    db.session.commit()
    print("stuff.processed 3: {}".format(stuff.processed))


@app.route("/process_stuff/<id>")
def do_process_stuff(id):
    stuff = Stuff.query.get_or_404(id)
    process_stuff.delay(stuff.id)
    return redirect(url_for("now_wait"))
And this is the output from the print statements:
[2017-07-11 07:32:01,281: WARNING/PoolWorker-4] stuff.processed 1: False
[2017-07-11 07:32:01,282: WARNING/PoolWorker-4] stuff.processed 2: False
[2017-07-11 07:32:01,285: WARNING/PoolWorker-4] stuff.processed 3: False
I can see in my Celery worker logs that the task is being picked up and completed. However, the print statements show that the stuff.processed property is ALWAYS False: the print statement never shows True, even after I update it manually. (I've tested this outside of Celery and I can update the property just fine.)
There is a very similar problem here, but that solution did not work for my situation.
Library Versions
- Flask 0.12.2
- SQLAlchemy 1.1.11
- Celery 4.0.2
- Flask-SQLAlchemy 2.2
Update
Additional testing shows that I can create new objects and persist them to the database from the task. Updating pre-existing objects continues to fail.
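A check that could narrow this down is to read the row over a separate raw connection, bypassing the ORM session entirely, so that only committed data is visible. The following is a minimal sketch, not a fix. It assumes a SQLite database file (the question does not state the backend) and the table name `stuff`, which is Flask-SQLAlchemy's default for a model called `Stuff`. The helper name `read_processed` and its parameters are hypothetical.

```python
import sqlite3


def read_processed(db_path, stuff_id, table="stuff"):
    """Return the committed `processed` flag for one row.

    Hypothetical helper (assumes SQLite): it opens its own connection,
    so it never touches the SQLAlchemy session and only sees data that
    has actually been committed. Returns None if the row is missing or
    the column is NULL.
    """
    conn = sqlite3.connect(db_path)
    try:
        # `table` must come from trusted code: identifiers cannot be
        # passed as bound parameters, only values can.
        row = conn.execute(
            "SELECT processed FROM {} WHERE id = ?".format(table),
            (stuff_id,),
        ).fetchone()
    finally:
        conn.close()
    if row is None or row[0] is None:
        return None
    return bool(row[0])
```

Calling `read_processed(path_to_db, stuff_id)` before and after the task runs would show whether the UPDATE ever reaches the database, or whether only the in-memory object looks stale.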
Try adding def on_success(self, retval, task_id, args, kwargs): db.session.commit() to ContextTask. – Danila Ganchar