While looking for Jersey SSE examples, I found sse-item-store-webapp in the Jersey examples folder. It is a very simple app with one input and one button: you type some text, click the button, and the other connected clients receive the change.

@Path("items")
public class ItemStoreResource {

    private static final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
    private static final LinkedList<String> itemStore = new LinkedList<String>();
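    private static final SseBroadcaster broadcaster = new SseBroadcaster();
    // Logger used by the resource methods below (java.util.logging)
    private static final Logger LOGGER = Logger.getLogger(ItemStoreResource.class.getName());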

    private static volatile long reconnectDelay = 0;

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String listItems() {
        // Acquire the lock before entering try, so finally never unlocks a lock that was not taken
        storeLock.readLock().lock();
        try {
            return itemStore.toString();
        } finally {
            storeLock.readLock().unlock();
        }
    }

    @GET
    @Path("events")
    @Produces(SseFeature.SERVER_SENT_EVENTS)
    public EventOutput itemEvents(@HeaderParam(SseFeature.LAST_EVENT_ID_HEADER) @DefaultValue("-1") int lastEventId) {
        final EventOutput eventOutput = new EventOutput();


        if (lastEventId >= 0) {
            LOGGER.info("Received last event id :" + lastEventId);

            // decide the reconnect handling strategy based on current reconnect delay value.
            final long delay = reconnectDelay;
            if (delay > 0) {
                LOGGER.info("Non-zero reconnect delay [" + delay + "] - responding with HTTP 503.");
                throw new ServiceUnavailableException(delay);
            } else {
                LOGGER.info("Zero reconnect delay - reconnecting.");
                replayMissedEvents(lastEventId, eventOutput);
            }
        }


        if (!broadcaster.add(eventOutput)) {
            LOGGER.severe("!!! Unable to add new event output to the broadcaster !!!");
            // let's try to force a 5s delayed client reconnect attempt
            throw new ServiceUnavailableException(5L);
        }

        return eventOutput;
    }

    private void replayMissedEvents(final int lastEventId, final EventOutput eventOutput) {
        // Acquire the lock before entering try, so finally never unlocks a lock that was not taken
        storeLock.readLock().lock();
        try {
            final int firstUnreceived = lastEventId + 1;
            final int missingCount = itemStore.size() - firstUnreceived;
            if (missingCount > 0) {
                LOGGER.info("Replaying events - starting with id " + firstUnreceived);
                final ListIterator<String> it = itemStore.subList(firstUnreceived, itemStore.size()).listIterator();
                while (it.hasNext()) {
                    eventOutput.write(createItemEvent(it.nextIndex() + firstUnreceived, it.next()));
                }
            } else {
                LOGGER.info("No events to replay.");
            }
        } catch (IOException ex) {
            throw new InternalServerErrorException("Error replaying missed events", ex);
        } finally {
            storeLock.readLock().unlock();
        }
    }

    @POST
    public void addItem(@FormParam("name") String name) {
        // Ignore if the request was sent without name parameter.
        if (name == null) {
            return;
        }

        final int eventId;
        // Acquire the lock before entering try, so finally never unlocks a lock that was not taken
        storeLock.writeLock().lock();
        try {
            eventId = itemStore.size();
            itemStore.add(name);

            // Broadcasting an un-named event with the name of the newly added item in data
            broadcaster.broadcast(createItemEvent(eventId, name));
            // Broadcasting a named "size" event with the current size of the items collection in data
            broadcaster.broadcast(new OutboundEvent.Builder().name("size").data(Integer.class, eventId + 1).build());

        } finally {
            storeLock.writeLock().unlock();
        }
    }

    private OutboundEvent createItemEvent(final int eventId, final String name) {
        Logger.getLogger(ItemStoreResource.class.getName()).info("Creating event id [" + eventId + "] name [" + name + "]");
        return new OutboundEvent.Builder().id("" + eventId).data(String.class, name).build();
    }
}

For example, suppose I have chat rooms. I don't understand how to implement that with SSE, because every client connects to /items/events, and when someone posts a new message to one chat, the broadcaster sends it to every registered event output. I want to broadcast events only to the clients of a particular chat.

Could anyone who has worked with Jersey SSE advise how to implement this?

1 Answer

Try keeping a map from chat room IDs to SseBroadcaster objects; then you can subscribe all users of a given room to that room's broadcaster. This works for both tête-à-tête (one-to-one) conversations and team conversations.

Sample below:

private static final Map<Long, SseBroadcaster> ROOM_SSE_BROADCASTER = new ConcurrentHashMap<>();

@GET
@Path("/updatestate/{roomId}/{userId}")
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput updateState(@PathParam("roomId") Long roomId, @PathParam("userId") Long userId) {
    EventOutput eo = new EventOutput();
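    SseBroadcaster broadcaster = ROOM_SSE_BROADCASTER.get(roomId);
    if (broadcaster == null) {
        // Unknown room: registerRoom(roomId) has not been called for it
        throw new NotFoundException("Unknown room " + roomId);
    }
    broadcaster.add(eo);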

    return eo;
}

public static void updateRoom(Long roomId) {
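    SseBroadcaster broadcaster = ROOM_SSE_BROADCASTER.get(roomId);
    if (broadcaster != null) { // skip rooms that were never registered
        broadcaster.broadcast(buildEvent());
    }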
}

public static void registerRoom(Long roomId) {
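    // putIfAbsent keeps an existing broadcaster (and its subscribers) if the room is registered twice
    ROOM_SSE_BROADCASTER.putIfAbsent(roomId, new SseBroadcaster());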
}

private static OutboundEvent buildEvent() {
    OutboundEvent.Builder builder = new OutboundEvent.Builder();
    OutboundEvent event = builder.data(String.class, "update").build();

    return event;
}
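
To see the routing idea in isolation, here is a minimal sketch that runs without Jersey. RoomBroadcaster is a hypothetical stand-in for SseBroadcaster (it fans a String out to plain callbacks instead of EventOutput objects), and RoomRegistry, subscribe and post are names made up for this illustration, not part of any Jersey API. It only shows that a message posted to one room reaches that room's subscribers and nobody else.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for SseBroadcaster: fans a message out to its subscribers. */
final class RoomBroadcaster {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void add(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void broadcast(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

/** Hypothetical registry holding one broadcaster per chat room id. */
final class RoomRegistry {
    private final Map<Long, RoomBroadcaster> rooms = new ConcurrentHashMap<>();

    /** Subscribes to a room, creating its broadcaster on first use. */
    void subscribe(long roomId, Consumer<String> subscriber) {
        rooms.computeIfAbsent(roomId, id -> new RoomBroadcaster()).add(subscriber);
    }

    /** Sends the message to one room only; returns false if the room has no broadcaster. */
    boolean post(long roomId, String message) {
        RoomBroadcaster broadcaster = rooms.get(roomId);
        if (broadcaster == null) {
            return false;
        }
        broadcaster.broadcast(message);
        return true;
    }
}

class RoomBroadcastDemo {
    public static void main(String[] args) {
        RoomRegistry registry = new RoomRegistry();
        registry.subscribe(1L, msg -> System.out.println("room 1 client got: " + msg));
        registry.subscribe(2L, msg -> System.out.println("room 2 client got: " + msg));

        // Only the subscriber of room 1 is called for this message.
        registry.post(1L, "hello room 1");
    }
}
```

In the real resource, the callback would be an EventOutput added to the room's SseBroadcaster, and post would correspond to the POST method that receives a chat message for a given roomId.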