When i was looking examples of jersey sse i have found one example sse-item-store-webapp in jersey example folder. It is very simple app that has one input and one button. You type some text, click the button and other people get changes.
@Path("items")
public class ItemStoreResource {
private static final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
private static final LinkedList<String> itemStore = new LinkedList<String>();
private static final SseBroadcaster broadcaster = new SseBroadcaster();
private static volatile long reconnectDelay = 0;
@GET
@Produces(MediaType.TEXT_PLAIN)
public String listItems() {
try {
storeLock.readLock().lock();
return itemStore.toString();
} finally {
storeLock.readLock().unlock();
}
}
@GET
@Path("events")
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput itemEvents(@HeaderParam(SseFeature.LAST_EVENT_ID_HEADER) @DefaultValue("-1") int lastEventId) {
final EventOutput eventOutput = new EventOutput();
if (lastEventId >= 0) {
LOGGER.info("Received last event id :" + lastEventId);
// decide the reconnect handling strategy based on current reconnect delay value.
final long delay = reconnectDelay;
if (delay > 0) {
LOGGER.info("Non-zero reconnect delay [" + delay + "] - responding with HTTP 503.");
throw new ServiceUnavailableException(delay);
} else {
LOGGER.info("Zero reconnect delay - reconnecting.");
replayMissedEvents(lastEventId, eventOutput);
}
}
if (!broadcaster.add(eventOutput)) {
LOGGER.severe("!!! Unable to add new event output to the broadcaster !!!");
// let's try to force a 5s delayed client reconnect attempt
throw new ServiceUnavailableException(5L);
}
return eventOutput;
}
private void replayMissedEvents(final int lastEventId, final EventOutput eventOutput) {
try {
storeLock.readLock().lock();
final int firstUnreceived = lastEventId + 1;
final int missingCount = itemStore.size() - firstUnreceived;
if (missingCount > 0) {
LOGGER.info("Replaying events - starting with id " + firstUnreceived);
final ListIterator<String> it = itemStore.subList(firstUnreceived, itemStore.size()).listIterator();
while (it.hasNext()) {
eventOutput.write(createItemEvent(it.nextIndex() + firstUnreceived, it.next()));
}
} else {
LOGGER.info("No events to replay.");
}
} catch (IOException ex) {
throw new InternalServerErrorException("Error replaying missed events", ex);
} finally {
storeLock.readLock().unlock();
}
}
@POST
public void addItem(@FormParam("name") String name) {
// Ignore if the request was sent without name parameter.
if (name == null) {
return;
}
final int eventId;
try {
storeLock.writeLock().lock();
eventId = itemStore.size();
itemStore.add(name);
// Broadcasting an un-named event with the name of the newly added item in data
broadcaster.broadcast(createItemEvent(eventId, name));
// Broadcasting a named "size" event with the current size of the items collection in data
broadcaster.broadcast(new OutboundEvent.Builder().name("size").data(Integer.class, eventId + 1).build());
} finally {
storeLock.writeLock().unlock();
}
}
private OutboundEvent createItemEvent(final int eventId, final String name) {
Logger.getLogger(ItemStoreResource.class.getName()).info("Creating event id [" + eventId + "] name [" + name + "]");
return new OutboundEvent.Builder().id("" + eventId).data(String.class, name).build();
}
}
For example, if i have a chat rooms i don't understand how to implement that using SSE becouse every client connects to /items/events
and if someone post new message to some chat broadcaster
will broadcast this message to all signed events however i want broadcast events only for some chat.
Who works with Jersey SSE could you advise how to implement that ?