I have a stateful session bean in which I send and receive JMS messages. All the connection setup is handled manually, so the bean holds instances of javax.jms.Connection and javax.jms.Session. The bean also implements MessageListener so that it can receive messages.

Now, when I send a message, I create a temporary queue with session.createTemporaryQueue(). I pass this queue to message.setJMSReplyTo(), and finally create a consumer on the queue and set its MessageListener to the same stateful session bean that all of this is implemented in.

I am able to receive the message in the onMessage() method. However, I want to close the session and connection as soon as the message has been received, and this is apparently not allowed from within onMessage().

So the question is: how can I close the session and connection once the message has been received? I must handle the connection setup manually and cannot use an MDB.

Note: this runs in a Java EE environment (GlassFish 4.0).

EDIT:

import javax.ejb.LocalBean;
import javax.ejb.Stateful;
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.QueueConnectionFactory;

@LocalBean
@Stateful
public class OpenMqClient implements MessageListener{
    private Connection connection;
    private Session session;
    private MessageConsumer responseConsumer;

    public OpenMqClient(){}

    public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
        try{
            String host = System.getProperty("foo", jmsBrokerUri);
            QueueConnectionFactory cf = new QueueConnectionFactory();
            cf.setProperty(ConnectionConfiguration.imqAddressList, host);
            connection = null;
            session = null;

            //Setup connection
            connection = cf.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

            //Setup queue and producer
            Queue queue = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(queue);


            //Reply destination
            Queue responseQueue = session.createTemporaryQueue();
            responseConsumer = session.createConsumer(responseQueue);
            responseConsumer.setMessageListener(this);

            //Create message
            TextMessage textMessage = session.createTextMessage();
            textMessage.setJMSReplyTo(responseQueue);
            textMessage.setJMSCorrelationID("test0101");
            textMessage.setText(messageContent);

            producer.send(textMessage);
            System.out.println("Message sent");
        } catch (JMSException e) {
            e.printStackTrace();
            System.out.println("JMSException in Sender");
        }
    }

    @Override
    public void onMessage(Message arg0) {
        //On this event I want to close the session and connection, but it's not permitted
    }

}
ring bearer: How is the session created? What is the transactional context?
Matt: Opening and closing sessions and connections is an expensive operation, and I would avoid it if you need any kind of performance. Having said that, your best bet is probably to create a new Runnable that takes your session and connection and spawn a new thread that closes them in the background. You can even add a time delay so that the connection isn't closed until your onMessage is likely to have finished.
ring bearer: How about creating the session and connection once (for example in @PostConstruct) and then destroying them once (using @PreDestroy) in the session bean?
Humz: @ringbearer: It is a solution, but it's not very efficient, since a response is almost always received very quickly. However, some responses might take longer, so I cannot use timeouts. If many users are sending messages, there will be a lot of open sessions that are not used (until the session bean is destroyed).
ring bearer: Then you will have to stick with what @Matt has suggested. Try running a closer thread from onMessage().

1 Answer

Personally, this is how I would do it (note I haven't tested or added much error handling to this code).

  1. Make the connection static - you can (probably should) reuse the same connection for all your beans unless you have a specific reason not to
  2. Close the session in a new thread

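    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TemporaryQueue;
    import javax.jms.TextMessage;
    import com.sun.messaging.ConnectionConfiguration;
    import com.sun.messaging.QueueConnectionFactory;
    
    public class OpenMqClient implements MessageListener {
    
        // Dedicated lock object; a String literal is interned and may be shared with unrelated code
        private static final Object LOCK = new Object();
        // One connection shared by all instances, guarded by LOCK
        private static Connection connection;
        private Session session;
        private TemporaryQueue responseQueue;
        private MessageConsumer responseConsumer;
    
        // Lazily creates and starts the shared connection on first use.
        // The broker URI is only known in sendMessage(), so this cannot live in the constructor.
        private static Connection getConnection(String jmsBrokerUri) throws JMSException {
            synchronized (LOCK) {
                if (connection == null) {
                    String host = System.getProperty("foo", jmsBrokerUri);
                    QueueConnectionFactory cf = new QueueConnectionFactory();
                    cf.setProperty(ConnectionConfiguration.imqAddressList, host);
    
                    // Setup connection
                    connection = cf.createConnection();
                    connection.start();
                }
                return connection;
            }
        }
    
        public void sendMessage(String messageContent, String jmsBrokerUri, String queueName) {
            try {
                session = getConnection(jmsBrokerUri).createSession(false, Session.AUTO_ACKNOWLEDGE);
    
                // Setup queue and producer
                Queue queue = session.createQueue(queueName);
                MessageProducer producer = session.createProducer(queue);
    
                // Reply destination
                responseQueue = session.createTemporaryQueue();
                responseConsumer = session.createConsumer(responseQueue);
                responseConsumer.setMessageListener(this);
    
                // Create message
                TextMessage textMessage = session.createTextMessage();
                textMessage.setJMSReplyTo(responseQueue);
                textMessage.setJMSCorrelationID("test0101");
                textMessage.setText(messageContent);
    
                producer.send(textMessage);
                System.out.println("Message sent");
            } catch (JMSException e) {
                e.printStackTrace();
                System.out.println("JMSException in Sender");
            }
        }
    
        @Override
        public void onMessage(Message message) {
            // do stuff
    
            // Capture the current references so a later sendMessage() cannot swap them
            final Session sessionToClose = session;
            final TemporaryQueue queueToDelete = responseQueue;
    
            // Close from another thread; close() waits for this listener call to return
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (sessionToClose != null) {
                            sessionToClose.close(); // also closes the response consumer
                        }
                        if (queueToDelete != null) {
                            // With a shared connection, a temporary queue lives until it is deleted
                            queueToDelete.delete();
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }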
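
Spawning a raw thread per reply works, but the same hand-off can go through a small reusable helper. The following is a minimal sketch, not tested against GlassFish: `DeferredCloser` and `closeLater` are hypothetical names, not part of JMS, and the code assumes Java 8 or later for the lambdas. It accepts any `AutoCloseable`, which in JMS 2.0 includes `Session` and `Connection`; with JMS 1.1 you would need to wrap the `close()` call yourself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of JMS): closes resources on a separate thread. */
public class DeferredCloser {

    // Single daemon worker, so pending closes never keep the JVM alive.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "deferred-closer");
        t.setDaemon(true);
        return t;
    });

    /** Schedules resource.close() on the worker thread and returns immediately. */
    public void closeLater(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        worker.execute(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    /** Stops accepting work and waits up to the timeout for pending closes to finish. */
    public boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        DeferredCloser closer = new DeferredCloser();
        CountDownLatch closed = new CountDownLatch(1);

        // Stand-in for a JMS session: "closing" it just releases the latch.
        AutoCloseable fakeSession = closed::countDown;

        closer.closeLater(fakeSession);
        System.out.println("closed: " + closed.await(1, TimeUnit.SECONDS));
        closer.shutdown(1, TimeUnit.SECONDS);
    }
}
```

Inside `onMessage()` the call would then be something like `closer.closeLater(session)`. In a Java EE 7 container, unmanaged threads are generally discouraged, so it may be preferable to back the helper with a container-provided `ManagedExecutorService` instead of `Executors.newSingleThreadExecutor`; that substitution is an assumption on my part and is not shown here.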