I'm struggling to make my Flask, SQLAlchemy (MySQL) and Celery setup work properly when multiple Celery workers, each running multiple threads, all query the same database.
The problem is that I cannot figure out how and where to apply the changes required to give the Flask application and each Celery worker an isolated database object.
From my understanding, separate sessions are required to avoid nasty database errors such as incomplete transactions that block other database queries.
This is my current project structure:
/flask_celery.py
from celery import Celery
def make_celery(app):
    """Build a Celery application whose tasks run inside ``app``'s context.

    The broker and result backend are read from ``app.config`` under the
    ``CELERY_BROKER_URL`` and ``CELERY_RESULT_BACKEND`` keys (both must be
    present), after which the complete Flask config is copied into the
    Celery configuration.

    Args:
        app: The Flask application supplying the import name, the
            configuration and the application context.

    Returns:
        The configured ``Celery`` instance, with ``Task`` replaced by a
        subclass that pushes ``app.app_context()`` around every call.
    """
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL'],
    )
    celery.conf.update(app.config)

    # Capture the stock task class before it is swapped out below, so the
    # subclass can still delegate to the original implementation.
    base_task = celery.Task

    class ContextTask(base_task):
        """Task base class that executes its body in the Flask app context."""

        abstract = True

        def __call__(self, *args, **kwargs):
            # The context is entered per call and left again when the task
            # returns or raises.
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
/app.py
#!/usr/bin/env python
import config
from app import app
# Start Flask's built-in server on port 82, listening on all interfaces.
# (The plain ``app.run(debug=True)`` form would fall back to Flask's default
# host and port instead.)
# NOTE(review): debug=True combined with host='0.0.0.0' presumably makes the
# interactive debugger reachable from the network -- confirm this is only
# ever used for local development.
app.run(
    host='0.0.0.0',
    port=82,
    debug=True,
)
/app/__init__.py
from flask import Flask
from celery import Celery
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_celery import make_celery
# Package-level Flask application; its settings are loaded from the object
# importable under the name 'config'.
app = Flask(__name__)
app.config.from_object('config')
# The secret key is taken from the SECRET_SESSION_KEY configuration entry,
# so that entry must exist in the loaded config.
app.secret_key = app.config['SECRET_SESSION_KEY']
# Flask-SQLAlchemy handle bound to ``app``; Flask-Migrate is given both the
# application and this handle.
db = SQLAlchemy(app)
migrate = Migrate(app, db)
# Celery instance produced by flask_celery.make_celery from ``app``. This must
# come after the configuration has been loaded, because make_celery reads
# CELERY_BROKER_URL and CELERY_RESULT_BACKEND from app.config.
celery = make_celery(app)