I am using a durable topic subscription. The producer publishes events with the following JmsTemplate configuration:
<bean id="jmsTemplateESB" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachedJmsConnectionFactory" />
<property name="defaultDestination" ref="activeMQTopic" />
<!-- Value = javax.jms.DeliveryMode.PERSISTENT -->
<property name="deliveryMode" value="2" />
<!-- Value = javax.jms.Session.AUTO_ACKNOWLEDGE -->
<property name="sessionAcknowledgeMode" value="1" />
<!-- Needs to be true for the deliveryMode to work -->
<property name="explicitQosEnabled" value="true" />
</bean>
I am using the following settings for the consumer:
public static void listenOnTopic(String topicName, MessageListener listener)
throws Exception
{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
Connection con = factory.createConnection();
con.setClientID("Consumer");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
TopicSubscriber subscriber = session.createDurableSubscriber(topic, listener.getClass().getName());
subscriber.setMessageListener(listener);
con.start();
}
with the following listener:
public class ActiveMQMessageListener implements MessageListener
{
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageListener.class);
@Autowired
@Qualifier("jmsEventOutPutChannel")
MessageChannel outputChannel;
@Override
public void onMessage(Message message) {
try {
BytesMessage bytesMessage= (BytesMessage) message;
byte[] data = new byte[(int)bytesMessage.getBodyLength()];
bytesMessage.readBytes(data);
org.springframework.integration.Message<byte[]> outputMessage = MessageBuilder.withPayload(data).build();
outputChannel.send(outputMessage);
} catch (JMSException e) {
// The logger already records the stack trace, so printStackTrace() is not needed
LOG.error("Error while retrieving events from ActiveMQ", e);
}
}
}
and the following Spring settings for the output channel:
<bean id="callerBlockPolicy" class="org.springframework.integration.util.CallerBlocksPolicy">
<constructor-arg type="long" value="10000"></constructor-arg>
</bean>
<bean id="jmsListnerTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="${CORE_POOL_SIZE}"></property>
<property name="maxPoolSize" value="${THREAD_POOL_SIZE_JMS_LISTENER}"></property>
<property name="queueCapacity" value="${QUEUE_SIZE_JMS_LISTENER}"></property>
<property name="rejectedExecutionHandler" ref="callerBlockPolicy"></property>
<property name="waitForTasksToCompleteOnShutdown" value="true"></property>
</bean>
<int:channel id="jmsEventOutPutChannel">
<int:dispatcher task-executor="jmsListnerTaskExecutor" />
</int:channel>
This consumer is too slow for us: it cannot retrieve messages from the topic at a high rate.
Without "jmsEventOutPutChannel" in the picture I get around 9500 qps, but with "jmsEventOutPutChannel" in the picture the rate drops to around 150 qps.
Does anybody have a hint as to what I am doing wrong in this code?
Does my "jmsEventOutPutChannel" configuration also affect the rate at which messages are consumed from ActiveMQ?
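One possible explanation worth checking: with a bounded executor queue and a caller-blocking rejection policy, the thread that calls outputChannel.send(...) has to wait whenever the queue is full, so the JMS listener thread can only consume as fast as the downstream handlers drain that queue. The sketch below models this in plain Java, without Spring or ActiveMQ. BackpressureSketch, BlockCallerHandler and submitAll are hypothetical names, and BlockCallerHandler is only an approximation of what CallerBlocksPolicy does, not its actual implementation. The pool size, queue capacity and per-message work time are made-up values.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BackpressureSketch {

    /** Hypothetical stand-in for CallerBlocksPolicy: the caller waits for queue space. */
    static final class BlockCallerHandler implements RejectedExecutionHandler {
        private final long maxWaitMs;

        BlockCallerHandler(long maxWaitMs) {
            this.maxWaitMs = maxWaitMs;
        }

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            try {
                // Block the submitting thread until a slot frees up or the wait expires.
                boolean queued = !executor.isShutdown()
                        && executor.getQueue().offer(task, maxWaitMs, TimeUnit.MILLISECONDS);
                if (!queued) {
                    throw new RejectedExecutionException("No queue space within " + maxWaitMs + " ms");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
            }
        }
    }

    /**
     * Plays the role of the JMS listener thread: hands `messages` tasks to a bounded pool.
     * Each task sleeps for workMs to simulate downstream handling.
     * Returns how long the submitting thread spent handing the tasks over, in milliseconds.
     */
    public static long submitAll(int poolSize, int queueCapacity, int messages,
                                 long workMs, AtomicInteger handled) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new BlockCallerHandler(10_000));
        long start = System.nanoTime();
        for (int i = 0; i < messages; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(workMs); // simulated downstream work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                handled.incrementAndGet();
            });
        }
        long submitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return submitMs;
    }

    public static void main(String[] args) {
        AtomicInteger handled = new AtomicInteger();
        long submitMs = submitAll(2, 2, 20, 20, handled);
        System.out.println("Handled " + handled.get() + " messages; submitting took " + submitMs + " ms");
    }
}
```

In this model the submitting thread spends most of its time waiting for queue space once the pool and queue are full, which would be consistent with the consumption rate dropping to the downstream processing rate. Whether this is what happens in the real setup depends on how long the handlers behind "jmsEventOutPutChannel" take per message and on the actual values of CORE_POOL_SIZE, THREAD_POOL_SIZE_JMS_LISTENER and QUEUE_SIZE_JMS_LISTENER.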