Setup
I have a Spring Boot application called the Dispatcher. It runs on one machine and has an embedded ActiveMQ broker:
@Bean
public BrokerService broker(ActiveMQProperties properties) throws Exception {
    BrokerService broker = new BrokerService();
    broker.setPersistent(false);
    broker.addConnector(properties.getBrokerUrl());
    return broker;
}
The Dispatcher writes tasks to a JMS queue:
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(standardTaskQueue())
            // poll the in-memory priority channel and pass tasks on to the JMS adapter
            .bridge(Bridges.blockingPoller(outboundTaskScheduler()))
            .transform(outboundTransformer)
            .handle(Jms.outboundAdapter(connectionFactory)
                    .extractPayload(false)
                    .destination(JmsQueueNames.STANDARD_TASKS))
            .get();
}
@Bean
public QueueChannel standardTaskQueue() {
    return MessageChannels.priority()
            .comparator(TASK_PRIO_COMPARATOR)
            .get();
}
// 2 more queues with different names but same config
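On the Dispatcher side, the priority channel behaves roughly like a `java.util.concurrent.PriorityBlockingQueue` ordered by a comparator, and the blocking-poller bridge drains it in that order before the JMS outbound adapter sends each task. The following is a minimal stdlib-only sketch under assumptions: the `Task` record and the `BY_PRIORITY_DESC` comparator are hypothetical stand-ins for the real payload and `TASK_PRIO_COMPARATOR`, and `poll()` replaces the poller's blocking wait so the example terminates.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityDispatchSketch {
    // Hypothetical task type; the real payload is not shown in the question.
    record Task(String id, int priority) {}

    // Hypothetical comparator: a higher priority value is dispatched first.
    static final Comparator<Task> BY_PRIORITY_DESC =
            Comparator.comparingInt(Task::priority).reversed();

    // Enqueue tasks in arrival order, then drain them in comparator order.
    static String dispatchOrder(List<Task> arrivals) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(16, BY_PRIORITY_DESC);
        queue.addAll(arrivals);
        StringBuilder order = new StringBuilder();
        Task next;
        // poll() returns null once the queue is empty; a real blocking poller would wait instead
        while ((next = queue.poll()) != null) {
            order.append(next.id());
        }
        return order.toString();
    }

    public static void main(String[] args) {
        List<Task> arrivals = List.of(new Task("a", 1), new Task("b", 5), new Task("c", 3));
        System.out.println(dispatchOrder(arrivals)); // prints bca
    }
}
```

Task `b` (priority 5) leaves first even though `a` arrived first; the ordering only applies while tasks are still waiting in this in-memory channel.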
The Worker application runs on 10 machines with 20 cores each and is configured like this:
@Bean
public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
    int maxWorkers = 20;
    return IntegrationFlows
            .from(Jms.channel(connectionFactory)
                    .sessionTransacted(true)
                    .concurrentConsumers(maxWorkers)
                    .taskExecutor(
                            Executors.newFixedThreadPool(maxWorkers, new CustomizableThreadFactory("standard-"))
                    )
                    .destination(JmsQueueNames.STANDARD_TASKS))
            .channel(ChannelNames.TASKS_INBOUND)
            .get();
}
// 2 more inbound queues with different names but same config
This is repeated for a second queue, plus one special case, for a total of 401 consumers.
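For reference, the consumer count works out as below. The machine and per-queue consumer figures come from the setup above; the assumption that the special case contributes exactly one consumer is inferred from the total of 401, and the class and constant names are hypothetical.

```java
public class ConsumerCount {
    // From the setup: 10 Worker machines, 20 concurrent consumers per queue per machine.
    static final int MACHINES = 10;
    static final int CONSUMERS_PER_MACHINE = 20;
    // Two queues use the 20-consumer configuration.
    static final int STANDARD_QUEUES = 2;
    // Assumption: the special case adds a single consumer (401 - 400).
    static final int SPECIAL_CASE_CONSUMERS = 1;

    static int consumersPerQueue() {
        return MACHINES * CONSUMERS_PER_MACHINE;
    }

    static int totalConsumers() {
        return consumersPerQueue() * STANDARD_QUEUES + SPECIAL_CASE_CONSUMERS;
    }

    public static void main(String[] args) {
        System.out.println("per queue: " + consumersPerQueue()); // 200
        System.out.println("total:     " + totalConsumers());    // 401
    }
}
```

The 200 consumers per standard queue are why a fully busy system is expected to show about 400 tasks in progress at once.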
Observation
Using JConsole, I can see that there are tasks in the ActiveMQ queue:
[Screenshot: JConsole view of the ActiveMQ queue with pending tasks]
As expected, each Worker machine has 20 consumer threads:
[Screenshot: JConsole view of the 20 consumer threads on a Worker machine]
However, most if not all of them are idle even though messages are still waiting in the queue. Our monitoring tool shows that between 50 and 400 tasks are being processed at any given time, whereas I expect a constant 400.
I also observed that Spring creates an AbstractPollingMessageListenerContainer for each consumer, which seems to result in one JMS connection being opened per application per queue per second (33 connections per second in total).
Investigation
I then found the ActiveMQ FAQ entry "I do not receive messages in my second consumer", which suggests that prefetch is the culprit. This sounded plausible, so I configured tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1 as the broker URL on each worker. After that, however, only about 25 tasks were being processed at any point, which made no sense to me at all.
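To make the prefetch hint concrete, here is a deliberately simplified, stdlib-only model; it is not ActiveMQ's actual dispatch algorithm. Assumptions: all messages are already queued, consumer 0 subscribes before the others (the scenario the FAQ describes), each message takes one tick, and a prefetch slot is freed when the consumer finishes (acknowledges) a message. `PrefetchModel` and `ticksToDrain` are hypothetical names; 1000 is ActiveMQ's default queue prefetch.

```java
public class PrefetchModel {
    /**
     * Simplified model, not ActiveMQ's real dispatcher: returns the number of ticks
     * needed to process all messages. The broker pushes messages only to consumers
     * holding fewer than 'prefetch' unacknowledged messages, and each consumer
     * finishes at most one message per tick.
     */
    static int ticksToDrain(int messages, int consumers, int prefetch) {
        if (prefetch < 1 || consumers < 1) {
            throw new IllegalArgumentException("need prefetch >= 1 and consumers >= 1");
        }
        int onBroker = messages;
        int[] buffered = new int[consumers]; // unacknowledged messages held by each consumer

        // Assumption: consumer 0 subscribes first and fills its prefetch buffer
        // before any other consumer is connected.
        int first = Math.min(prefetch, onBroker);
        buffered[0] = first;
        onBroker -= first;

        int remaining = messages;
        int ticks = 0;
        while (remaining > 0) {
            // Round-robin dispatch to every consumer that still has prefetch capacity.
            boolean progress = true;
            while (onBroker > 0 && progress) {
                progress = false;
                for (int c = 0; c < consumers && onBroker > 0; c++) {
                    if (buffered[c] < prefetch) {
                        buffered[c]++;
                        onBroker--;
                        progress = true;
                    }
                }
            }
            // Each consumer with a buffered message finishes and acknowledges one per tick.
            for (int c = 0; c < consumers; c++) {
                if (buffered[c] > 0) {
                    buffered[c]--;
                    remaining--;
                }
            }
            ticks++;
        }
        return ticks;
    }

    public static void main(String[] args) {
        // 100 queued messages, 4 consumers
        System.out.println("prefetch=1000 -> " + ticksToDrain(100, 4, 1000) + " ticks");
        System.out.println("prefetch=1    -> " + ticksToDrain(100, 4, 1) + " ticks");
    }
}
```

With prefetch 1000, consumer 0 buffers the whole backlog and the other three consumers stay idle for all 100 ticks; with prefetch 1, the work is spread evenly and finishes in 25 ticks. The model only illustrates the FAQ's reasoning and does not account for the drop to about 25 concurrent tasks observed above.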
Question
I don't seem to understand what's going on, and since I'm running out of time to investigate, I was hoping someone could point me in the right direction. Which factors could be causing this? The number of consumers or connections? The prefetch? Something else?