I have a Java application with a number of components communicating via JMS (ActiveMQ). Currently the application and the JMS Hub are on the same server although we eventually plan to split out the components for scalability. Currently we are having significant issues with performance, all seemingly around JMS, most notably, and the focus of this question is the amount of time it is taking to publish a message to a topic.
We have around 50 dynamically created topics used for communication between the components of the application. One component reads records from a table and processes them one at a time, the processing involves creating a JMS Object message and publishing it to one of the topics. This processing could not keep up with the rate at which records were being written to the source table ~23/sec, so we changed the processing to create the JMS Object message and add it to a queue. A new thread was created which read from this queue and published the message to the appropriate topic. Obviously this does not speed the processing up but it did allow us to see how far behind we were getting by looking at the size of the queue.
At the start of the day no messages are going through the whole system, this quickly ramps up from 1560000 (433/sec) messages through the hub in the first hour to 2100000 (582/sec) in the 3rd hour and then staying at that level. At the start of the first hour the message publishing from the component reading records from the database table keeps up however, by the end of that hour there are 2000 messages in the queue waiting to be sent and by the 3rd hour the queue has 9000 messages in it.
Below are the appropiate sections of the code which send the JMS messages, any advice on what we are doing wrong or how we can improve this performance are much appreciated. Looking at stats on the web JMS should be able to easily handle ~1000-2000 large messages/sec or ~10000 small messages/sec. Our messages are around 500 bytes each so I imagine sit somewhere in the middle of that scale.
Code for getting the publisher:
private JmsSessionPublisher getJmsSessionPublisher(String topicName) throws JMSException {
if (!this.topicPublishers.containsKey(topicName)) {
TopicSession pubSession = (ActiveMQTopicSession) topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = getTopic(topicName, pubSession);
// Create a JMS publisher and subscriber
TopicPublisher publisher = pubSession.createPublisher(topic);
this.topicPublishers.put(topicName, new JmsSessionPublisher(pubSession, publisher));
}
return this.topicPublishers.get(topicName);
}
Sending the message:
JmsSessionPublisher jmsSessionPublisher = getJmsSessionPublisher(topicName);
ObjectMessage objMessage = jmsSessionPublisher.getSession().createObjectMessage(messageObj);
objMessage.setJMSCorrelationID(correlationID);
objMessage.setJMSTimestamp(System.currentTimeMillis());
jmsSessionPublisher.getPublisher().publish(objMessage, false, 4, 0);
Code which adds messages to the queue:
List<EventQueue> events = eventQueueDao.getNonProcessedEvents();
for (EventQueue eventRow : events) {
IEvent event = eventRow.getEvent();
AbstractEventFactory.EventType eventType = AbstractEventFactory.EventType.valueOf(event.getEventType());
String topic = event.getTopicName() + topicSuffix;
EventMsgPayload eventMsg = AbstractEventFactory.getFactory(eventType).getEventMsgPayload(event);
synchronized (queue) {
queue.add(new QueueElement(eventRow.getEventId(), topic, eventMsg));
queue.notify();
}
}
Code in the thread removing items from the queue:
jmsSessionFactory.publishMessageToTopic(e.getTopic(), e.getEventMsg(), Integer.toString(e.getEventMsg().hashCode()));
publishMessageToTopic executes the 'Sending the message' code above.
Other JMS implementations are an option if the consensus is that ActiveMQ may not be the best option.
Thank you,
James