1
votes

I use RabbitTemplate to send a message with my own generated CorrelationData. I receive the correlation ID in the ConfirmCallback, but I can't receive it on the consumer side.

I tested this problem with 2.0.3.RELEASE and 2.1.0.RELEASE and the results are consistent with the above description.

rabbitmq config

/**
 * Spring configuration for the RabbitMQ demo: connection factory, template,
 * JSON message converter, the test queue/exchange/binding and the listener
 * container that consumes from the test queue.
 */
@Configuration
public class RabbitMQConfig {

    /** Name of the queue that is declared and consumed by this configuration. */
    private static final String QUEUE_NAME = "test-queue";

    /** Name of the topic exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "test-exchange";

    /** Routing key used for the queue-to-exchange binding. */
    private static final String ROUTING_KEY = "test";

    @Value("${mq.rabbit.addresses}")
    private String addresses;

    @Value("${mq.rabbit.username}")
    private String username;

    @Value("${mq.rabbit.password}")
    private String password;

    @Value("${mq.rabbit.virtualHost}")
    private String virtualHost;

    @Value("${mq.rabbit.sessionCacheSize}")
    private int sessionCacheSize;

    /**
     * Builds the caching connection factory from the injected {@code mq.rabbit.*}
     * properties, with publisher confirms and publisher returns switched on.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();

        // Broker location: addresses is a list of the form "host[:port],..."
        factory.setAddresses(this.addresses);
        factory.setVirtualHost(this.virtualHost);

        // Credentials
        factory.setUsername(this.username);
        factory.setPassword(this.password);

        // Publisher-side feedback and channel caching
        factory.setPublisherConfirms(true);
        factory.setPublisherReturns(true);
        factory.setChannelCacheSize(this.sessionCacheSize);

        return factory;
    }

    /**
     * Creates a prototype-scoped template that uses the given converter, marks
     * messages as mandatory and registers the confirm and return callbacks.
     *
     * @param messageConverter converter applied to outgoing payloads
     * @return a new template instance
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(MessageConverter messageConverter) {
        RabbitTemplate amqpTemplate = new RabbitTemplate(connectionFactory());
        amqpTemplate.setMessageConverter(messageConverter);
        amqpTemplate.setMandatory(true);
        amqpTemplate.setConfirmCallback(new ConfirmCallbackListener());
        amqpTemplate.setReturnCallback(new ReturnCallBackListener());
        return amqpTemplate;
    }

    /**
     * @param customMapper the Jackson mapper to serialize payloads with
     * @return a JSON message converter backed by {@code customMapper}
     */
    @Bean
    public MessageConverter messageConverter(ObjectMapper customMapper) {
        return new Jackson2JsonMessageConverter(customMapper);
    }

    /** @return the declaration of the test queue */
    @Bean
    public Queue testQueue() {
        return new Queue(QUEUE_NAME, true);
    }

    /** @return the declaration of the test topic exchange */
    @Bean
    public TopicExchange defaultExchange() {
        return new TopicExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * Binds the test queue to the test exchange.
     *
     * @param testQueue       the queue to bind
     * @param defaultExchange the exchange to bind it to
     * @return the binding declaration
     */
    @Bean
    public Binding bindingExchangeCommon(Queue testQueue, TopicExchange defaultExchange) {
        return BindingBuilder
                .bind(testQueue)
                .to(defaultExchange)
                .with(ROUTING_KEY);
    }

    /**
     * Creates the listener container for the test queue, using manual
     * acknowledgement and {@link TestMessageListener} as the handler.
     *
     * @param connectionFactory the connection factory the container consumes through
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer testMessageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer listenerContainer =
                new SimpleMessageListenerContainer(connectionFactory);

        // What to consume and how it is handled
        listenerContainer.setQueueNames(QUEUE_NAME);
        listenerContainer.setMessageListener(new TestMessageListener());
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setExposeListenerChannel(true);

        // Throughput settings; the maximum is set before the initial consumer count
        listenerContainer.setPrefetchCount(250);
        listenerContainer.setMaxConcurrentConsumers(20);
        listenerContainer.setConcurrentConsumers(10);

        return listenerContainer;
    }
}

confirm callback

/**
 * Publisher-confirm callback that logs whether the broker acknowledged a
 * published message, together with the correlation data supplied at send time.
 */
public class ConfirmCallbackListener implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs the outcome of a publisher confirm.
     *
     * @param correlationData the correlation data passed when the message was sent; may be {@code null}
     * @param ack             {@code true} for an ack, {@code false} for a nack
     * @param cause           the failure cause reported for a nack; may be {@code null}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // Parameterized logging renders null arguments as "null", exactly like the
        // previous String.valueOf(...) concatenation, without building the string eagerly.
        if (ack) {
            logger.info("send message ack success -> ID: {}", correlationData);
        } else {
            logger.info("send message ack failed: {} -> ID: {}", cause, correlationData);
        }
    }
}

return callback

/**
 * Return callback that logs messages handed back to the publisher, including
 * the reply details needed to diagnose why the message was returned.
 */
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs a returned message.
     *
     * @param message    the returned message
     * @param replyCode  the reply code reported with the return
     * @param replyText  the reply text reported with the return
     * @param exchange   the exchange the message was published to
     * @param routingKey the routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Include the return details; a bare "failed" line gives no way to tell
        // which publish was returned or why.
        logger.info("send message failed... replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
                replyCode, replyText, exchange, routingKey);
    }

}

message listener

/**
 * Listener for the test queue that logs each message body and correlation id
 * and acknowledges the delivery manually on the supplied channel.
 */
public class TestMessageListener implements ChannelAwareMessageListener {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs the message and acks it; if anything in that path throws, the error
     * is logged and the delivery is nacked without requeue.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on, used for ack/nack
     * @throws Exception if the nack issued from the error path itself fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Read once; the same tag is needed for both the ack and the nack path.
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // The correlation id is null here unless the publisher copied it into
            // the message properties before sending.
            logger.info("handle message: {} -> ID: {}",
                    new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8),
                    message.getMessageProperties().getCorrelationId());
            channel.basicAck(deliveryTag, false);
            logger.info("listener ack message completed");
        } catch (Exception e) {
            logger.error("handle test message error", e);
            // requeue=false: do not redeliver a message whose handling failed
            channel.basicNack(deliveryTag, false, false);
        }
    }

}

send msg

/**
 * REST endpoint that publishes the request body to RabbitMQ as a message with
 * a sequential id, using the same id as the publisher correlation data.
 */
@RestController
@RequestMapping("/rabbitmq")
public class RabbitmqCtrl {

    /** Source of sequential message ids. */
    private final AtomicLong atoId = new AtomicLong();

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Wraps {@code content} in a message and sends it to {@code test-exchange}
     * with routing key {@code test}.
     *
     * @param content the raw request body to send
     * @return the literal string {@code "success"} once the send call has returned
     */
    @PostMapping("sendMsg")
    public String sendMsg(@RequestBody String content) {
        // Take the id exactly once. Calling atoId.get() again later could observe
        // an increment made by a concurrent request, so the message id and the
        // correlation id would no longer match.
        String id = String.valueOf(atoId.incrementAndGet());

        Message message = new Message();
        message.setId(id);
        message.setContent(content);
        rabbitTemplate.convertAndSend("test-exchange", "test", message, new CorrelationData(id));
        return "success";
    }
}

I tried setting a CorrelationDataPostProcessor on the RabbitTemplate as follows:

// Copy the id of the supplied correlation data (when there is one) into the
// outgoing message's properties, then hand the correlation data back unchanged.
template.setCorrelationDataPostProcessor((outbound, correlation) -> {
    if (correlation != null) {
        outbound.getMessageProperties().setCorrelationId(correlation.getId());
    }
    return correlation;
});

This way I can get the correlation ID, but since I already set the ID when sending the message, I don't think I should have to do this. Is there a more reasonable explanation?

1

1 Answer

1
votes

The CorrelationData is not sent over the network unless you request that explicitly with a custom MessagePostProcessor, as you did with your CorrelationDataPostProcessor. The default implementation looks like this:

/**
 * Default two-argument variant: delegates to the single-argument
 * {@code postProcessMessage(message)} and does not use {@code correlation}.
 *
 * @param message     the message to post-process
 * @param correlation the correlation object; unused by this default implementation
 * @return whatever the single-argument {@code postProcessMessage} returns
 */
default Message postProcessMessage(Message message, Correlation correlation) {
    return postProcessMessage(message);
}

As you can see, the correlation is completely ignored.

So, to send a correlation to the consumer side we really have to supply a custom MessagePostProcessor and inject it into the RabbitTemplate.