I'm using publisher confirms and a return callback with RabbitMQ, but I never receive the returned message. From the Spring AMQP docs:
- Publish to an exchange but there is no matching destination queue.
- Publish to a non-existent exchange.
The first case is covered by publisher returns, as described in Publisher Confirms and Returns.
So I expected to get a returned message when I publish to an existing exchange with a routing key that matches no queue. However, the return callback is never called.
Do I need to set something else?
I'm using RabbitMQ 3.8.0 and Spring Boot 2.2.1.
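To make the expectation above concrete, here is a toy in-memory model of how I understand confirms and returns to interact for an existing exchange. It is only a sketch and not the RabbitMQ or Spring AMQP API: the class `ConfirmReturnModel`, its `Outcome` type, and the routing key `valid-routing-key` are hypothetical names made up for illustration. It assumes that an unroutable message is still confirmed with an ack, and that a return is generated only when the message was published with the mandatory flag.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of confirm vs. return semantics. NOT the RabbitMQ or Spring AMQP API. */
public class ConfirmReturnModel {

    /** What the publisher would observe for a single publish. */
    public static final class Outcome {
        public final boolean acked;    // the confirm callback would report an ack
        public final boolean returned; // the return callback would fire

        public Outcome(boolean acked, boolean returned) {
            this.acked = acked;
            this.returned = returned;
        }

        @Override
        public String toString() {
            return "acked=" + acked + ", returned=" + returned;
        }
    }

    // Exchange name -> routing keys that have a queue bound (direct-exchange style).
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** Declares the exchange if needed and binds a (hypothetical) queue to the routing key. */
    public void bind(String exchange, String routingKey) {
        bindings.computeIfAbsent(exchange, k -> new HashSet<>()).add(routingKey);
    }

    /** Publishes to an existing exchange and reports what the publisher would see. */
    public Outcome publish(String exchange, String routingKey, boolean mandatory) {
        Set<String> keys = bindings.get(exchange);
        if (keys == null) {
            // Publishing to a non-existent exchange is the second case in the docs;
            // it is out of scope for this sketch.
            throw new IllegalStateException("exchange does not exist: " + exchange);
        }
        boolean routable = keys.contains(routingKey);
        // Assumption: the broker acks the publish whether or not it could be routed,
        // and returns an unroutable message only if it was published as mandatory.
        return new Outcome(true, !routable && mandatory);
    }

    public static void main(String[] args) {
        ConfirmReturnModel broker = new ConfirmReturnModel();
        broker.bind("x.test", "valid-routing-key");

        // Without mandatory: acked, but dropped without a return.
        System.out.println(broker.publish("x.test", "not-valid-routing-key", false));
        // With mandatory: acked and returned.
        System.out.println(broker.publish("x.test", "not-valid-routing-key", true));
    }
}
```

Under this model, the log further down (an ack but no return) looks like the first call, which suggests the mandatory flag may not actually be in effect on the template that sends the message.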
application.yml
spring:
  rabbitmq:
    publisher-confirms: true
    publisher-returns: true
    template:
      mandatory: true
Producer
@Service
public class PublisherConfirmProducer {

    private static final Logger log = LoggerFactory.getLogger(PublisherConfirmProducer.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void postConstruct() {
        this.rabbitTemplate.setConfirmCallback((correlation, ack, reason) -> {
            if (correlation != null) {
                log.info("Received " + (ack ? " ack " : " nack ") + "for correlation: " + correlation);
            }
        });

        this.rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("Returned: " + message + "\nreplyCode: " + replyCode + "\nreplyText: " + replyText
                    + "\nexchange/rk: " + exchange + "/" + routingKey);
        });
    }

    // Careful: the message is silently dropped. The exchange exists, but no queue is
    // bound for this routing key, yet the publish is still ack-ed.
    // How can I tell that I published to a non-existent queue?
    public void sendMessage_ValidExchange_InvalidQueue(DummyMessage message) {
        CorrelationData correlationData = new CorrelationData("Correlation for message " + message.getContent());
        this.rabbitTemplate.convertAndSend("x.test", "not-valid-routing-key", message, correlationData);
    }
}
Main app
@SpringBootApplication
public class RabbitmqProducerTwoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqProducerTwoApplication.class, args);
    }

    @Autowired
    private PublisherConfirmProducer producer;

    @Override
    public void run(String... args) throws Exception {
        var dummyMessage_2 = new DummyMessage("Message 2", 2);
        producer.sendMessage_ValidExchange_InvalidQueue(dummyMessage_2);
    }
}
Log result
2019-11-29 04:45:23.796 INFO 8352 --- [ main] c.c.r.RabbitmqProducerTwoApplication : Starting RabbitmqProducerTwoApplication on timpamungkas with PID 8352 (D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two\bin\main started by USER in D:\workspace\eclipse\my-courses\rabbitmq-1.2\rabbitmq-producer-two)
2019-11-29 04:45:23.800 INFO 8352 --- [ main] c.c.r.RabbitmqProducerTwoApplication : No active profile set, falling back to default profiles: default
2019-11-29 04:45:24.952 INFO 8352 --- [ main] c.c.r.RabbitmqProducerTwoApplication : Started RabbitmqProducerTwoApplication in 1.696 seconds (JVM running for 3.539)
2019-11-29 04:45:24.990 INFO 8352 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-11-29 04:45:25.024 INFO 8352 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#599f571f:0/SimpleConnection@86733 [delegate=amqp://[email protected]:5672/, localPort= 50688]
2019-11-29 04:45:25.058 INFO 8352 --- [nectionFactory1] c.c.r.producer.PublisherConfirmProducer : Received ack for correlation: CorrelationData [id=Correlation for message Message 2]
Edited for Gary
RabbitMqConfig.java
@Configuration
public class RabbitmqConfig {

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(converter);
        return template;
    }
}
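One possible culprit, offered as an assumption rather than a confirmed diagnosis: this configuration defines its own `RabbitTemplate` bean, so the `spring.rabbitmq.template.mandatory` property, which Spring Boot applies to its auto-configured template, may never reach the template that is actually injected into the producer. A minimal sketch of the same configuration with the flag set explicitly through `RabbitTemplate.setMandatory` would look like this:

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {

    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
            Jackson2JsonMessageConverter converter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(converter);
        // Assumption: the mandatory property from application.yml is not applied to a
        // manually created template, so the flag is set here by hand.
        template.setMandatory(true);
        return template;
    }
}
```

If that assumption holds, the return callback registered in `PublisherConfirmProducer` should then fire for `not-valid-routing-key`, in addition to the ack that is already logged.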