2
votes

Im building a Flask app which require a background process resulting in uploads to a SQLAlchemy database.

Relevant snippets:

from flask_sqlalchemy import SQLAlchemy
import concurrent.futures
import queue
from models import Upload_Tracks

app = Flask(__name__)
db.init_app(app)

app.config.update(
SQLALCHEMY_DATABASE_URI= "sqlite:///%s" % os.path.join(app.root_path, 'path/to/player.sqlite3'))

q = queue.Queue()

In database.py:

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

In models.py:

def Upload_Tracks(item):
        uri = None
        title = unidecode(item['title'])
        artist = unidecode(item['artist'])
        preview = item['preview']
        energy = item['energy']
        popularity = item['popularity']
        tempo = item['tempo']
        brightness = item['brightness']
        key = item['key']
        image = item['artist_image']
        duration = item['duration']
        loudness = item['loudness']
        valence = item['valence']
        genre = item['genre']

        track = Track(title=title,
                      artist=artist,
                      uri=uri,
                      track_id=None)

        db.session.add(track)

        track.preview = preview
        track.energy = energy
        track.popularity = popularity
        track.tempo = tempo
        track.genre = genre
        track.brightness = brightness
        track.key = key
        track.image = image
        track.duration = duration
        track.loudness = loudness
        track.valence = valence

        db.session.commit()

first function:

# 1st background process
def build_cache():
    """
    Build general cache 
    from user's streaming
    """
    tracks_and_features = spotify.query_tracks()

    for item in tracks_and_features:
        q.put(item)

    return "CACHE BUILT"               

second:

def upload_cache(track):
    # save to database
    Upload_Tracks(filtered_dataset=track) 

    return "CACHE UPLOADED"               

Flask view:

#background cache
@app.route('/cache')
def cache():
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        #executor.submit(build_cache)

        # start a future for a thread which sends work in through the queue
        future_to_track = {
            executor.submit(build_cache): 'TRACKER DONE'}

        while future_to_track:
            # check for status of the futures which are currently working
            done, not_done = concurrent.futures.wait(
                                                future_to_track, 
                                                timeout=0.25,
                                                return_when=concurrent.futures.FIRST_COMPLETED) 

            # if there is incoming work, start a new future
            while not q.empty():

                # fetch a track from the queue
                track = q.get()

                # Start the load operation and mark the future with its TRACK
                future_to_track[executor.submit(upload_cache, track)] = track
            # process any completed futures
            for future in done:
                track = future_to_track[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (track, exc))
                else:
                    if track == 'TRACKER DONE':
                        print(data)
                    else:
                        print('%r track is nont uploaded to database') % (track)

                # remove the now completed future
                del future_to_track[future]


    return 'Cacheing playlist in the background...'

Database works fine with no threading. However, when I run it all, the following exception is being caught:

FIRST CACHE BUILT {'brightness': 0.4293608513678877, 'energy': 0.757, 'tempo': 116.494, 'duration': 201013, 'key': 5, 'loudness': -7.607, 'genre': [u'art rock', u'dance rock', u'folk rock', u'mellow gold', u'new wave', u'new wave pop', u'power pop', u'pub rock', u'rock', u'roots rock'], 'valence': 0.435, 'artist': u'Elvis Costello & The Attractions', 'popularity': 14, 'artist_image': u'https://i.scdn.co/image/c14ffeb7855625383c266c9c04faa75516a25503', 'title': u'Poor Napoleon', 'preview': u'https://p.scdn.co/mp3-preview/c0d57fed887ea2dbd7f69c7209adab71671b9e6e?cid=d3b2f7a12362468daa393cf457185973'} generated an exception: No application found. Either work inside a view function or push an application context. See http://flask-sqlalchemy.pocoo.org/contexts/.

but the process, to my knowledge, is running within @app.route. how is this out of context? how do I fix this?

2
Flask stores its context for each request in thread-local variables, therefore you will get these errors when you're trying to access the Flask global objects (e.g. current_app, request or g) from a different thread. @app.route has nothing to do with context, it just registers the view within your app.Mikhail Burshteyn
Where exactly are you getting the error? Can you post the traceback?Mikhail Burshteyn
in terminal. I get the traceback above for all 3 tracks tested. so it is caught inside the executor.user7830303

2 Answers

1
votes

In case anyone who meet the same problem

from flask import current_app
def context_wrap(fn):
    app_context = current_app.app_context()
    def wrapper(*args, **kwargs):
        with app_context:
            return fn(*args, **kwargs)
    return wrapper

#Then In this case:
future_to_track = {
        executor.submit(context_wrap(build_cache)): 'TRACKER DONE'}
#OR In your case:
context_wrap(any_func_need_context)
0
votes

The following worked:

def build_cache():
    with app.app_context():
        (...)

def upload_cache(track):
    with app.app_context():
        (...)

@app.route('/cache')
def cache():
    with app.app_context():
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            (...)