I can receive messages from my Service Bus subscription just fine, but when an exception occurs in my listener, a Disposition frame with state=Modified and undeliverableHere=true is ultimately sent to Service Bus. The Service Bus docs say that it doesn't support the AMQP Modified disposition.
The message ends up with a Deferred state in service bus, and I can't figure out how to nudge the message back into activity.
JMS Config:
    @Bean
    public ConnectionFactory jmsConnectionFactory(MessageStoreProperties properties) throws UnsupportedEncodingException {
        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(properties.getUrlString());
        connectionFactory.setClientID(clientId);
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getPassword());
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy());
        return new CachingConnectionFactory(connectionFactory);
    }

    @Bean
    public JmsDefaultRedeliveryPolicy redeliveryPolicy() {
        JmsDefaultRedeliveryPolicy policy = new JmsDefaultRedeliveryPolicy();
        policy.setMaxRedeliveries(50);
        return policy;
    }

    @Bean
    public JmsListenerContainerFactory topicContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSubscriptionDurable(true);
        factory.setPubSubDomain(true);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return factory;
    }
Listener:
    @JmsListener(destination = "crm-customer-event/subscriptions/test-sub", containerFactory = "topicContainerFactory")
    public void receiveCustomerEvent(@Payload ExecutionContextDTO dto) {
        logger.debug("Got payload: " + dto);
        throw new RuntimeException("Oops");
    }
Here's what I'm seeing in the logs:
The message transfer starts:
[299309872:1] <- Transfer{handle=0, deliveryId=0, deliveryTag=\x97\x1c\x87\x04\xb7\xea\x86@\xb3n\xbd\x9fg\x81\x00\x11, messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=true} (16695) "\x00Sp\xc0\x0a\x05@@p.........
The exception is thrown, and then JMS redelivers the message locally 50 times (why does it do this?).
The next AMQP protocol frame I see after that is:
[299309872:1] -> Disposition{role=RECEIVER, first=0, last=0, settled=true, state=Modified{deliveryFailed=true, undeliverableHere=true, messageAnnotations=null}, batchable=false}
It appears to me that this last Disposition frame causes the message to enter the Deferred state. There's also still a lock token on the message. Even after the TTL passes, the message is still stuck in the subscription, and no amount of poking at it through the REST API helps. I've tried unlocking it (with PUT) and deleting it (with DELETE). I've also tried simply receiving it with the REST API (both the PeekLock and the receive-and-delete varieties), and it looks as if the message isn't there. The subscription has the option set to auto-move messages to the dead letter queue after expiration, but they are never moved.
The code from qpid-jms that makes the ack happen is here, and it appears that this part of the library is not meant to be extended, otherwise I'd make my own implementation that returns a different ack.
How do I:
- Get qpid/JMS to use the FAILED ack instead of the MODIFIED_FAILED_UNDELIVERABLE ack?
- Clear out those Deferred messages?
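
For the first point, one avenue that may be worth checking: the Qpid JMS client configuration documentation for newer releases lists a `jms.redeliveryPolicy.outcome` connection URI option, alongside `jms.redeliveryPolicy.maxRedeliveries`, that selects the disposition applied once the redelivery limit is exceeded (the documented values are ACCEPTED, REJECTED, RELEASED, MODIFIED_FAILED and MODIFIED_FAILED_UNDELIVERABLE). Whether the qpid-jms version in use here supports that option, and how Service Bus treats each outcome, are assumptions I have not verified. The sketch below only builds such a URI in plain Java; `RedeliveryUriSketch`, `withRedeliveryOptions` and the host name are hypothetical names for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: appends qpid-jms redelivery options to a connection URI.
// Assumes the client version understands jms.redeliveryPolicy.outcome.
final class RedeliveryUriSketch {

    static String withRedeliveryOptions(String baseUri, int maxRedeliveries, String outcome) {
        // LinkedHashMap keeps the options in insertion order, so the result is stable.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("jms.redeliveryPolicy.maxRedeliveries", Integer.toString(maxRedeliveries));
        options.put("jms.redeliveryPolicy.outcome", outcome);

        StringJoiner query = new StringJoiner("&");
        options.forEach((key, value) -> query.add(key + "=" + value));

        // Extend an existing query string, or start a new one.
        String separator = baseUri.contains("?") ? "&" : "?";
        return baseUri + separator + query;
    }

    public static void main(String[] args) {
        // Placeholder host; the real URL would come from properties.getUrlString().
        System.out.println(withRedeliveryOptions(
                "amqps://example.servicebus.windows.net", 5, "MODIFIED_FAILED"));
    }
}
```

If the option is supported, the resulting string would be passed to `new JmsConnectionFactory(...)` in place of the plain URL, instead of relying on the default outcome of the `JmsDefaultRedeliveryPolicy` bean.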