I want my app to handle multiple messages received from RabbitMQ concurrently. I have tried probably every solution on the first page of Google results, but none of them work. Here is my setup:
pom.xml:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
</parent>
.
.
.
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>
application.properties:
#############################
# RabbitMQ #
#############################
#AMQP RabbitMQ configuration
spring.rabbitmq.host=zzzzzzzz
spring.rabbitmq.port=5672
spring.rabbitmq.username=zzzzzzz
spring.rabbitmq.password=zzzzzzz
#Rabbit component names
com.cp.neworder.queue.name = new-order-queue-stg
com.cp.neworder.queue.exchange = new-order-exchange-stg
com.cp.completedorder.queue.name = completed-order-queue
com.cp.completedorder.queue.exchange = completed-order-exchange
#RabbitMQ concurrent consumers config
spring.rabbitmq.listener.simple.concurrency=3
spring.rabbitmq.listener.simple.retry.initial-interval=3000
Configuration file:
@Configuration
public class RabbitMQConfig {

    @Value("${com.cp.neworder.queue.name}")
    private String newOrderQueueName;

    @Value("${com.cp.neworder.queue.exchange}")
    private String newOrderExchangeName;

    @Bean
    Queue queue() {
        return new Queue(newOrderQueueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(newOrderExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(newOrderQueueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(newOrderQueueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(OrderMessageListener receiver) {
        return new MessageListenerAdapter(receiver, "receiveOrder");
    }
}
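For comparison, here is a minimal sketch of the same container bean with the consumer count set directly in code. It rests on an assumption worth checking: that the `spring.rabbitmq.listener.simple.*` properties configure only the auto-configured listener container factory (the one behind `@RabbitListener`), so a hand-built `SimpleMessageListenerContainer` like the one above would not pick them up. The class name `ConcurrentContainerSketch` is hypothetical, and the value 3 simply mirrors the `concurrency` property in application.properties.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class name; in practice this bean would replace the
// container() bean in RabbitMQConfig rather than live alongside it.
@Configuration
public class ConcurrentContainerSketch {

    @Value("${com.cp.neworder.queue.name}")
    private String newOrderQueueName;

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(newOrderQueueName);
        container.setMessageListener(listenerAdapter);
        // Assumption: concurrency must be set on a manually created container;
        // 3 mirrors spring.rabbitmq.listener.simple.concurrency above.
        container.setConcurrentConsumers(3);
        return container;
    }
}
```

If this assumption holds, the listener method (`receiveOrder`) would be invoked from up to three consumer threads at once, so it would need to be thread-safe.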
My consumer class works as intended, except that it processes only one message at a time. How do I know?
- I record the processing state of my async requests in the database, so I can query how many are being processed at any moment, and it's always just 1.
- In the RabbitMQ management UI, I can see that messages are being dequeued one at a time.
What are the mistakes in my setup? How do I get it to work?
Thanks.