We implemented a distributed request/response architecture for a use case in which the caller has to wait for the response. The JMS broker we use is ActiveMQ, and the code is wired together using Spring.

The issue we see is that when a batch of requests is sent to the same destination, any request that takes a significant amount of time to complete appears to block the request messages that follow it. The SessionAwareMessageListener interface that the consumer implements only provides the onMessage() method. What is the best way to achieve parallelism here, i.e. so that if a particular request takes a long time, the other messages in the queue are not blocked?

There is this SO post, but it doesn't answer my question: "JMS: Can we get multiple messages from queue in OnMessage() without commit or rollback"

Thanks

Relevant snippets of code (exception handling etc. removed for brevity):

Producer

public class MyJmsProducer {

    // jmsMessagingTemplate and destination are fields whose declarations are omitted here

    private ProcessingResponse sendMessage(final Serializable serializable) {
        // send the JMS request and wait for the response
        // (this operation seems to be blocking and synchronous)
        return jmsMessagingTemplate.convertSendAndReceive(destination, serializable, ProcessingResponse.class);
    }
}

And the listener (consumer)

public class MyJmsListener
        implements SessionAwareMessageListener<Message>, NotificationHandler<Task> {

    @Override
    public void onMessage(Message message, Session session)
            throws JMSException {
        ProcessingRequest processingRequest = (ProcessingRequest) ((ObjectMessage) message).getObject();

        // handle the request here (THIS COULD TAKE A WHILE)
        // assumption: handleRequest returns the ProcessingResponse used below
        final ProcessingResponse processingResponse = handleRequest(processingRequest);

        // done handling the request, now create a response message
        final ObjectMessage responseMessage = new ActiveMQObjectMessage();
        responseMessage.setJMSCorrelationID(message.getJMSCorrelationID());
        responseMessage.setObject(processingResponse);

        // send the response back to the replyTo address of the incoming message
        final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
        producer.send(responseMessage);
        producer.close(); // release the per-message producer
    }
}
Sure, you can increase the concurrent consumer count, but if the handleRequest method consumes a lot of system resources, the average time per request may increase. - user1516873
If that post does not solve your problem, then your real problem is not clear. Where exactly are you having issues? Is it while sending the reply to the broker, i.e. producer.send(responseMessage)? Or do you think that the producer, after sending a request to the broker, waits until the consumer has consumed it and finished processing it? - hagrawal

1 Answer

You can use the concurrentConsumers setting of the DefaultMessageListenerContainer (DMLC) to consume messages in parallel, which addresses the slow-consumer issue:

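@Bean
public DefaultMessageListenerContainer dmlc() {
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    // the connection factory, destination and message listener must also be set (omitted here)
    dmlc.setMaxConcurrentConsumers(10); // upper bound the container may scale up to under load
    dmlc.setConcurrentConsumers(5);     // number of consumers started initially
    return dmlc;
}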
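To see why more consumers help, here is a minimal sketch that uses plain JDK threads instead of Spring or ActiveMQ. It only imitates the idea behind concurrent consumers: several threads pull from the same queue, so one slow request no longer holds up the others. The class ConcurrentConsumersDemo, the Request type and the sleep times are all made up for illustration and are not part of the code in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ConcurrentConsumersDemo {

    /** Hypothetical request: a name plus a simulated processing time in milliseconds. */
    public static final class Request {
        final String name;
        final long millis;

        public Request(String name, long millis) {
            this.name = name;
            this.millis = millis;
        }
    }

    /**
     * Drains the requests with the given number of consumer threads and
     * returns the request names in the order in which they completed.
     */
    public static List<String> process(List<Request> requests, int consumers) {
        BlockingQueue<Request> queue = new LinkedBlockingQueue<>(requests);
        List<String> completed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                Request r;
                // each consumer takes the next waiting request as soon as it is free
                while ((r = queue.poll()) != null) {
                    try {
                        Thread.sleep(r.millis); // stands in for handleRequest(...)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    completed.add(r.name);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    public static void main(String[] args) {
        List<Request> requests = Arrays.asList(
                new Request("slow", 500),
                new Request("fast-1", 10),
                new Request("fast-2", 10),
                new Request("fast-3", 10));
        System.out.println("1 consumer:  " + process(requests, 1));
        System.out.println("2 consumers: " + process(requests, 2));
    }
}
```

With one consumer the slow request finishes first and everything else waits behind it. With two consumers the fast requests would normally finish while the slow one is still running. Keep in mind that with a real DMLC the same listener instance is invoked from several threads, so onMessage() has to be thread-safe.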

You also need to adapt the prefetch policy when using concurrent consumers. The ActiveMQ default prefetch limits are:

persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)

With these defaults, waiting messages can be dispatched to the first consumer that connects (up to its prefetch limit), and a consumer that connects later to the same destination does not receive them. To change this behavior, set the prefetch to a lower value than the default. For example, add jms.prefetchPolicy.queuePrefetch=1 to the URI config in activemq.xml, or set it on the client URL like this:

@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
    return connectionFactory;
}

Large prefetch values are recommended for high performance with high message volumes. However, for lower message volumes, where each message takes a long time to process, the prefetch should be set to 1. This ensures that a consumer is only processing one message at a time. Specifying a prefetch limit of zero, however, will cause the consumer to poll for messages, one at a time, instead of the message being pushed to the consumer.
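The effect of the prefetch limit on slow messages can be illustrated with a small simulation. This is only a simplified model and not the broker's actual dispatch algorithm: it assumes all messages are already waiting, all consumers are connected, messages are pushed round-robin to any consumer holding fewer than `prefetch` unfinished messages, and a pushed message can only be processed by the consumer that received it. The class PrefetchSimulation and its method are invented for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class PrefetchSimulation {

    /**
     * Returns the completion time of each message (in the same time unit as
     * the durations) under the simplified dispatch model described above.
     */
    public static long[] completionTimes(long[] durations, int consumers, int prefetch) {
        if (consumers < 1 || prefetch < 1) {
            throw new IllegalArgumentException("consumers and prefetch must be >= 1");
        }
        long[] done = new long[durations.length];
        long[] lastFinish = new long[consumers];      // when each consumer finishes its assigned work
        List<Deque<Long>> pending = new ArrayList<>(); // finish times of unfinished messages per consumer
        for (int c = 0; c < consumers; c++) {
            pending.add(new ArrayDeque<>());
        }
        long now = 0;
        int roundRobin = 0;
        int next = 0;
        while (next < durations.length) {
            // free the buffer slots of messages that have completed by now
            for (Deque<Long> p : pending) {
                while (!p.isEmpty() && p.peekFirst() <= now) {
                    p.removeFirst();
                }
            }
            // round-robin search for a consumer with a free prefetch slot
            int target = -1;
            for (int i = 0; i < consumers; i++) {
                int c = (roundRobin + i) % consumers;
                if (pending.get(c).size() < prefetch) {
                    target = c;
                    break;
                }
            }
            if (target >= 0) {
                // the message waits behind everything already buffered on that consumer
                long start = Math.max(now, lastFinish[target]);
                long finish = start + durations[next];
                lastFinish[target] = finish;
                pending.get(target).addLast(finish);
                done[next++] = finish;
                roundRobin = (target + 1) % consumers;
            } else {
                // every buffer is full: jump to the earliest completion
                long earliest = Long.MAX_VALUE;
                for (Deque<Long> p : pending) {
                    earliest = Math.min(earliest, p.peekFirst());
                }
                now = earliest;
            }
        }
        return done;
    }

    public static void main(String[] args) {
        long[] durations = {10, 1, 1, 1}; // one slow message followed by three fast ones
        // prints [10, 1, 11, 2]: the third message is stuck behind the slow one
        System.out.println(Arrays.toString(completionTimes(durations, 2, 1000)));
        // prints [10, 1, 2, 3]: the free consumer picks up all the fast messages
        System.out.println(Arrays.toString(completionTimes(durations, 2, 1)));
    }
}
```

In this model, a large prefetch lets a fast message get buffered on the consumer that is busy with the slow one, while a prefetch of 1 leaves it on the broker until some consumer is actually free.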

Take a look at http://activemq.apache.org/what-is-the-prefetch-limit-for.html

And

http://activemq.apache.org/destination-options.html