I am trying to implement Server-Sent Events in my Flask application by following this simple recipe: http://flask.pocoo.org/snippets/116/
For serving the app, I use gunicorn with gevent workers.
A minimal version of my code looks like this:
import multiprocessing

from gevent.queue import Queue
from gunicorn.app.base import BaseApplication
from flask import Flask, Response

app = Flask('minimal')

# NOTE: This is the global list of subscribers
subscriptions = []


class ServerSentEvent(object):
    def __init__(self, data):
        self.data = data
        self.event = None
        self.id = None
        self.desc_map = {
            self.data: "data",
            self.event: "event",
            self.id: "id"
        }

    def encode(self):
        if not self.data:
            return ""
        lines = ["%s: %s" % (v, k)
                 for k, v in self.desc_map.iteritems() if k]
        return "%s\n\n" % "\n".join(lines)


@app.route('/api/events')
def subscribe_events():
    def gen():
        q = Queue()
        print "New subscription!"
        subscriptions.append(q)
        print len(subscriptions)
        print id(subscriptions)
        try:
            while True:
                print "Waiting for data"
                result = q.get()
                print "Got data: " + result
                ev = ServerSentEvent(unicode(result))
                yield ev.encode()
        except GeneratorExit:
            print "Removing subscription"
            subscriptions.remove(q)
    return Response(gen(), mimetype="text/event-stream")


@app.route('/api/test')
def push_event():
    print len(subscriptions)
    print id(subscriptions)
    for sub in subscriptions:
        sub.put("test")
    return "OK"


class GunicornApplication(BaseApplication):
    def __init__(self, wsgi_app, port=5000):
        self.options = {
            'bind': "0.0.0.0:{port}".format(port=port),
            'workers': multiprocessing.cpu_count() + 1,
            'worker_class': 'gevent',
            'preload_app': True,
        }
        self.application = wsgi_app
        super(GunicornApplication, self).__init__()

    def load_config(self):
        config = dict([(key, value) for key, value in self.options.iteritems()
                       if key in self.cfg.settings and value is not None])
        for key, value in config.iteritems():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application


if __name__ == '__main__':
    gapp = GunicornApplication(app)
    gapp.run()
The problem is that the list of subscribers seems to be different for every worker. If worker #1 handles the /api/events request and adds a new subscriber to its list, the client only receives events that are pushed when worker #1 also happens to handle the /api/test request.
Curiously enough, the actual list object seems to be the same for each worker, since id(subscriptions) returns the same value in every worker.
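To illustrate why an identical id() does not have to mean a shared list, here is a minimal sketch that is separate from the application above. It assumes the workers are created with os.fork() from a parent that has already created the list (which is my understanding of what preload_app does), and it assumes CPython on a Unix-like system, where id() is the object's memory address. The name fork_and_append is made up for this example, and the sketch is written for Python 3.

```python
import os

subscriptions = []  # module-level list, created before the fork


def fork_and_append():
    """Fork once; the child appends to the list and reports back via a pipe."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child process: mutate its own copy of the list, then report
        # the list's id and length to the parent.
        os.close(read_fd)
        subscriptions.append("child-only")
        report = "%d %d" % (id(subscriptions), len(subscriptions))
        os.write(write_fd, report.encode("ascii"))
        os.close(write_fd)
        os._exit(0)
    # Parent process: read the child's report and wait for it to exit.
    os.close(write_fd)
    data = os.read(read_fd, 1024)
    os.close(read_fd)
    os.waitpid(pid, 0)
    child_id, child_len = (int(x) for x in data.decode("ascii").split())
    return child_id, child_len


child_id, child_len = fork_and_append()
print("same id in child and parent:", child_id == id(subscriptions))
print("length seen by child:", child_len)
print("length seen by parent:", len(subscriptions))
```

In this sketch the child sees the same id() as the parent because it inherits a copy of the parent's address space, yet the item it appends never shows up in the parent's list. That would match what I observe across the gunicorn workers, if they are indeed forked this way.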
Is there a way around this? I know that I could just use Redis, but the application is supposed to be as self-contained as possible, so I'm trying to avoid any external services.
Update: The cause of the problem seems to be my embedding of gunicorn.app.base.BaseApplication (which is a new feature in v0.19). When running the application from the command line with gunicorn -k gevent minimal:app, everything works as expected.
Update 2: The previous suspicion turned out to be wrong. The only reason it worked was that gunicorn's default number of worker processes is 1. When I adjust the number to match the code via the -w parameter, it exhibits the same behavior.