There is a class 'MyConsumer' that receives messages from a queue and processes them. There are two requirements:
- If a message contains invalid content, MyConsumer should not acknowledge it, but should still be able to process later messages
- The unacknowledged message should be delivered again when MyConsumer restarts
I tried spring-jms with its listener-container support, but can't find a solution that fits the first requirement.
My code:
<amq:queue id="destination" physicalName="org.springbyexample.jms.test"/>
<amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost:11111"/>
<bean id="jmsConsumerConnectionFactory"
      class="org.springframework.jms.connection.SingleConnectionFactory"
      p:targetConnectionFactory-ref="jmsFactory"/>
<bean id="jmsConsumerTemplate" class="org.springframework.jms.core.JmsTemplate"
      p:connectionFactory-ref="jmsConsumerConnectionFactory"
      p:defaultDestination-ref="destination"/>
<bean id="jmsMessageListener" class="test.MyConsumer"/>
<bean id="errorHandler" class="test.MyErrorHandler"/>
<jms:listener-container container-type="default"
                        connection-factory="jmsConsumerConnectionFactory"
                        error-handler="errorHandler"
                        acknowledge="client">
    <jms:listener destination="org.springbyexample.jms.test" ref="jmsMessageListener"/>
</jms:listener-container>
Class MyConsumer:
@Override
public void onMessage(Message message) {
    TextMessage textMessage = (TextMessage) message;
    try {
        System.out.println("!!!!!!!!! get message: " + textMessage.getText());
    } catch (JMSException e) {
        e.printStackTrace();
    }
    // Simulate invalid content by failing on the 3rd message
    // (theNumberOfMessageIs is a helper that is not shown here)
    if (theNumberOfMessageIs(3)) {
        throw new RuntimeException("something is wrong");
    }
}
You may notice that acknowledge on the listener-container is set to client. Among the values it accepts, I looked at these 3:
- auto (default)
- client
- transacted
I tried each of them, but none fits my requirement. My test scenario:
- the producer puts 3 messages on the queue
- a thread monitors the message count in the queue and prints it whenever it changes
- the consumer starts, receives messages from the queue, and processes them
- after a while, the producer puts another 3 messages on the queue
For auto:
MyConsumer acknowledges each message after receiving it, whether or not an exception is thrown
For client:
MyConsumer acknowledges a message only if no exception is thrown in onMessage. The 3rd message throws an exception, so one unconsumed message remains in the queue. But when the 4th message arrives and no exception is thrown, the 3rd message disappears from the queue as well
For transacted:
If an exception is thrown in MyConsumer, the message is not acknowledged and is redelivered several times. After that, the message disappears from the queue
But none of them fits requirement 1.
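The client result looks consistent with an acknowledgement that covers every message delivered on the session so far, not just the current one, which as far as I understand is how JMS client acknowledgement is specified. A toy model of that in plain Java (no JMS involved; ClientAckSessionModel and its methods are names made up for this illustration, not Spring or ActiveMQ API):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single client-acknowledge session; not JMS, Spring or ActiveMQ code.
class ClientAckSessionModel {
    // Messages delivered on this session that have not been acknowledged yet.
    private final List<String> unacknowledged = new ArrayList<>();

    // The broker hands a message to the consumer; it stays pending until acknowledged.
    void deliver(String message) {
        unacknowledged.add(message);
    }

    // Models an acknowledge call: it covers every message delivered so far
    // on the session, not only the message it is called for.
    void acknowledge() {
        unacknowledged.clear();
    }

    // Returns a copy of the messages that are still pending.
    List<String> unacknowledged() {
        return new ArrayList<>(unacknowledged);
    }

    public static void main(String[] args) {
        ClientAckSessionModel session = new ClientAckSessionModel();
        for (int i = 1; i <= 4; i++) {
            session.deliver("message-" + i);
            boolean listenerFailed = (i == 3); // same failure as in MyConsumer
            if (!listenerFailed) {
                // Acknowledge only when the listener returned normally.
                session.acknowledge();
            }
            System.out.println("after message " + i + ": unacknowledged = " + session.unacknowledged());
        }
    }
}
```

Running main shows message-3 still pending after the third delivery and nothing pending after the fourth, which matches what I see in the queue.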
I wonder: do I need to look for a solution other than spring-jms, or is my usage incorrect?