I connect to ActiveMQ through JMS with the following configuration: non-durable consumers, non-persistent messages, flow control disabled, default prefetch size, optimizeAcknowledge=true, and useAsyncSend=true.
For example, with one producer and one consumer:
Producer ---- Topic ---- consumer
the producer's send rate can reach 6k messages/s.
But with one producer and three consumers:
                     /---- consumer
Producer ---- Topic ------ consumer
                     \---- consumer
the producer's send rate drops to 4k messages/s.
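For reference, two of the client-side settings listed above (async send and optimized acknowledgement) are passed to the broker connection as `jms.`-prefixed URI options. Below is a minimal sketch of how such a URI can be assembled. The `BrokerUri` class and its `build` method are hypothetical helpers written only for illustration, and the host and port are assumptions. Flow control and prefetch are not covered here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of ActiveMQ): builds a connection URI with jms.* options. */
final class BrokerUri {
    private BrokerUri() {}

    /** Appends each option as "jms.<name>=<value>" to the base transport URI. */
    static String build(String base, Map<String, String> options) {
        if (options.isEmpty()) {
            return base;
        }
        // StringJoiner emits: base + "?" + option1 + "&" + option2 ...
        StringJoiner query = new StringJoiner("&", base + "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add("jms." + option.getKey() + "=" + option.getValue());
        }
        return query.toString();
    }

    public static void main(String[] args) {
        // Assumed broker address; adjust to the real transport connector.
        String base = "tcp://localhost:61616";

        Map<String, String> producerOptions = new LinkedHashMap<>();
        producerOptions.put("useAsyncSend", "true");

        Map<String, String> consumerOptions = new LinkedHashMap<>();
        consumerOptions.put("optimizeAcknowledge", "true");

        System.out.println(build(base, producerOptions));
        System.out.println(build(base, consumerOptions));
    }
}
```

The resulting strings can be passed as the `url` argument of `createConnection` in the sender and receiver classes.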
Here is some of the key code.
Sender class:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class sender {
    public Boolean durable = false;
    public String clientID = null;
    public Boolean transacted = false;
    public int ackMode = Session.AUTO_ACKNOWLEDGE;
    public int timeToLive = 0;
    public String queuename = "";
    public int persistent = DeliveryMode.NON_PERSISTENT;

    // Creates and starts a connection; a client ID is set only for durable subscribers.
    public Connection createConnection(String user, String pwd, String url) throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
        connectionFactory.setDispatchAsync(true);
        //connectionFactory.setAlwaysSessionAsync(false);
        Connection connection = connectionFactory.createConnection();
        if (durable && clientID != null) {
            connection.setClientID(clientID);
        }
        connection.start();
        return connection;
    }

    public Session createSession(Connection connection) throws Exception {
        Session session = connection.createSession(transacted, ackMode);
        return session;
    }

    // Creates a queue producer; the topic test below creates its producer directly instead.
    public MessageProducer createProducer(Session session) throws JMSException {
        Queue destination = session.createQueue(queuename);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(persistent);
        if (timeToLive != 0) {
            producer.setTimeToLive(timeToLive);
        }
        return producer;
    }

    public void onMessage(Message message) {
        // process message
    }
}
Sender main method:
public static void main(String[] args) throws JMSException, Exception {
    sender s = new sender();
    s.persistent = DeliveryMode.NON_PERSISTENT;
    Connection c = s.createConnection("", "", "tcp://localhost:61616?jms.useAsyncSend=true");
    Session sess = s.createSession(c);
    Topic topic = sess.createTopic("topic.test");
    MessageProducer mp = sess.createProducer(topic);

    // Build a 1024-character payload.
    StringBuilder tmpsb = new StringBuilder();
    for (int j = 0; j < 1024; j++) {
        tmpsb.append("0");
    }
    Message m = sess.createTextMessage(tmpsb.toString());

    // Send the same message 10000 times and time the loop.
    long pre = System.currentTimeMillis();
    for (int i = 0; i < 10000; i++) {
        mp.send(m);
    }
    long post = System.currentTimeMillis();
    mp.close();

    System.out.println("sendtime:" + (post - pre));
    // 10000 messages * 1000 ms/s, divided by elapsed ms, gives messages per second.
    System.out.println("sendrate:" + 10000000 / (float) (post - pre));
    System.out.println("timenow:" + (post));
    // Closing the connection also closes its sessions.
    c.close();
}
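The "sendrate" figure printed above is messages per second: the message count times 1000, divided by the elapsed milliseconds. A minimal sketch of that arithmetic follows. The `Throughput` class and `perSecond` method are hypothetical names used only for illustration, and the elapsed times in `main` are made-up inputs, not measurements.

```java
/** Hypothetical helper: converts a message count and elapsed time into a rate. */
final class Throughput {
    private Throughput() {}

    /** Returns messages per second for the given count and elapsed milliseconds. */
    static double perSecond(long messages, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        // Multiply by 1000.0 first so the division is done in floating point.
        return messages * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // Illustrative inputs only: 10000 messages in 2500 ms and in 1600 ms.
        System.out.println(perSecond(10000, 2500)); // prints 4000.0
        System.out.println(perSecond(10000, 1600)); // prints 6250.0
    }
}
```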
Receiver class:
import javax.jms.Message;
import javax.jms.MessageListener;

public class receiver implements MessageListener {
    public int receivetimes = 0;
    public long pretime;

    public void onMessage(Message msg) {
        //TextMessage tm = (TextMessage) msg;
        try {
            // Start the clock on the first message of each batch.
            if (receivetimes == 0) {
                pretime = System.currentTimeMillis();
            }
            receivetimes += 1;
            if (receivetimes == 10000) {
                long now = System.currentTimeMillis();
                // 9999 intervals separate the first and the 10000th message.
                System.out.println("time:" + (now - pretime) + "\nreceive rate:" + 9999 * 1000f / (now - pretime));
                System.out.println("timenow:" + (now));
                receivetimes = 0;
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}
The receiver class also has methods such as createConnection and createSession, the same as in the sender class; I have omitted them here.
Receiver main method:
public static void main(String[] args) throws JMSException, Exception {
    receiver s = new receiver();
    Connection c = s.createConnection("", "", "tcp://localhost:6151?jms.optimizeAcknowledge=true");
    Session sess = s.createSession(c);
    Topic destination = sess.createTopic("topic.test");
    MessageConsumer consumer = sess.createConsumer(destination);
    consumer.setMessageListener(new receiver());
}
Each consumer runs in its own process. When I run three consumers and one producer, the producer's send rate drops as described above. Does anyone know why this happens?