2
votes

I'm trying to transfer messages between 2 ActiveMQ brokers through Apache Camel and the trouble is that I can achive rate of transportation only about 135 messages per second. I want to increase that number. The situation is I have 2 ActiveMQ brokers on remote server. I want to take messages from queue on first broker and transfer these messages to several queues on second broker via Camel route.

this is how I establish connections:
CamelContext context = new DefaultCamelContext();
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://10.1**.6.195:62222");
            ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory("admin", "admin", "tcp://10.1**.6.195:
            PooledConnectionFactory pf1 = new PooledConnectionFactory(connectionFactory);
            pf1.setMaximumActiveSessionPerConnection(45);
            pf1.setMaxConnections(40);
            PooledConnectionFactory pf2 = new PooledConnectionFactory(connectionFactory2);
            pf2.setMaximumActiveSessionPerConnection(45);
            pf2.setMaxConnections(40);
            context.addComponent("broker1", JmsComponent.jmsComponentAutoAcknowledge(pf1));
            context.addComponent("broker2", JmsComponent.jmsComponentAutoAcknowledge(pf2));

my route:

context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                onException(SetParamsException.class)
                    .filter()
                    .method(new IsDisableFlowLoggingFilter(), "filter")
                    .process(new CreateErrorHandlerLogMessageProcessor())
                    .to("broker2:queue:ESB.EVENT.LOGGING");

                from(fromBroker+":queue:"+sourceQueue+"?maxConcurrentConsumers=500&concurrentConsumers=40&asyncConsumer=true")
                    .process(new SetParamsProcessor())
                        .to("seda:EVENT.LOGGING")
                        .to("seda:EVENT.TRANSACTION.LOGGING")
                        .to("seda:EVENT.MONITOR.LOG")
                        .to("xslt:file://transform.xsl")
                            .to("broker2:queue:testMQDestinationOLOLO?maxConcurrentConsumers=500&concurrentConsumers=20&asyncConsumer=true")
                from("seda:EVENT.LOGGING")
                    .filter()
                    .method(new IsDisableFlowLoggingFilter(), "filter")
                    .process(new CreateEventMessageProcessor())
                    .to("broker2:queue:EVENT.LOGGING");

                from("seda:EVENT.TRANSACTION.LOGGING")
                    .process(new CreateTransactionDetailsMessageProcessor())
                    .to("broker2:queue:EVENT.TRANSACTION.LOGGING");

                from("seda:EVENT.MONITOR.LOG")
                    .process(new CreateMonitoringMessageProcessor())
                    .to("broker2:queue:EVENT.MONITOR.LOG");
            }
        });
        context.start();

This configuration gives me ~135 messages per second. I think that because my consumers work successively instead of parallel. Can anyone around help me with rate increasing?

PS: btw, ping to remote server ~2ms

1

1 Answers

0
votes

That is quite a bit of logging going on, I would recommend cutting down on that. Otherwise, it looks like you are using xslt on the message which might be quite slow considering usual xslt speeds. I would also recommend posting your ActiveMQConnectionFactory configurations. I would strongly recommend running these on pools with larger numbers of sessions. I currently use both camel and activemq in production with very fast rates. Here is a sample factory you could model off of:

<bean id="myAmqBean" class="org.apache.activemq.camel.component.ActiveMQComponent" destroy-method="doStop">
    <property name="configuration">
        <bean class="org.apache.camel.component.jms.JmsConfiguration">
            <property name="concurrentConsumers" value="20" />
            <property name="maxConcurrentConsumers" value="20" />
            <property name="acceptMessagesWhileStopping" value="true" />
            <property name="connectionFactory">
                <bean class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
                    <property name="maxConnections" value="10" />
                    <property name="MaximumActiveSessionPerConnection" value="500" />
                    <property name="connectionFactory">
                        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                            <property name="brokerURL" value="${activemq1.brokerUrl}" />
                            <property name="userName" value="${activemq1.username}" />
                            <property name="password" value="${activemq1.password}" />
                            <property name="dispatchAsync" value="true" />
                            <property name="alwaysSessionAsync" value="true" />
                            <property name="useAsyncSend" value="true" />
                        </bean>
                    </property>
                </bean>
            </property>
        </bean>
    </property>