This application receives & forwards messages from database events to client applications. Messages are immediately delivered when the client browser has a web socket session.
However, when no web socket session exists and a message is sent by the JMSProducer into the Destination "jms/notificationQueue" in QueueSenderSessionBean, the message is immediately consumed in NotificationEndpoint. This is not my intent.
My intent is for the queue to retain the message until the user connects to NotificationEndpoint. If the user is not connected to the NotificationEndpoint, I think there should be no instance of NotificationEndpoint created to receive the message.
How do I delay the JMSConsumer consuming the message from the queue?
Overview - TomEE Plus 8.0.0-M1 project
- Application receives notification in a NotificationServlet HttpServletRequest
- String message is put into JMS Queue by QueueSenderSessionBean injected into NotificationServlet
- NotificationMessageDrivenBean implements MessageListener to listen to the JMS Queue
- An Event annotated with @NotificationServletJMSMessage is fired from NotificationMessageDrivenBean for an Observer in NotificationEndpoint method onJMSMessage.
- NotificationEndpoint uses PushContext which gathers all websocket sessions to deliver the message to the user
- In PushContext.send, if any websocket sessions with a user uuid property matching the message user uuid property, the message is delivered to each websocket session.
My understanding of @ServerEndpoint is that "each new WS session gets its own instance." Notify only specific user(s) through WebSockets, when something is modified in the database
Sources: the above link from https://stackoverflow.com/users/157882/balusc and https://blogs.oracle.com/theaquarium/integrating-websockets-and-jms-with-cdi-events-in-java-ee-7-v2
WEB-INF/resources.xml
<?xml version="1.0" encoding="UTF-8"?>
<resources>
<Resource id="jmsConnectionFactory" type="javax.jms.ConnectionFactory">
connectionMaxIdleTime = 15 Minutes
connectionMaxWaitTime = 5 seconds
poolMaxSize = 10
poolMinSize = 0
resourceAdapter = Default JMS Resource Adapter
transactionSupport = xa
</Resource>
</resources>
NotificationServlet.java
import java.io.IOException;
import java.util.UUID;
import javax.annotation.Resource;
import javax.faces.context.FacesContext;
import javax.inject.Inject;
import javax.jms.Queue;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/notifications")
public class NotificationServlet extends HttpServlet
{
@Resource(name = "jms/notificationQueue")
private Queue _notificationQueue;
@Inject
private QueueSenderSessionBean _queueSessionSenderBean;
@Override
protected void doGet(HttpServletRequest request,
HttpServletResponse response)
throws ServletException,
IOException
{
try
{
String notificationJson =
extractNotificationJson(request);
if (notificationJson != null)
{
_queueSessionSenderBean.sendMessage(
"notification="
+ notificationJson);
}
}
catch (Exception e)
{
e.printStackTrace();
// handle exception
}
}
public String extractNotificationJson(HttpServletRequest request)
throws IOException
{
if(request.getParameter("notification") != null)
{
String[] notificationString =
request.getParameterValues("notification");
return notificationString[0];
}
return null;
}
}
QueueSenderSessionBean.java
import javax.annotation.Resource;
import javax.ejb.LocalBean;
import javax.ejb.Stateless;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.DeliveryMode;
import javax.jms.JMSConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
import org.json.JSONObject;
@Named
@LocalBean
@Stateless
public class QueueSenderSessionBean
{
@Resource(mappedName = "jms/notificationQueue")
private Queue _notificationQueue;
@Inject
@JMSConnectionFactory("jmsConnectionFactory")
private JMSContext _jmsContext;
// Static Methods
// Member Methods
public void sendMessage(String message)
{
try
{
JMSProducer messageProducer =
_jmsContext.createProducer();
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
String userProperty = "someValue";
TextMessage textMessage = _jmsContext.createTextMessage(message);
textMessage.setStringProperty("userProperty", userProperty);
messageProducer.send(_notificationQueue, textMessage);
}
catch (JMSException e)
{
e.printStackTrace();
// handle jms exception
}
}
}
Qualifier NotificationServletJMSMessage.java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import javax.inject.Qualifier;
@Qualifier
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
public @interface NotificationServletJMSMessage
{
}
NotificationMessageDrivenBean.java
import javax.ejb.MessageDriven;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.Message;
import javax.jms.MessageListener;
@Named
@MessageDriven(mappedName = "jms/notificationQueue")
public class NotificationMessageDrivenBean implements MessageListener
{
@Inject
@NotificationServletJMSMessage
Event<Message> jmsEvent;
@Override
public void onMessage(Message message)
{
jmsEvent.fire(message);
}
}
PushContext.java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.TextMessage;
import javax.websocket.Session;
@ApplicationScoped
public class PushContext
{
@Inject
private JMSContext _jmsContext;
@Resource(mappedName = "jms/notificationQueue")
private Queue _notificationQueue;
private Map<String, Set<Session>> _sessions;
@PostConstruct
public void init()
{
_sessions = new ConcurrentHashMap<>();
}
public void add(Session session, String userUuid)
{
_sessions.computeIfAbsent(userUuid,
value -> ConcurrentHashMap.newKeySet()).add(session);
}
void remove(Session session)
{
_sessions.values().forEach(value -> value.removeIf(e -> e.equals(session)));
}
public void send(Set<String> userUuids, Message message) throws JMSException
{
String userUuid = message.getStringProperty("userUuid");
userUuids.add(userUuid);
Set<Session> userSessions;
synchronized(_sessions)
{
userSessions = _sessions.entrySet().stream()
.filter(e -> userUuids.contains(e.getKey()))
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toSet());
}
for (Session userSession : userSessions)
{
if (userSession.isOpen())
{
userSession.getAsyncRemote().sendText(((TextMessage) message).getText());
}
}
}
public void removeSession(Session session)
{
String userUuid = (String)session.getUserProperties().get("userUuid");
_sessions.remove(userUuid, session);
}
}
NotificationEndpoint.java
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Named;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.websocket.CloseReason;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
@Named
@ServerEndpoint(value="/notificationEndpoint/{tokenId}")
public class NotificationEndpoint
{
private static final Set<Session> SESSIONS =
Collections.synchronizedSet(new HashSet<Session>());
private QueueSenderSessionBean _senderBean;
@Inject
private PushContext _pushContext;
@Inject
public NotificationEndpoint(QueueSenderSessionBean senderBean)
{
_senderBean = senderBean;
}
@OnOpen
public void onOpen(Session session,
EndpointConfig configurator,
@PathParam(value = "tokenId") String userUuidString)
{
session.getUserProperties().put("userUuid", userUuidString);
_pushContext.add(session, userUuidString);
}
@OnMessage
public void onMessage(String message, Session session)
throws IOException
{
System.out.println("Message received: " + message);
_senderBean.sendMessage(message);
}
@OnClose
public void onClose(CloseReason reason, Session session)
{
System.out.println(
"Closing 'notificatioEndpoint due to "
+ reason.getReasonPhrase());
try
{
session.close();
}
catch (IOException e)
{
e.printStackTrace();
}
_pushContext.removeSession(session);
}
@OnError
public void error(Session session, Throwable t)
{
t.printStackTrace();
}
public static void sendToAllClients(String message)
{
synchronized (SESSIONS)
{
for (Session session : SESSIONS)
{
if (session.isOpen())
{
session.getAsyncRemote().sendText(message);
}
}
}
}
public void onJMSMessage(@Observes @NotificationServletJMSMessage Message message)
{
Set<String> userUuids = new HashSet<String>();
try
{
_pushContext.send(userUuids, message);
}
catch (JMSException ex)
{
ex.printStackTrace();
Logger.getLogger(NotificationEndpoint.class.getName()).
log(Level.SEVERE, null, ex);
}
}
}
Thank you, Ted S