7
votes

I believe this question is not a duplicate of Server sent event with Jersey: EventOutput is not closed after client drops, but probably related to Jersey Server-Sent Events - write to broken connection does not throw exception.

In chapter 15.4.2 of the Jersey documentation, the SseBroadcaster is described:

However, the SseBroadcaster internally identifies and handles also client disconnects. When a client closes the connection the broadcaster detects this and removes the stale connection from the internal collection of the registered EventOutputs as well as it frees all the server-side resources associated with the stale connection.

I cannot confirm this. In the following testcase, I see the subclassed SseBroadcaster's onClose() method never being called: not when the EventInput is closed, and not when another message is broadcasted.

public class NotificationsResourceTest extends JerseyTest {
    final static Logger log = LoggerFactory.getLogger(NotificationsResourceTest.class);

    final static CountingSseBroadcaster broadcaster = new CountingSseBroadcaster();

    public static class CountingSseBroadcaster extends SseBroadcaster { 
        final AtomicInteger connectionCounter = new AtomicInteger(0);

        public EventOutput createAndAttachEventOutput() {
            EventOutput output = new EventOutput();
            if (add(output)) {
                int cons = connectionCounter.incrementAndGet();
                log.debug("Active connection count: "+ cons);
            }
            return output;
        }

        @Override
        public void onClose(final ChunkedOutput<OutboundEvent> output) {
            int cons = connectionCounter.decrementAndGet();
            log.debug("A connection has been closed. Active connection count: "+ cons);
        }

        @Override
        public void onException(final ChunkedOutput<OutboundEvent> chunkedOutput, final Exception exception) {
            log.trace("An exception has been detected", exception);
        }

        public int getConnectionCount() {
            return connectionCounter.get();
        }
    }

    @Path("notifications")
    public static class NotificationsResource {

        @GET
        @Produces(SseFeature.SERVER_SENT_EVENTS)
        public EventOutput subscribe() {
            log.debug("New stream subscription");

            EventOutput eventOutput = broadcaster.createAndAttachEventOutput();
            return eventOutput;
        }
    }   

    @Override
    protected Application configure() {
        ResourceConfig config = new ResourceConfig(NotificationsResource.class);
        config.register(SseFeature.class);

        return config;
    }


    @Test
    public void test() throws Exception {
        // check that there are no connections
        assertEquals(0, broadcaster.getConnectionCount());

        // connect subscriber
        log.info("Connecting subscriber");
        EventInput eventInput = target("notifications").request().get(EventInput.class);
        assertFalse(eventInput.isClosed());

        // now there are connections
        assertEquals(1, broadcaster.getConnectionCount());

        // push data
        log.info("Broadcasting data");
        String payload = UUID.randomUUID().toString();
        OutboundEvent chunk = new OutboundEvent.Builder()
                .mediaType(MediaType.TEXT_PLAIN_TYPE)
                .name("message")
                .data(payload)
                .build();
        broadcaster.broadcast(chunk);

        // read data
        log.info("Reading data");
        InboundEvent inboundEvent = eventInput.read();
        assertNotNull(inboundEvent);
        assertEquals(payload, inboundEvent.readData());

        // close subscription 
        log.info("Closing subscription");
        eventInput.close();
        assertTrue(eventInput.isClosed());

        // at this point, the subscriber has disconnected itself, 
        // but jersey doesnt realise that
        assertEquals(1, broadcaster.getConnectionCount());

        // wait, give TCP a chance to close the connection
        log.debug("Sleeping for some time");
        Thread.sleep(10000);

        // push data again, this should really flush out the not-connected client
        log.info("Broadcasting data again");
        broadcaster.broadcast(chunk);
        Thread.sleep(100);

        // there is no subscriber anymore
        assertEquals(0, broadcaster.getConnectionCount());  // FAILS!
    }
}

Maybe JerseyTest is not a good way to test this. In a less ... clinical setup, where a JavaScript EventSource is used, I see onClose() being called, but only after a message is broadcasted on the previously closed connection.

What am I doing wrong?

Why doesn't SseBroadcaster detect the closing of the connection by the client?

Follow-up

I've found JERSEY-2833 which was rejected with Works as designed:

According to the Jersey Documentation in SSE chapter (https://jersey.java.net/documentation/latest/sse.html) in 15.4.1 it's mentioned that Jersey does not explicitly close the connection, it's the responsibility of the resource method or the client.

What does that mean exactly? Should the resource enforce a timeout and kill all active and closed-by-client connections?

3

3 Answers

2
votes

In the documentation of the constructor org.glassfish.jersey.media.sse.SseBroadcaster.SseBroadcaster(), it says:

Creates a new instance. If this constructor is called by a subclass, it assumes the the reason for the subclass to exist is to implement onClose(org.glassfish.jersey.server.ChunkedOutput) and onException(org.glassfish.jersey.server.ChunkedOutput, Exception)methods, so it adds the newly created instance as the listener. To avoid this, subclasses may call SseBroadcaster(Class) passing their class as an argument.

So you should not leave default constructor and try implementing your constructor invoking super with your class:

public CountingSseBroadcaster(){
    super(CountingSseBroadcaster.class);
}
0
votes

I believe it might be better to set a timeout on your resource and kill only that connection, for example:

@Path("notifications")
public static class NotificationsResource {

    @GET
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput subscribe() {
        log.debug("New stream subscription");

        EventOutput eventOutput = broadcaster.createAndAttachEventOutput();
        new Timer().schedule( new TimerTask()
        {
            @Override public void run()
            {
               eventOutput.close()
            }
        }, 10000); // 10 second timeout
        return eventOutput;
    }
}   
0
votes

Im wondering if by subclassing you may have changed the behaviour.

    @Override
    public void onClose(final ChunkedOutput<OutboundEvent> output) {
        int cons = connectionCounter.decrementAndGet();
        log.debug("A connection has been closed. Active connection count: "+ cons);
    }

In this you don't close the ChunkedOutput so it won't release the connection. Could this be the problem?