I have a Spring JMS Application that is using ActiveMQ version 5.10. I am performing a simple test to concurrency. I am using Spring Boot, current version and annotations to configure JMSListener and message producers.
The message producer just throws messsages on a queue as fast as it can. The message listener is pulling messages off the queue, but sleeping for 1 second after getting the message -- simulating some work that the message listener would need to do after getting a message.
I have the JMSListener set to 100-1000 concurrent threads. If I start the message producer and consumer at the same time (both run in their own JVM) the consumer never gets above the minimum configured threads, even though the max range is set 1000.
If I let the producer start first and place a few thousand messages on the queue, then start 1 or more instances of the consumer, it will raise the threads steadily, starting at 100 then 20 or so threads each second until it gets to a state where there is about 20-30 messages in the queue that are in-flight. It never catches the producer -- there is always some messages in queue even though the consumer is no where near its maxConcurrency count.
Why doesn't the message consumer burst into a bunch of additional threads to empty the queue instead of letting the queue have the 20-30 messages in it? Isn't there a way for the consumer continue to add threads faster in order to catch up with the messages in queue?
Here are the relevant parts of the code.
Message Producer
@Component
public class ClientServiceImpl implements ClientService {
private static final String QUEUE="message.test.queue";
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void submitMessage(ImportantMessage importantMessage) {
System.out.println("*** Sending " + importantMessage);
jmsTemplate.convertAndSend(QUEUE, importantMessage);
}
}
Message Consumer
@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(AmqConsumerApplication.class, args);
}
@Value("${JMSHost}")
private String JMS_BROKER_URL;
@Autowired
static Command command;
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false);
return factory;
}
}
With the listener configured as such...
@Component
public class TransformationListener {
private static final String QUEUE="message.test.queue?consumer.prefetchSize=10";
@JmsListener(destination=QUEUE, concurrency = "100-1000")
public void handleRequest(ImportantMessage importantMessage) {
System.out.println("*** Recieved message: " + importantMessage + " on thread" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}