4
votes

I'm creating a background celery task with Flask and Flask-SQLAlchemy to update a property of a database record. I am using the recommended documentation for the celery config and this is a simplified version of my code:

from flask import Flask
from celery import Celery

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery


app = Flask(__name__)

celery = make_celery(app)

class Stuff(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    processed = db.Column(db.Boolean)


@celery.task()
def process_stuff(stuff_id):
    stuff = Stuff.query.get(stuff_id)
    print("stuff.processed 1: {}".format(stuff.processed))
    stuff.processed = True
    print("stuff.processed 2: {}".format(stuff.processed))
    db.session.add(stuff)
    db.session.commit()
    print("stuff.processed 3: {}".format(stuff.processed))

@app.route("/process_stuff/<id>")
def do_process_stuff(id):
    stuff = Stuff.query.get_or_404(id)
    process_stuff.delay(stuff.id)
    return redirect(url_for("now_wait"))

And this is the output from the print statements:

[2017-07-11 07:32:01,281: WARNING/PoolWorker-4] stuff.processed 1: False
[2017-07-11 07:32:01,282: WARNING/PoolWorker-4] stuff.processed 2: False
[2017-07-11 07:32:01,285: WARNING/PoolWorker-4] stuff.processed 3: False

I can see in my celery worker logs that the task is being picked up and completed; however, the print statements show that the stuff.processed property is ALWAYS False - the print statement never shows True even after I update it manually (I've tested this outside of celery and I can update the property just fine).

There is a very similar problem here but that solution did not work for my situation

Library Versions

  • Flask 0.12.2
  • SQLAlchemy 1.1.11
  • Celery 4.0.2
  • Flask-SQLAlchemy 2.2

Update

Additional testing shows I can create and persist newly created objects to the database - updating pre-existing objects continues to fail.

1
try to add method def on_success(self, retval, task_id, args, kwargs): db.session.commit() to ContextTaskDanila Ganchar
@DanilaGanchar no luck unfortunatelyAnconia
could you add flask, celery, sqlalchemy versions to the question? by the way I can't to repeat your behavior.Danila Ganchar
@DanilaGanchar No problem, I've included the libraries. I'm also using postgresql as the databaseAnconia
If you're using Flask-SQLAlchemy you need to create a request context for the database calls in your celery task. Though I'm not sure why you aren't getting an error that would state that (unless something else is hiding the exception)Cfreak

1 Answers

1
votes

In my question, I simplified the code substantially and when doing so I changed the reserved property to processed. My actual model looks more like this:

class Stuff(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    reserved = db.Column(db.Boolean)

It turns out that reserved is a function in celery, which is why the property could not be updated from my celery task. The solution was to rename the reserved property on the model.