1
votes

I am using Jersey to implement an SSE scenario.

The server keeps connections alive and pushes data to clients periodically.

In my scenario there is a connection limit: only a certain number of clients can subscribe to the server at the same time.

So when a new client tries to subscribe, I check EventOutput.isClosed to see whether any old connections are no longer active, so that they can make room for new connections.

But EventOutput.isClosed always returns false unless the client explicitly calls close on its EventSource. This means that if a client drops accidentally (power outage or internet cutoff), it still hogs the connection, and new clients cannot subscribe.

Is there a workaround for this?

2
I'm seeing the same issue. The onClose() method on the SseBroadcaster never gets called. This causes connections to wait in CLOSE_WAIT state and slowly kill the server. Please let me know if you found a workaround for this issue. - JVXR
@JVXR No, we have had that problem all along. - Cui Pengfei 崔鹏飞
I have opened a bug: java.net/jira/browse/JERSEY-2833 - JVXR

2 Answers

6
votes

@CuiPengFei,

While looking for an answer to this myself, I stumbled upon a repository that shows how to gracefully clean up the connections left behind by disconnected clients.

They encapsulate all of the SSE EventOutput logic in a Service/Manager. In it they spin up a thread that periodically checks whether each EventOutput has been closed by the client. If so, they formally close the connection (EventOutput#close()). If not, they try to write to the stream. If the write throws an Exception, the client has disconnected without closing, and the manager closes and removes the connection. If the write succeeds, the EventOutput stays in the pool, as it is still an active connection.

The repo (and the actual class) are available here. I've also included the class, without imports, below in case the repo is ever removed.

Note that they bind this class as a Singleton: the connection store must be unique across the whole application.

public class SseWriteManager {

private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>();

private final ScheduledExecutorService messageExecutorService;

private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class);

public SseWriteManager() {
    messageExecutorService = Executors.newScheduledThreadPool(1);
    messageExecutorService.scheduleWithFixedDelay(new messageProcessor(), 0, 5, TimeUnit.SECONDS);
}

public void addSseConnection(String id, EventOutput eventOutput) {
    logger.info("adding connection for id={}.", id);
    connectionMap.put(id, eventOutput);
}

private class messageProcessor implements Runnable {
    @Override
    public void run() {
        try {
            Iterator<Map.Entry<String, EventOutput>> iterator = connectionMap.entrySet().iterator();
            while (iterator.hasNext()) {
                boolean remove = false;
                Map.Entry<String, EventOutput> entry = iterator.next();
                EventOutput eventOutput = entry.getValue();
                if (eventOutput != null) {
                    if (eventOutput.isClosed()) {
                        remove = true;
                    } else {
                        try {
                            logger.info("writing to id={}.", entry.getKey());
                            eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build());
                        } catch (Exception ex) {
                            logger.info(String.format("write failed to id=%s.", entry.getKey()), ex);
                            remove = true;
                        }
                    }
                }
                if (remove) {
                    // We are removing the eventOutput; close it if it is not already closed.
                    if (!eventOutput.isClosed()) {
                        try {
                            eventOutput.close();
                        } catch (Exception ex) {
                            // do nothing.
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (Exception ex) {
            logger.error("messageProcessor.run threw exception.", ex);
        }
    }
}

public void shutdown() {
    if (messageExecutorService != null && !messageExecutorService.isShutdown()) {
        logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown.");
        messageExecutorService.shutdown();
    } else {
        logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown().");
    }
}
}

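To tie this back to the connection limit in the question, here is a minimal sketch of the same probe-and-evict idea using only the JDK. Connection, LimitedRegistry and FakeConnection are hypothetical names I made up for illustration (Connection stands in for Jersey's EventOutput); none of them come from the repo. It assumes, as above, that writing to a dead client throws.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Jersey's EventOutput, so the sketch needs only the JDK.
interface Connection {
    boolean isClosed();
    void write(String data) throws IOException;
    void close() throws IOException;
}

// Hypothetical registry: enforces a connection limit and evicts dead connections.
class LimitedRegistry {
    private final Map<String, Connection> connections = new ConcurrentHashMap<>();
    private final int limit;

    LimitedRegistry(int limit) {
        this.limit = limit;
    }

    // Probe every connection with a heartbeat write; drop the closed or failing ones.
    synchronized int sweep() {
        int removed = 0;
        Iterator<Map.Entry<String, Connection>> it = connections.entrySet().iterator();
        while (it.hasNext()) {
            Connection c = it.next().getValue();
            boolean dead = c.isClosed();
            if (!dead) {
                try {
                    c.write("heartbeat");
                } catch (IOException ex) {
                    dead = true; // assumption: a failed write means the client is gone
                }
            }
            if (dead) {
                try {
                    c.close();
                } catch (IOException ignored) {
                    // nothing more we can do for this connection
                }
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    // Admit a new client; if the registry is full, sweep once to free up slots.
    synchronized boolean subscribe(String id, Connection c) {
        if (connections.size() >= limit) {
            sweep();
        }
        if (connections.size() >= limit) {
            return false; // still full: every existing connection answered the probe
        }
        connections.put(id, c);
        return true;
    }

    synchronized int size() {
        return connections.size();
    }
}

// Test double: an unreachable connection fails on every write.
class FakeConnection implements Connection {
    private final boolean reachable;
    private boolean closed;

    FakeConnection(boolean reachable) {
        this.reachable = reachable;
    }

    public boolean isClosed() {
        return closed;
    }

    public void write(String data) throws IOException {
        if (!reachable) {
            throw new IOException("peer gone");
        }
    }

    public void close() {
        closed = true;
    }
}
```

With a limit of 2, subscribing a reachable client and an unreachable one fills the registry; a third subscribe triggers a sweep, evicts the unreachable client, and succeeds. Sweeping on subscribe is my own variation; the class above sweeps on a timer instead, and the two can be combined.
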
2
votes

Wanted to provide an update on this:

What was happening is that the EventSource on the client side (JS) never got into readyState 1 unless we did a broadcast as soon as a new subscription was added. Even in this state, the client could receive data pushed from the server. Adding a call that broadcasts a simple "OK" message helped kick the EventSource into readyState 1.

As for closing the connection from the client side: if you want to be proactive in cleaning up resources, just closing the EventSource on the client side doesn't help. We must make another Ajax call to the server to force it to do a broadcast. When the broadcast is forced, Jersey cleans up the connections that are no longer alive and in turn releases their resources (connections in CLOSE_WAIT). Otherwise a connection will linger in CLOSE_WAIT until the next broadcast happens.
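
As a rough model of the behaviour described above (not Jersey's actual implementation), here is a JDK-only sketch of a broadcaster that sends "OK" on every new subscription and drops any output whose write fails during a broadcast. Channel and PruningBroadcaster are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a single SSE output.
interface Channel {
    void write(String message) throws IOException;
}

// Hypothetical broadcaster that only notices dead channels while broadcasting.
class PruningBroadcaster {
    private final List<Channel> channels = new CopyOnWriteArrayList<>();

    // Register the channel, then broadcast "OK" right away, as described above.
    void subscribe(Channel channel) {
        channels.add(channel);
        broadcast("OK");
    }

    // Write to every channel; remove those whose write fails. Returns the number removed.
    int broadcast(String message) {
        int pruned = 0;
        for (Channel channel : channels) { // iterates over a snapshot, so removal is safe
            try {
                channel.write(message);
            } catch (IOException ex) {
                channels.remove(channel);
                pruned++;
            }
        }
        return pruned;
    }

    int size() {
        return channels.size();
    }
}
```

In this model a dead channel stays registered until the next broadcast, which is why forcing a broadcast after the client closes its EventSource frees the connection sooner.
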