When processing a reply with the AsyncRabbitTemplate.sendAndReceive() or AsyncRabbitTemplate.convertSendAndReceive() method, the reply is returned asynchronously to the calling method. Since we could instead use a message listener on the reply queue to receive and process the reply, why did the spring-amqp framework introduce AsyncRabbitTemplate and RabbitMessageFuture to handle it? With a message listener we can control the related consumer thread, but with RabbitMessageFuture the background thread cannot be managed.

-------------------Added on 2017/01/06----------------------------

It's simply your choice.

Replies can come back in a different order to sends.

With the async template, the framework takes care of the correlation for you; the reply will appear in the future returned by the send method.

When you use your own listener, you will have to take care of the correlation yourself.


Thank you. I know this difference. But there is still a problem. If I use a message listener, I can ack the reply message manually (if my listener implements the ChannelAwareMessageListener interface, I can get the channel instance). But when I use AsyncRabbitTemplate, can I ack the reply message manually? It seems that the sendAndReceive method acks the reply message automatically.

I don't understand what you mean; since you can inject the listener container into the template, you have the same "control" either way.


There seems to be a problem with this approach.

I created a RabbitTemplate instance and a SimpleMessageListenerContainer, and used them to construct an AsyncRabbitTemplate instance with the following code:

@Bean(name = "rabbitTemplate")
public RabbitTemplate getRabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory());
    rabbitTemplate.setUseTemporaryReplyQueues(false);
    rabbitTemplate.setReplyAddress("replyQueue");
    rabbitTemplate.setReceiveTimeout(60000);
    rabbitTemplate.setReplyTimeout(60000);
    return rabbitTemplate;
}

@Bean(name = "asyncRabbitTemplate")
public AsyncRabbitTemplate getAsyncRabbitTemplate() {
    AsyncRabbitTemplate asyncRabbitTemplate =
            new AsyncRabbitTemplate(getRabbitTemplate(), createReplyListenerContainer());
    asyncRabbitTemplate.setAutoStartup(true);
    asyncRabbitTemplate.setReceiveTimeout(60000);
    return asyncRabbitTemplate;
}

@Bean(name = "replyMessageListenerContainer")
public SimpleMessageListenerContainer createReplyListenerContainer() {
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
    listenerContainer.setConnectionFactory(getConnectionFactory());
    listenerContainer.setQueueNames("replyQueue");
    listenerContainer.setMessageListener(getRabbitTemplate());
    listenerContainer.setRabbitAdmin(getRabbitAdmin());
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
    return listenerContainer;
}

With this configuration the message is not sent successfully: the consumer server never receives it.

But when I create the AsyncRabbitTemplate instance with the following code, the message is sent and received successfully.

@Bean(name = "asyncRabbitTemplate")
public AsyncRabbitTemplate getAsyncRabbitTemplate() {
    AsyncRabbitTemplate asyncRabbitTemplate =
            new AsyncRabbitTemplate(getConnectionFactory(),
                    "sendMessageExchange",
                    "sendMessageKey",
                    "replyQueue");
    asyncRabbitTemplate.setReceiveTimeout(60000);
    asyncRabbitTemplate.setAutoStartup(true);
    return asyncRabbitTemplate;
}

Is there something wrong with my source code?

I am using spring-boot-amqp 1.4.3.RELEASE.

1 Answer

It's simply your choice.

Replies can come back in a different order to sends.

With the async template, the framework takes care of the correlation for you - the reply will appear in the future returned by the send method.

When you use your own listener, you will have to take care of the correlation yourself.
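
To make "taking care of the correlation yourself" concrete, here is a minimal plain-JDK sketch of the bookkeeping a hand-written reply listener would need. ReplyCorrelator, register and onReply are hypothetical names chosen for illustration; this is not how AsyncRabbitTemplate is implemented internally, and it leaves out timeouts and message conversion.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Spring AMQP): matches replies to requests by correlation id.
final class ReplyCorrelator {

    // Requests that have been sent but not yet answered, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before sending: remembers the correlation id and returns the future the caller waits on.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Call from the reply listener: completes the matching future.
    // Returns false when no request is waiting for this id (unknown or duplicate reply).
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        return future.complete(body);
    }

    public static void main(String[] args) throws Exception {
        ReplyCorrelator correlator = new ReplyCorrelator();
        CompletableFuture<String> first = correlator.register("id-1");
        CompletableFuture<String> second = correlator.register("id-2");

        // Replies arrive in the opposite order to the sends.
        correlator.onReply("id-2", "BAR");
        correlator.onReply("id-1", "FOO");

        System.out.println(first.get());  // prints "FOO"
        System.out.println(second.get()); // prints "BAR"
    }
}
```

In a real listener the correlation id would be read from the reply message's properties; the point of the sketch is only that each reply has to be routed back to the caller that sent the matching request, regardless of arrival order.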

"For message listener, we can control the related consumer thread, but for RabbitMessageFuture, the background thread can not be managed."

I don't understand what you mean; since you can inject the listener container into the template, you have the same "control" either way.

EDIT

import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitConverterFuture;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So41481046Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So41481046Application.class, args);
        AsyncRabbitTemplate asyncTemplate = context.getBean(AsyncRabbitTemplate.class);
        RabbitConverterFuture<String> future = asyncTemplate.convertSendAndReceive("foo");
        try {
            String out = future.get(10, TimeUnit.SECONDS);
            System.out.println(out);
        }
        finally {
            context.close();
        }
        System.exit(0);
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
        rabbitTemplate.setRoutingKey(queue().getName());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer(connectionFactory));
    }

    @Bean
    public Queue queue() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue replyQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(replyQueue().getName());
        return container;
    }

    @Bean
    public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queue().getName());
        container.setMessageListener(new MessageListenerAdapter(new Object() {

            @SuppressWarnings("unused")
            public String handleMessage(String in) {
                return in.toUpperCase();
            }

        }));
        return container;
    }

}