3
votes

I want my app to handle multiple messages received from RabbitMQ concurrently. I have tried probably all the google-page-1 solutions but it won't work. Here is my setup:

POM.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
</parent>
.
.
.
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

application.properties:

#############################
#         RabbitMQ          #
#############################
#AMQP RabbitMQ configuration 
spring.rabbitmq.host=zzzzzzzz
spring.rabbitmq.port=5672
spring.rabbitmq.username=zzzzzzz
spring.rabbitmq.password=zzzzzzz
#Rabbit component names
com.cp.neworder.queue.name = new-order-queue-stg
com.cp.neworder.queue.exchange = new-order-exchange-stg
com.cp.completedorder.queue.name = completed-order-queue
com.cp.completedorder.queue.exchange = completed-order-exchange
#Rabbit MQ concurrect consumers config
spring.rabbitmq.listener.simple.concurrency=3
spring.rabbitmq.listener.simple.retry.initial-interval=3000

Configuration file:

@Configuration
public class RabbitMQConfig {

    @Value("${com.cp.neworder.queue.name}")
    private String newOrderQueueName;
    @Value("${com.cp.neworder.queue.exchange}")
    private String newOrderExchangeName;

    @Bean
    Queue queue() {
        return new Queue(newOrderQueueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(newOrderExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(newOrderQueueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(newOrderQueueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(OrderMessageListener receiver) {
        return new MessageListenerAdapter(receiver, "receiveOrder");
    }

}

My consumer class works as intended, it just processes one request at a time. How do I know?

  1. I save the process of my async requests in DB, so I can query how many are processing at the moment, and it's always just 1.
  2. I can look at the RabbitMQ Management platform, and I see that it's being dequeued one by one.

What are the mistakes in my setup? How do I get it to work?

Thanks.

2

2 Answers

3
votes

SimpleMessageListenerContainer has a way to set the concurrent consumers. It has setConcurrentConsumers method where you can set the number of consumers.

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
      container.setConnectionFactory(connectionFactory);
      container.setQueueNames(newOrderQueueName);
      container.setMessageListener(listenerAdapter);
      container. setConcurrentConsumers(10);
      return container;
  }

With this configuration, when you start the application, you will be able to see multiple consumers in the RabbitMQ admin

2
votes

You are not using Boot to create the container so the boot properties aren't applied.

Try

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter,  
               RabbitProperties properties) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(newOrderQueueName);
    container.setMessageListener(listenerAdapter);

    container.setConcurrentConsumers(properties.getListener().getSimple().getConcurrency();

    return container;
}