I use RabbitTemplate to send a message with my own generated CorrelationData. I receive the correlation ID in the ConfirmCallback, but I can't receive it on the consumer side.
I tested this problem with 2.0.3.RELEASE and 2.1.0.RELEASE and the results are consistent with the above description.
rabbitmq config
/**
 * Spring configuration for the RabbitMQ setup used in this example: the connection
 * factory, the {@link RabbitTemplate}, the JSON message converter, the test
 * queue/exchange/binding, and the listener container that consumes the test queue.
 */
@Configuration
public class RabbitMQConfig {

    /** Broker address list, taken from the {@code mq.rabbit.addresses} property. */
    @Value("${mq.rabbit.addresses}")
    private String addresses;

    /** Broker user name, taken from the {@code mq.rabbit.username} property. */
    @Value("${mq.rabbit.username}")
    private String username;

    /** Broker password, taken from the {@code mq.rabbit.password} property. */
    @Value("${mq.rabbit.password}")
    private String password;

    /** Virtual host, taken from the {@code mq.rabbit.virtualHost} property. */
    @Value("${mq.rabbit.virtualHost}")
    private String virtualHost;

    /** Value passed to the factory's channel cache size, from {@code mq.rabbit.sessionCacheSize}. */
    @Value("${mq.rabbit.sessionCacheSize}")
    private int sessionCacheSize;

    /**
     * Builds the caching connection factory with publisher confirms and publisher
     * returns both switched on.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();

        // Where and as whom to connect.
        // addresses list of addresses with form "host[:port],..."
        factory.setAddresses(addresses);
        factory.setVirtualHost(virtualHost);
        factory.setUsername(username);
        factory.setPassword(password);

        // Channel caching and publisher feedback.
        factory.setChannelCacheSize(sessionCacheSize);
        factory.setPublisherConfirms(true);
        factory.setPublisherReturns(true);

        return factory;
    }

    /**
     * Builds a prototype-scoped template that uses the given converter, sends with the
     * mandatory flag set, and reports confirms/returns to the two callback listeners.
     *
     * @param messageConverter converter applied to outgoing and incoming payloads
     * @return the configured template
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(MessageConverter messageConverter) {
        RabbitTemplate amqpTemplate = new RabbitTemplate(connectionFactory());
        amqpTemplate.setMessageConverter(messageConverter);
        amqpTemplate.setMandatory(true);
        amqpTemplate.setConfirmCallback(new ConfirmCallbackListener());
        amqpTemplate.setReturnCallback(new ReturnCallBackListener());
        return amqpTemplate;
    }

    /**
     * Creates the JSON message converter backed by the supplied Jackson mapper.
     *
     * @param customMapper the {@link ObjectMapper} to serialize and deserialize with
     * @return the JSON converter
     */
    @Bean
    public MessageConverter messageConverter(ObjectMapper customMapper) {
        return new Jackson2JsonMessageConverter(customMapper);
    }

    /**
     * Declares the queue named {@code test-queue}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue testQueue() {
        final boolean durable = true;
        return new Queue("test-queue", durable);
    }

    /**
     * Declares the topic exchange named {@code test-exchange}.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange defaultExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange("test-exchange", durable, autoDelete);
    }

    /**
     * Binds the test queue to the topic exchange under the routing key {@code test}.
     *
     * @param testQueue the queue to bind
     * @param defaultExchange the exchange to bind it to
     * @return the binding definition
     */
    @Bean
    public Binding bindingExchangeCommon(Queue testQueue, TopicExchange defaultExchange) {
        return BindingBuilder
                .bind(testQueue)
                .to(defaultExchange)
                .with("test");
    }

    /**
     * Builds the listener container for {@code test-queue}: manual acknowledgement,
     * prefetch of 250, 10 concurrent consumers with a maximum of 20, handled by a
     * {@link TestMessageListener}.
     *
     * @param connectionFactory the factory the container obtains connections from
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer testMessageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer listenerContainer =
                new SimpleMessageListenerContainer(connectionFactory);
        listenerContainer.setQueueNames("test-queue");
        listenerContainer.setExposeListenerChannel(true);
        listenerContainer.setPrefetchCount(250);
        // The maximum is set before the initial count, as in the original ordering.
        listenerContainer.setMaxConcurrentConsumers(20);
        listenerContainer.setConcurrentConsumers(10);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setMessageListener(new TestMessageListener());
        return listenerContainer;
    }
}
confirm callback
/**
 * Publisher-confirm callback that logs, for each confirm, whether the send was
 * acked or not, together with the correlation data that came with it.
 */
public class ConfirmCallbackListener implements RabbitTemplate.ConfirmCallback {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs the outcome of a publisher confirm.
     *
     * @param correlationData the correlation data passed with the confirm; may be {@code null},
     *                        in which case it is logged as the text "null"
     * @param ack {@code true} for an ack, {@code false} for a nack
     * @param cause the cause reported with the confirm; only logged when {@code ack} is false
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        final String id = String.valueOf(correlationData);
        if (ack) {
            log.info("send message ack success -> ID: " + id);
            return;
        }
        log.info("send message ack failed: " + cause + " -> ID: " + id);
    }
}
return callback
/**
 * Return callback that writes a fixed log line whenever a message is returned.
 */
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs that sending failed. None of the arguments are inspected or logged.
     *
     * @param message the returned message
     * @param replyCode the reply code supplied with the return
     * @param replyText the reply text supplied with the return
     * @param exchange the exchange the message was sent to
     * @param routingKey the routing key the message was sent with
     */
    @Override
    public void returnedMessage(
            Message message,
            int replyCode,
            String replyText,
            String exchange,
            String routingKey) {
        log.info("send message failed...");
    }
}
message listener
/**
 * Listener for the test queue: logs the message body and its correlation ID, then
 * manually acknowledges the delivery. If anything in that sequence throws, the
 * error is logged and the delivery is nacked without requeue.
 */
public class TestMessageListener implements ChannelAwareMessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestMessageListener.class);

    /**
     * Logs and acknowledges a single delivery.
     *
     * @param message the received message
     * @param channel the channel the message arrived on, used for ack/nack
     * @throws Exception if the nack issued from the error path itself fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            // Here: get CorrelationId is always null
            // The charset constant replaces the "UTF-8" name lookup, so no
            // UnsupportedEncodingException path exists for decoding the body.
            LOGGER.info("handle message: {} -> ID: {}",
                    new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8),
                    message.getMessageProperties().getCorrelationId());

            // Handling always succeeds at this point, so the delivery is acked
            // unconditionally (single delivery, not "multiple").
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            LOGGER.info("listener ack message completed");
        } catch (Exception e) {
            LOGGER.error("handle test message error", e);
            // Reject this single delivery and do not requeue it.
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}
send msg
/**
 * REST endpoint that publishes the posted content to {@code test-exchange} with
 * routing key {@code test}, attaching a {@link CorrelationData} whose ID matches
 * the ID stored in the message payload.
 */
@RestController
@RequestMapping("/rabbitmq")
public class RabbitmqCtrl {

    /** Source of message IDs; incremented once per request. */
    private final AtomicLong atoId = new AtomicLong();

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Wraps the request body in a message payload and sends it through the template.
     *
     * @param content the raw request body to send
     * @return the literal string {@code "success"} once the send call has returned
     */
    @PostMapping("sendMsg")
    public String sendMsg(@RequestBody String content) {
        // Take the ID exactly once. Reading the counter a second time for the
        // CorrelationData could pick up an increment made by a concurrent request,
        // leaving the payload ID and the correlation ID out of step.
        final String id = String.valueOf(atoId.incrementAndGet());

        // NOTE(review): Message here exposes setId/setContent, so it is presumably an
        // application payload class rather than the Spring AMQP Message - confirm.
        Message message = new Message();
        message.setId(id);
        message.setContent(content);

        rabbitTemplate.convertAndSend("test-exchange", "test", message, new CorrelationData(id));
        return "success";
    }
}
I tried setting a CorrelationDataPostProcessor on the RabbitTemplate as follows:
// Before each send, copy the CorrelationData ID (when one was supplied) into the
// message's correlationId property, then hand the CorrelationData back unchanged.
template.setCorrelationDataPostProcessor((message, correlationData) -> {
    if (correlationData == null) {
        // Nothing to copy; the absent correlation data is passed through as-is.
        return null;
    }
    message.getMessageProperties().setCorrelationId(correlationData.getId());
    return correlationData;
});
This way I can get the correlation ID on the consumer side, but since I have already set the ID when sending the message, I think I should not have to do this. Is there a more reasonable explanation or approach?