I have a Spring JMS application that uses ActiveMQ version 5.10. I am running a simple concurrency test. I am using the current version of Spring Boot, with annotations to configure the @JmsListener and the message producers.

The message producer just puts messages on a queue as fast as it can. The message listener pulls messages off the queue but sleeps for 1 second after receiving each one, simulating the work a real listener would do after getting a message.

I have the @JmsListener set to 100-1000 concurrent threads. If I start the message producer and consumer at the same time (each runs in its own JVM), the consumer never gets above the minimum configured thread count, even though the maximum is set to 1000.

If I let the producer start first and place a few thousand messages on the queue, then start one or more instances of the consumer, the consumer raises its thread count steadily, starting at 100 and adding 20 or so threads each second, until it reaches a state where about 20-30 messages in the queue are in flight. It never catches up with the producer: there are always some messages in the queue, even though the consumer is nowhere near its maxConcurrency count.

Why doesn't the message consumer burst into a bunch of additional threads to empty the queue, instead of leaving 20-30 messages in it? Is there a way for the consumer to keep adding threads faster so that it catches up with the messages in the queue?

Here are the relevant parts of the code.

Message Producer

@Component
public class ClientServiceImpl implements ClientService {

    private static final String QUEUE="message.test.queue";

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void submitMessage(ImportantMessage importantMessage) {

        System.out.println("*** Sending " + importantMessage);
        jmsTemplate.convertAndSend(QUEUE, importantMessage);

    }
}

Message Consumer

@SpringBootApplication
@EnableJms
public class AmqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(AmqConsumerApplication.class, args);
    }
    @Value("${JMSHost}")
    private String JMS_BROKER_URL;

    // Spring does not inject static fields, so this field must be non-static
    @Autowired
    private Command command;

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory factory= new ActiveMQConnectionFactory(JMS_BROKER_URL);
        ((ActiveMQConnectionFactory)factory).setTrustAllPackages(true);
        ((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
        ((ActiveMQConnectionFactory)factory).setAlwaysSessionAsync(false);
        return factory;
    }

}

With the listener configured as such...

@Component
public class TransformationListener {

    private static final String QUEUE="message.test.queue?consumer.prefetchSize=10";

    @JmsListener(destination=QUEUE, concurrency = "100-1000")
    public void handleRequest(ImportantMessage importantMessage) {
        System.out.println("*** Received message: " + importantMessage + " on thread " + Thread.currentThread().getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

1 Answer

Are you still seeing this behavior? Have you read the "Pooled Consumers and prefetch" advice at http://activemq.apache.org/what-is-the-prefetch-limit-for.html? Did you try prefetchSize=0 or 1? I think 1 can resolve your problem. If prefetchSize is > 1, you may need to lower the AbortSlowAckConsumerStrategy timeout below its default of 30s. In your case, to have more than 100 threads consuming messages, you need more than 1000 messages in the queue that are neither consumed nor prefetched, because prefetchSize is set to 10.
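
To make the arithmetic behind that last point concrete, here is a minimal sketch in plain Java (no JMS dependency). It assumes, as a rough upper bound, that each of the minimum 100 consumers can hold up to prefetchSize messages in its local buffer. The class and method names (PrefetchMath, maxPrefetched, destinationWithPrefetch) are hypothetical helpers for illustration only, not part of ActiveMQ or Spring.

```java
public class PrefetchMath {

    // Rough upper bound on the number of messages the broker may hand out
    // to consumer prefetch buffers: one buffer of prefetchSize per consumer.
    // (Assumption for illustration; actual dispatch depends on the broker.)
    static long maxPrefetched(int consumers, int prefetchSize) {
        return (long) consumers * prefetchSize;
    }

    // Builds a destination name carrying the consumer.prefetchSize
    // destination option, in the same form the question's listener uses.
    static String destinationWithPrefetch(String queue, int prefetchSize) {
        return queue + "?consumer.prefetchSize=" + prefetchSize;
    }

    public static void main(String[] args) {
        int minConsumers = 100; // lower bound of concurrency = "100-1000"

        // With prefetchSize=10, the first 100 consumers can buffer up to 1000 messages.
        System.out.println(maxPrefetched(minConsumers, 10)); // prints 1000

        // With prefetchSize=1, they can buffer at most 100.
        System.out.println(maxPrefetched(minConsumers, 1)); // prints 100

        // Destination string to try in the listener
        System.out.println(destinationWithPrefetch("message.test.queue", 1));
        // prints message.test.queue?consumer.prefetchSize=1
    }
}
```

Under this assumption, lowering the prefetch to 1 means far fewer messages can sit idle in the buffers of already-busy consumers, so a backlog becomes visible on the queue sooner.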