
I have a Flask web-server which generates server-sent events (SSE) that should be received by all connected web-clients.

In "Version 1" below this works. All web-clients receive the events, and update accordingly.

In "Version 2" below, which is a refactoring of Version 1, this no longer works as expected. Instead:

  • mostly only one of the web-clients gets the event, or
  • rarely multiple web-clients get the event, or
  • rarely none of the web-clients get the event

As far as I can make out, the server is always generating the events, and normally at least one client is receiving.

My initial test hosted the web-server on a Raspberry Pi 3, with the web-clients on the Pi, on Windows and OSX using a variety of browsers.

To eliminate any possible network issues I repeated the same test with the web-server and 3 instances of Chrome all hosted on the same OSX laptop. This gave the same results: Version 1 "OK", Version 2 "NOT OK".

The client that receives the event seems to vary randomly from event to event; so far I can't discern a pattern.

Both Version 1 and Version 2 have a structure, change_objects, containing the "things that should be tracked for changes":

  • In Version 1 change_objects is a dict of dicts.

  • In Version 2 I refactored change_objects to be a list of instances of the class Reporter, or of subclasses of Reporter.

The changes to the "things" are triggered by web-service requests handled elsewhere in the code.

Version 1 (OK: sse events received by all web-clients)

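from copy import deepcopy

import gevent
from flask import Response

# `app` (the Flask application) and `walks` (the tracked data) are defined elsewhere.

def check_walk(walk_new, walk_old):
    """Return (changed, state to remember as the old one)."""
    if walk_new != walk_old:
        print("walk change", walk_old, walk_new)
        return True, walk_new
    return False, walk_old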

def walk_event(walk):
    silliness = walk['silliness']
    data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
    return "data: {}\n\n".format(data)

change_objects = {
    "walk1": {
        "object": walks[0],
        "checker": check_walk,
        "event": walk_event,
    },
    # ... more things to be tracked ...
}

def event_stream(change_objects):
    copies = {}
    for key, value in change_objects.items():
        copies[key] = {"obj_old": deepcopy(value["object"])}  # ensure a true copy, not a reference!

    while True:
        gevent.sleep(0.5)
        for key, value in change_objects.items():
            obj_new = deepcopy(value["object"]) # use same version in check and yield functions
            obj_changed, copies[key]["obj_old"] = value["checker"](obj_new, copies[key]["obj_old"])
            if obj_changed:
                yield value["event"](obj_new)

@app.route('/server_events')
def sse_request():
    return Response(
            event_stream(change_objects),
            mimetype='text/event-stream')

Version 2 (NOT OK: sse events NOT always received by all web-clients)

import json
from copy import deepcopy

import gevent
from flask import Response

class Reporter:

    def __init__(self, reportee, name):
        self._setup(reportee, name)

    def _setup(self, reportee, name):
        self.old = self.truecopy(reportee)
        self.new = reportee
        self.name = "{}_change".format(name)

    def truecopy(self, orig):
        return deepcopy(orig)

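    def changed(self):
        # Note the side effect: reporting a change also refreshes self.old,
        # so the same change is reported only once per Reporter instance.
        if self.new != self.old:
            self.old = self.truecopy(self.new)
            return True
        return False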

    def sse_event(self):
        data = self.new.copy()
        data['type'] = self.name
        data = json.dumps(data)
        return "data: {}\n\n".format(data)

class WalkReporter(Reporter):

    # Only changes to the attribute "silliness" are of interest (not the other
    # attributes), so override the superclass sse_event().
    def sse_event(self):
        silliness = self.new['silliness']
        data = '{{"type": "walk_change", "silliness": {}}}'.format(silliness)
        return "data: {}\n\n".format(data)

change_objects = [
    WalkReporter(name="walk1", reportee=walks[0]),
    # ... more objects to be tracked ...
]

def event_stream(change_objects):
    while True:
        gevent.sleep(0.5)
        for obj in change_objects:
            if obj.changed():
                yield obj.sse_event()

@app.route('/server_events')
def sse_request():
    return Response(
            event_stream(change_objects),
            mimetype='text/event-stream')

Full disclosure: This question is a follow-on to the question "Refactor a (multi)generator python function", which focussed on refactoring the event_stream() function when tracking changes to multiple "things". However, the problem here is clearly outside the scope of the original question, hence a new one.

After lots of debugging it has just clicked: this is a concurrency / timing problem. I seem to have a solution, but need to do a little more testing before posting an answer / explanation. – FlyingSheep

1 Answer


The refactored "Version 2" code in the question suffers from a concurrency / timing problem.

sse_request() is called once for each of the web-clients (3 instances in the test case), so there are 3 separate event_stream() generators looping at the same time.

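These loops run "more or less" in parallel, which in practice means that they interleave in no predictable order.

However, the list change_objects is shared between them. The first web-client that spots a change calls changed(), which updates the "old" copy in the shared WalkReporter instance to the latest state, and it may do so before the other clients have checked. In other words, the first successful web-client effectively hides the change from the other web-clients.

Version 1 did not have this problem because event_stream() built its own copies dict on every call, so each web-client compared against its own "old" state.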
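The effect can be reproduced without Flask or gevent. The following is only a minimal sketch: Reporter is a trimmed-down copy of the class from the question, the walk dict is a hypothetical stand-in for walks[0], and the two "clients" are simulated by two consecutive calls to changed().

```python
from copy import deepcopy


class Reporter:
    """Trimmed-down copy of the Reporter class from Version 2."""

    def __init__(self, reportee):
        self.new = reportee            # live reference to the tracked object
        self.old = deepcopy(reportee)  # snapshot used for comparison

    def changed(self):
        if self.new != self.old:
            self.old = deepcopy(self.new)  # reporting consumes the change
            return True
        return False


walk = {"silliness": 1}   # hypothetical stand-in for walks[0]
shared = Reporter(walk)   # one instance shared by every client

walk["silliness"] = 2     # a change arrives

# Two simulated clients poll the same Reporter one after the other.
client_a_saw_change = shared.changed()
client_b_saw_change = shared.changed()

print(client_a_saw_change, client_b_saw_change)  # True False
```

Whichever client polls first consumes the change; which one that is depends on scheduling, which matches the seemingly random behaviour described in the question.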

This is easily fixed by giving each web-client its own copy of change_objects, i.e. by building the list inside sse_request() as shown below.

@app.route('/server_events')
def sse_request():
    change_objects = [
        WalkReporter(name="walk1", reportee=walks[0]),
        # ... more objects to be tracked ...
    ]
    return Response(
            event_stream(change_objects),
            mimetype='text/event-stream')

With this minor change, each call to sse_request() gets its own Reporter instances, each with its own "old" snapshot, so every web-client spots the changes and receives the SSE events as expected.
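If the list grows, it may be tidier to build it in a small factory function that is called once per request. This is only a sketch under assumptions: make_change_objects() is a hypothetical helper that is not part of the original code, Reporter is trimmed down from the question, and the two clients are simulated without Flask or gevent.

```python
from copy import deepcopy


class Reporter:
    """Trimmed-down copy of the Reporter class from Version 2."""

    def __init__(self, reportee):
        self.new = reportee            # live reference to the tracked object
        self.old = deepcopy(reportee)  # per-instance snapshot

    def changed(self):
        if self.new != self.old:
            self.old = deepcopy(self.new)
            return True
        return False


def make_change_objects(walks):
    """Hypothetical helper: build a fresh list of reporters for one client."""
    return [Reporter(walk) for walk in walks]


walks = [{"silliness": 1}]             # stand-in for the tracked data

client_a = make_change_objects(walks)  # what one call to sse_request() would build
client_b = make_change_objects(walks)  # a second client gets its own reporters

walks[0]["silliness"] = 2              # a change arrives

seen_a = [reporter.changed() for reporter in client_a]
seen_b = [reporter.changed() for reporter in client_b]

print(seen_a, seen_b)  # [True] [True]
```

The reporters still share the tracked objects themselves (self.new refers to the same dict), which is what is wanted here; only the "old" snapshots are held per client.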