I'm using spring-amqp:2.1.6.RELEASE.
I have a RabbitTemplate with a PublisherReturn callback.
- If I send a message to a routingKey that has no queues bound to it, the return callback is called correctly. When this happens I want to send the message to an alternative routingKey. However, if I use the RabbitTemplate inside the ReturnCallback, the call just hangs. Nothing is logged to say whether the message can or can't be sent, the RabbitTemplate never returns control to my ReturnCallback, and I don't see any PublisherConfirm either.
- If I create a new RabbitTemplate (with the same CachingConnectionFactory), it behaves the same way: my call just hangs.
- If I send a message to a routingKey which does have a queue bound to it, then the message correctly arrives at the queue. The ReturnCallback is not called in this scenario.
After some investigation, I've come to the conclusion that the rabbitTemplate and/or connection is blocked until the original message is completely processed.
If I create a second CachingConnectionFactory and RabbitTemplate, and use these in the PublisherReturn callback, then it seems to work fine.
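To illustrate what I suspect is going on, here is a plain-Java analogy with no RabbitMQ involved. It assumes (and this is only my guess, not something I've confirmed in the spring-amqp or amqp-client source) that the return callback runs on a thread that the nested send also needs. `CallbackThreadSketch`, `publishFromCallback`, and the two executors are made-up names for this sketch only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CallbackThreadSketch {

    /**
     * Runs a "callback" on ioThread. The callback hands a "publish" task to
     * publishThread and waits up to timeoutMillis for it to finish.
     * Returns true if the publish completed in time, false if the wait timed out.
     */
    public static boolean publishFromCallback(ExecutorService ioThread,
                                              ExecutorService publishThread,
                                              long timeoutMillis) throws Exception {
        Future<Boolean> callback = ioThread.submit(() -> {
            // Stand-in for the nested send; it does no real work.
            Future<?> publish = publishThread.submit(() -> { });
            try {
                publish.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                // The publish never got a chance to run while we were waiting.
                publish.cancel(false);
                return false;
            }
        });
        return callback.get();
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical single I/O thread, plus a second independent thread.
        ExecutorService io = Executors.newSingleThreadExecutor();
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            // Callback and publish share one thread: the wait can only time out.
            System.out.println("same thread completed: "
                    + publishFromCallback(io, io, 500));
            // Publish runs on its own thread: the wait returns normally.
            System.out.println("separate thread completed: "
                    + publishFromCallback(io, other, 500));
        } finally {
            io.shutdownNow();
            other.shutdownNow();
        }
    }
}
```

In this model the first call is stuck until the timeout expires, because the only thread that could run the publish is busy waiting for it. The second call completes, which is at least consistent with a second CachingConnectionFactory working for me. Whether the real cause matches this picture is exactly what I'm unsure about.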
So, here's the question: What is the best way to send a message in a PublisherReturn callback using spring-amqp?
I have searched, but can't find anything that explains how you should do this.
Here are simplified details of what I have:
    @Configuration
    public class MyConfig {

        // Main connection factory: publisher returns are enabled here
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
            connectionFactory.setPublisherReturns(true);
            // ... other settings left out for brevity
            return connectionFactory;
        }

        // Main template: mandatory publishing with the return callback attached
        @Bean
        @Qualifier("rabbitTemplate")
        public RabbitTemplate rabbitTemplate(ReturnCallbackForAlternative returnCallbackForAlternative) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(returnCallbackForAlternative);
            // ... other settings left out for brevity
            return rabbitTemplate;
        }

        // Second, separate connection factory used only inside the return callback
        @Bean
        @Qualifier("connectionFactoryForUndeliverable")
        public ConnectionFactory connectionFactoryForUndeliverable() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
            // ... other settings left out for brevity
            return connectionFactory;
        }

        // Second template, built on the separate connection factory
        @Bean
        @Qualifier("rabbitTemplateForUndeliverable")
        public RabbitTemplate rabbitTemplateForUndeliverable() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactoryForUndeliverable());
            // ... other settings left out for brevity
            return rabbitTemplate;
        }
    }
Then to send the message I'm using:

    @Autowired
    @Qualifier("rabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    public void send(Message message) {
        rabbitTemplate.convertAndSend(
                "exchange-name",
                "primary-key",
                message);
    }
And the code in the ReturnCallback is:

    @Component
    public class ReturnCallbackForAlternative implements RabbitTemplate.ReturnCallback {

        // The second template, backed by the separate connection factory
        @Autowired
        @Qualifier("rabbitTemplateForUndeliverable")
        private RabbitTemplate rabbitTemplate;

        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            // Re-send the returned message with the alternative routing key
            rabbitTemplate.convertAndSend(
                    "exchange-name",
                    "alternative-key",
                    message);
        }
    }
EDIT
Simplified example to reproduce the problem. To run it:
- Have RabbitMq running
- Have an exchange called foo bound to a queue called foo
- Run as spring boot app
You'll see the following output:

    in returnCallback before message send

but you won't see:

    in returnCallback after message send

If you comment out the line connectionFactory.setPublisherConfirms(true); it runs OK.
    @SpringBootApplication
    public class HangingApplication {

        public static void main(String[] args) {
            SpringApplication.run(HangingApplication.class, args);
        }

        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setPublisherReturns(true);
            // Commenting out the next line makes the hang go away
            connectionFactory.setPublisherConfirms(true);
            return connectionFactory;
        }

        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setExchange("foo");
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                System.out.println("Confirm callback for main template. Ack=" + ack);
            });
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                System.out.println("in returnCallback before message send");
                // This send, made from inside the return callback, never returns
                rabbitTemplate.send("foo", message);
                System.out.println("in returnCallback after message send");
            });
            return rabbitTemplate;
        }

        @Bean
        public ApplicationRunner runner(@Qualifier("rabbitTemplate") RabbitTemplate template) {
            return args -> {
                // No queue is bound with BADKEY, so the message is returned
                template.convertAndSend("BADKEY", "foo payload");
            };
        }

        @RabbitListener(queues = "foo")
        public void listen(String in) {
            System.out.println("Message received on undeliverable queue : " + in);
        }
    }
Here's the build.gradle I used:
    plugins {
        id 'org.springframework.boot' version '2.1.5.RELEASE'
        id 'java'
    }

    apply plugin: 'io.spring.dependency-management'

    group 'pcoates'
    version '1.0-SNAPSHOT'

    sourceCompatibility = 1.11

    repositories {
        mavenCentral()
    }

    dependencies {
        compile 'org.springframework.boot:spring-boot-starter-amqp'
    }