I am using a topic with a durable subscriber. The producer publishes events using the JmsTemplate configuration below:

    <bean id="jmsTemplateESB" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachedJmsConnectionFactory" />
        <property name="defaultDestination" ref="activeMQTopic" />
        <!-- Value = javax.jms.DeliveryMode.PERSISTENT -->
        <property name="deliveryMode" value="2" />
        <!-- Value = javax.jms.Session.AUTO_ACKNOWLEDGE -->
        <property name="sessionAcknowledgeMode" value="1" />
        <!-- Needs to be true for the deliveryMode to work -->
        <property name="explicitQosEnabled" value="true" />
    </bean>

The consumer uses the following settings:

    public static void listenOnTopic(String topicName, MessageListener listener)
            throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
        Connection con = factory.createConnection();

        // A durable subscription needs a client ID set before the connection is used
        con.setClientID("Consumer");
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        // The listener's class name doubles as the durable subscription name
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, listener.getClass().getName());

        subscriber.setMessageListener(listener);

        con.start();
    }

with the listener below:

public class ActiveMQMessageListener implements MessageListener
{
 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageListener.class);

 @Autowired
 @Qualifier("jmsEventOutPutChannel")
 MessageChannel outputChannel;

 @Override
 public void onMessage(Message message) {
    try {   
        BytesMessage bytesMessage= (BytesMessage) message;
        byte[] data = new byte[(int)bytesMessage.getBodyLength()];
        bytesMessage.readBytes(data);
        org.springframework.integration.Message<byte[]> outputMessage = MessageBuilder.withPayload(data).build();
        outputChannel.send(outputMessage);
    } catch (JMSException e) {
        // The logger already records the stack trace, so printStackTrace() is not needed
        LOG.error("Error while retrieving events from ActiveMQ", e);
    }
 }
}

and the following Spring settings for the output channel:

<bean id="callerBlockPolicy" class="org.springframework.integration.util.CallerBlocksPolicy">
    <constructor-arg type="long" value="10000"></constructor-arg>
</bean>

<bean id="jmsListnerTaskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="${CORE_POOL_SIZE}"></property>
    <property name="maxPoolSize" value="${THREAD_POOL_SIZE_JMS_LISTENER}"></property>
    <property name="queueCapacity" value="${QUEUE_SIZE_JMS_LISTENER}"></property>
    <property name="rejectedExecutionHandler" ref="callerBlockPolicy"></property>
    <property name="waitForTasksToCompleteOnShutdown" value="true"></property>
</bean>

<int:channel id="jmsEventOutPutChannel">
    <int:dispatcher task-executor="jmsListnerTaskExecutor" />
</int:channel>

This consumer code is too slow for us: it cannot retrieve messages from the topic at a high rate.

Without "jmsEventOutPutChannel" in the picture I get a rate of around 9500 qps, but with "jmsEventOutPutChannel" in the picture the rate drops to around 150 qps.

Does anybody have a hint about what I am doing wrong in this code?

Does my "jmsEventOutPutChannel" channel code also affect the rate of consumption from ActiveMQ?

1 Answer

It's really not your consumer code that's the problem; something is going wrong when the message is sent to the output channel.
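One way a hand-off like this can hold back the listener thread, as a minimal JDK-only sketch rather than your actual Spring setup: when a bounded executor queue is full and the rejection handler makes the caller wait (which is what a caller-blocks policy does in spirit), the submitting thread can only go as fast as the workers drain the queue. The class name `BlockingHandoffDemo`, the method `submitAll`, and all the sizes and timings are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only stand-in for the listener-to-executor hand-off; not Spring code.
final class BlockingHandoffDemo {

    // Submits `messages` tasks that each take `workMillis` and returns how long
    // the submitting thread (the "listener") needed to hand all of them off.
    static long submitAll(int messages, int threads, int queueCapacity, long workMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                (task, executor) -> {
                    try {
                        // Queue is full: block the caller until a slot frees up.
                        if (!executor.getQueue().offer(task, 10, TimeUnit.SECONDS)) {
                            throw new RejectedExecutionException("queue still full");
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException(e);
                    }
                });
        long start = System.nanoTime();
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> pause(workMillis));
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return elapsedMillis;
    }

    // Simulates downstream work done for one message.
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast handler: " + submitAll(100, 2, 10, 0) + " ms");
        System.out.println("slow handler: " + submitAll(100, 2, 10, 5) + " ms");
    }
}
```

If your numbers look like the "slow handler" case, the bottleneck is probably whatever is subscribed to the channel (or the pool and queue sizes), not the JMS consumer itself.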

Focus there and see why messages take so long to write to ActiveMQ. First, I'd try making delivery non-persistent (while keeping the subscription durable) and see whether that behaves differently. It could be that the ActiveMQ server is configured incorrectly and writing to the back-end store is inefficient (perhaps KahaDB can't keep up?).
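To see where the time actually goes, it may help to time each stage separately. Below is a rough sketch of a tiny timer you could wrap around a single call; `StageTimer` and its methods are hypothetical helpers written for this answer, not part of JMS, ActiveMQ, or Spring.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper for timing one stage of message handling.
final class StageTimer {
    private final LongAdder calls = new LongAdder();
    private final LongAdder nanos = new LongAdder();

    // Runs the stage and records how long it took, even if it throws.
    void time(Runnable stage) {
        long start = System.nanoTime();
        try {
            stage.run();
        } finally {
            nanos.add(System.nanoTime() - start);
            calls.increment();
        }
    }

    // Number of timed invocations so far.
    long calls() {
        return calls.sum();
    }

    // Mean duration per invocation in microseconds, or 0 if nothing was timed.
    double averageMicros() {
        long n = calls.sum();
        return n == 0 ? 0.0 : nanos.sum() / 1_000.0 / n;
    }

    public static void main(String[] args) {
        StageTimer timer = new StageTimer();
        for (int i = 0; i < 1_000; i++) {
            timer.time(() -> Math.sqrt(42.0)); // placeholder for the real stage
        }
        System.out.println(timer.calls() + " calls, avg "
                + timer.averageMicros() + " microseconds");
    }
}
```

In the listener, something like `sendTimer.time(() -> outputChannel.send(outputMessage));` (with `sendTimer` being a field you add) would show how much of each `onMessage` call is spent in the send, compared with reading the bytes from the message.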

Is it possible that the producer is creating a connection for each message sent, and the overhead is killing you?

You might post your ActiveMQ URL; I don't know what parameters you've appended that might make a difference. But seeing it degrade this much is obviously bad.