3
votes

Requirement: I want messages to persist in the queue until onMessage() has executed successfully. If an exception occurs during execution of onMessage() and is not handled, the message should be redelivered to the listener.

I am using Glassfish v2 as the application server. I am using OpenMQConnectionFactory and JmsTemplate to send messages to the queue. Please note that I am not using an MDB.

<bean id="openMQConnectionFactory"
    class="com.is.br.util.OpenMqConnectionFactoryBean">
    <property name="imqAddressList" value="mq://localhost:7676" />
    <property name="imqDefaultUsername" value="admin" />
    <property name="imqDefaultPassword" value="admin" />
</bean>

I tried AUTO_ACKNOWLEDGE as the acknowledge mode, but when the listener throws an exception the message is not redelivered.

MessageProducer.java

public void sendMessage(final String responseStream) {

    System.out.println("Enter into IsJmsProducer.sendMessage method");

    try {
        MessageCreator creator = new MessageCreator() {
            public Message createMessage(Session session) {
                ObjectMessage message = null;
                try {
                    message = session.createObjectMessage(responseStream);
                } catch (Exception e) {
                    System.out.println("Unable to create a JMSMessage");
                }
                return message;
            }
        };

        System.out.println("Sending message to destination: " + this.destination.toString());
        this.jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);

        this.jmsTemplate.send(this.destination, creator);
        System.out.println("SendMessage to queue successfully.");           
    } catch (Exception ex) {
        System.out.println("SendMessage to queue Fail." + ex);
    }
    System.out.println("Exit from IsJmsProducer.sendMessage method");

}

SampleJMSConsumer.java

public class SampleJMSConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        throw new RuntimeException();
   }
}

Then I tried this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); in the listener I called message.acknowledge(), and in the catch block I called session.recover(), but the message is still not redelivered.

SampleJMSConsumer.java

public class SampleJMSConsumer implements MessageListener  {

    @Override
    public void onMessage(Message message) {

        ObjectMessage objectMessage = (ObjectMessage) message;
        Object object;
        try {
            object = objectMessage.getObject();
            if (object instanceof String) {
                System.out.println("Message received - " + object.toString());
                throw new JMSException("JMS exception");
            }
            message.acknowledge();
        } catch (JMSException e) {
            session.recover();
        }
    }

}

When I run the program in debug mode and send a message to the queue, I can see the message count in the broker admin console, but as soon as onMessage() is called the count drops by one. That means the message has been consumed and deleted from the queue. Is that message considered "delivered"? Please help me understand why the message is not redelivered when an exception occurs.

Thanks in advance.

4
Hi Ashwini. I am trying to do something similar. I am using JmsTemplate to send messages, and the listeners are started by DefaultMessageListenerContainer. My issue is that I have no session object in the onMessage method to recover. How do you pass your session object to your listener in the above code? - Rahul Dabas
@Ashwini I know it has been a long time since you asked this question, but did you find what you were looking for? I'm in the exact same situation now. stackoverflow.com/q/60372579/1051215 - Ramaraj T

4 Answers

1
votes

I think this is by design: the message counts as delivered when onMessage() gets called. If you want to do something about the exception, you can handle it with try/catch.

Even if the message were put on the queue again, you would likely hit the same exception when it is consumed anyway.

The ack mechanisms should, in my opinion, be about assuring correct delivery. Maybe what you are after is a reject mechanism where you ask the producer side to send a new message?

1
votes

Client acknowledge is suited to your case. In your onMessage() method, call acknowledge() once processing is over; if there is an exception, don't call acknowledge().

Session.recover() stops and restarts message delivery. Delivery then restarts from the oldest unacknowledged message.
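
To make the acknowledge/recover behaviour described above a bit more concrete, here is a small toy model in plain Java. It is only a sketch of the semantics, not the JMS API: ToySession, Delivery and demo() are hypothetical names invented for illustration, and a real broker such as OpenMQ does considerably more. The point is just that a message stays tracked until acknowledge() is called, and that recover() hands the unacknowledged ones back with a redelivered flag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of CLIENT_ACKNOWLEDGE semantics; NOT the JMS API. */
class ClientAckSketch {

    /** A delivered message plus the redelivered flag a broker would set. */
    static final class Delivery {
        final String body;
        final boolean redelivered;

        Delivery(String body, boolean redelivered) {
            this.body = body;
            this.redelivered = redelivered;
        }
    }

    /** Hypothetical stand-in for a queue plus one consumer session. */
    static final class ToySession {
        private final Deque<Delivery> pending = new ArrayDeque<>(); // waiting for delivery
        private final List<Delivery> unacked = new ArrayList<>();   // delivered, not yet acknowledged

        void send(String body) {
            pending.addLast(new Delivery(body, false));
        }

        /** Hands out the next message; it stays tracked until acknowledged. */
        Delivery receive() {
            Delivery d = pending.pollFirst();
            if (d != null) {
                unacked.add(d);
            }
            return d;
        }

        /** Modelled on Message.acknowledge(): settles everything delivered so far. */
        void acknowledge() {
            unacked.clear();
        }

        /** Modelled on Session.recover(): unacknowledged messages return, oldest first, flagged redelivered. */
        void recover() {
            for (int i = unacked.size() - 1; i >= 0; i--) {
                pending.addFirst(new Delivery(unacked.get(i).body, true));
            }
            unacked.clear();
        }
    }

    /** Fails on the first delivery to simulate an exception inside onMessage(). */
    static List<String> demo() {
        ToySession session = new ToySession();
        session.send("order-1");
        List<String> log = new ArrayList<>();
        Delivery d;
        while ((d = session.receive()) != null) {
            try {
                if (!d.redelivered) {
                    throw new IllegalStateException("simulated processing failure");
                }
                log.add("processed " + d.body + " redelivered=" + d.redelivered);
                session.acknowledge(); // only after successful processing
            } catch (IllegalStateException e) {
                log.add("failed " + d.body + " redelivered=" + d.redelivered);
                session.recover();     // do not acknowledge; ask for redelivery
            }
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the first attempt is logged as failed and the same message comes back once with redelivered=true. Note that the model assumes the consumer owns the session it calls recover() on, which is exactly the piece that is missing from the listener code in the question.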

0
votes

I would suggest verifying the default session mode for OpenMQ. It may be that the mode cannot be changed once a connection is open, so it has to be specified when the connection is opened.

0
votes

The session created in the consumer should set the session mode to AUTO_ACKNOWLEDGE / DUPS_OK_ACKNOWLEDGE. You haven't shared your code for starting the consumer. You are setting the session mode in the producer but not in the consumer.