
I am using JMS to send and receive messages from an IBM MQ message broker. I am currently working on the case where the listener service throws an unhandled exception and the message is sent back to the queue without acknowledgement. I want the service to retry a configurable number of times and then throw a meaningful exception message saying that the listener service is unavailable.

My listener and container factory look like this:

@JmsListener(destination = "testqueue", containerFactory = "queuejmsfactory")
public void consumer(String message) throws JMSException {
    handle(message);
}

@Bean(name = "queuejmsfactory")
public JmsListenerContainerFactory getQueueTopicFactory(ConnectionFactory con,
        DefaultJmsListenerContainerFactoryConfigurer config) {
    DefaultJmsListenerContainerFactory d = new DefaultJmsListenerContainerFactory();
    d.setSessionTransacted(true);
    d.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
    config.configure(d, con);
    return d;
}

In short, I have existing code that uses the SessionAwareMessageListener onMessage method, which I am trying to replicate with @JmsListener. How do I handle the session commit and rollback automatically, and how do I get the session in a @JmsListener method if I have to handle it manually, as in onMessage?

@Override
public void onMessage(Message message, Session ses) throws JMSException {
    try {
        TextMessage txtMessage = (TextMessage) message;
        handle(txtMessage);
        ses.commit();
    } catch (Exception exp) {
        if (shouldRollback(message)) {
            // Roll back so the broker redelivers the message
            ses.rollback();
        } else {
            // Retries exhausted: commit so the message is not redelivered
            logger.warn("moved to dlq");
            ses.commit();
        }
    }
}

private boolean shouldRollback(Message mes) throws JMSException {
    int rollbackCount = mes.getIntProperty("JMSXDeliveryCount");
    // maxRollbackCount is read from application.properties
    return rollbackCount <= maxRollbackCount;
}

Updated code:

@JmsListener(destination = "testqueue", containerFactory = "queuejmsfactory")
public void consumer(Message message) throws JMSException {
    try {
        TextMessage txtMessage = (TextMessage) message;
        handle(txtMessage);
    } catch (Exception ex) {
        if (shouldRollback(message)) {
            // Rethrow so the message goes back to the queue
            throw ex;
        } else {
            logger.warn("moved to dlq");
        }
    }
}

private boolean shouldRollback(Message mes) throws JMSException {
    int rollbackCount = mes.getIntProperty("JMSXDeliveryCount");
    // maxRollbackCount is read from application.properties
    return rollbackCount <= maxRollbackCount;
}

@Bean(name = "queuejmsfactory")
public JmsListenerContainerFactory getQueueTopicFactory(ConnectionFactory con,
        DefaultJmsListenerContainerFactoryConfigurer config) {
    DefaultJmsListenerContainerFactory d = new DefaultJmsListenerContainerFactory();
    d.setSessionTransacted(true);
    d.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
    config.configure(d, con);
    return d;
}

I have also tried to access JMSXDeliveryCount from the headers, but couldn't find the right object to read the delivery count from. Can you clarify, please?

@JmsListener(destination = "testqueue", containerFactory = "queuejmsfactory")
    public void consumer(Message message,
                               @Header(JmsHeaders.CORRELATION_ID) String correlationId,
                               @Header(name = "jms-header-not-exists") String nonExistingHeader,
                               @Headers Map<String, Object> headers,
                               MessageHeaders messageHeaders,
                               JmsMessageHeaderAccessor jmsMessageHeaderAccessor) {}

1 Answer


You can add the Session as another parameter to the JmsListener method.
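
As a rough sketch of what that could look like with the listener from the question (not a definitive implementation): the class name QueueListener, the property key listener.max-rollback-count, its default of 3, and the body of handle are all assumptions made for illustration. The imports assume the javax.jms namespace; newer Spring versions (Spring 6 / Spring Boot 3) use jakarta.jms instead. The delivery count is read from the raw Message with getIntProperty, as in the original onMessage code.

```java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class QueueListener {

    private static final Logger logger = LoggerFactory.getLogger(QueueListener.class);

    // Hypothetical property key; falls back to 3 if it is not set
    @Value("${listener.max-rollback-count:3}")
    private int maxRollbackCount;

    // The Session is injected simply by declaring it as a method parameter
    @JmsListener(destination = "testqueue", containerFactory = "queuejmsfactory")
    public void consumer(Message message, Session session) throws JMSException {
        try {
            handle((TextMessage) message);
            // Returning normally lets the transacted container commit
        } catch (Exception ex) {
            if (shouldRollback(message)) {
                // Roll back so the broker redelivers the message;
                // rethrowing the exception would also make the container roll back
                session.rollback();
            } else {
                // Retries exhausted: swallow the exception so the container commits
                logger.warn("moved to dlq", ex);
            }
        }
    }

    private boolean shouldRollback(Message message) throws JMSException {
        // JMSXDeliveryCount is a message property set by the broker on delivery
        int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
        return deliveryCount <= maxRollbackCount;
    }

    // Placeholder for the question's own handle(...) method
    private void handle(TextMessage message) throws JMSException {
        logger.info("received: {}", message.getText());
    }
}
```

With setSessionTransacted(true) on the container factory, the container commits when the listener method returns normally and rolls back when it throws, so the explicit session.rollback() is only needed if you want to keep the same shape as the old onMessage code.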