  • CentOS 7
  • 4 cores, 16G RAM, 500GB SSD
  • ActiveMQ 5.15.4
  • OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)

ActiveMQ architecture (image not included)

Machines 68 and 69 are configured as a network of brokers. Their activemq.xml files are identical except for host-specific values such as broker name, hostname, and IP address.

Topics in this architecture:

  • TOPIC_A_ALL_DELIVERED
  • TOPIC_A_ALL_RECEIVED
  • TOPIC_A_NN_DELIVERED, NN=00-23
  • TOPIC_A_NN_RECEIVED, NN=00-23

Persistence days: Currently 2 days, but the target is 7 days. This is how long a message is kept before it expires if a consumer doesn't reconnect to retrieve it. I use this config: <timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/>.
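
For reference, the 7-day target would be 7 × 86,400,000 ms = 604,800,000 ms. A minimal sketch, assuming the same plugin and attributes are kept and only the values change:

```xml
<!-- Sketch only: 7 days = 7 * 86,400,000 ms = 604,800,000 ms -->
<timeStampingBrokerPlugin ttlCeiling="604800000" zeroExpirationOverride="604800000"/>
```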

Persistent store: KahaDB, size limit 200GB

Message format is XML, and each message size is about 1K-3K bytes.

Consumers

In addition to durably subscribing to TOPIC_A_ALL_XXXXXX, each consumer also durably subscribes to other topics based on its requirements. I have no idea how fast the consumers can consume the data, nor do I know whether they actually subscribe to all the topics they need. I only know that some consumers sometimes stop receiving data while their code is being debugged and then connect back later.

Producer

The producer is scheduled to run every 30 minutes. Each run sends data to only one MQ server; which server is used is determined by the failover transport of the connection.

On every run the producer sends over 800K XML messages to topics starting with TOPIC_A. Each XML message contains a site_id tag (00-23) and a direction tag (DELIVERED or RECEIVED), and the producer sends the message to the matching topic based on those two tags. It also sends each message to TOPIC_A_ALL_XXXXX based on the direction tag.

Based on the above data, the average total number of messages per day is about 76,800,000.
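
That figure can be reproduced from the numbers above, assuming exactly 800,000 messages per run (the post says "over 800K"), one run every 30 minutes, and each message being published twice (once to its site topic and once to the matching ALL topic):

```latex
\[
\frac{24\ \text{h/day}}{0.5\ \text{h/run}} = 48\ \text{runs/day},
\qquad
800{,}000 \times 48 \times 2 = 76{,}800{,}000\ \text{messages/day}
\]
```

At the stated 1K-3K bytes per message, that is very roughly 77-230 GB of raw payload per day, before any store overhead and before any messages are consumed or expire.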

Symptom

At the beginning, KahaDB holds no messages. The producer can send to a single MQ server over one connection at up to 600~700 msgs/sec. As the amount of data increases, sending slows down, to as little as 3 msgs/sec, and sometimes stalls completely. Whenever this happens, the following can be observed:

  1. In htop, one activemq process keeps consuming 100% of CPU (sometimes up to 200%) (htop screenshot not included)
  2. The free memory is normal (4~8GB at least)
  3. Consumers receive nothing from both MQ servers.

This happens almost every day.

If I just wait for 4-6 hours, the MQ server recovers. The producer can then send at 4xx~5xx msgs/sec, and consumers can receive data again.

This cycle repeats every day. I really have no idea how to improve this situation. Any suggestions?

activemq.xml

<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-mdmcs02p-node1" dataDirectory="${activemq.data}" persistent="true" useJmx="true" populateJMSXUserID="true">
        <destinations>

        
        <topic physicalName="TOPIC_A.00.Delivered" />
        <topic physicalName="TOPIC_A.00.Received" />
        <topic physicalName="TOPIC_A.01.Delivered" />
        <topic physicalName="TOPIC_A.01.Received" />
        <topic physicalName="TOPIC_A.02.Delivered" />
        <topic physicalName="TOPIC_A.02.Received" />
        <topic physicalName="TOPIC_A.03.Delivered" />
        <topic physicalName="TOPIC_A.03.Received" />
        <topic physicalName="TOPIC_A.04.Delivered" />
        <topic physicalName="TOPIC_A.04.Received" />
        <topic physicalName="TOPIC_A.05.Delivered" />
        <topic physicalName="TOPIC_A.05.Received" />
        <topic physicalName="TOPIC_A.06.Delivered" />
        <topic physicalName="TOPIC_A.06.Received" />
        <topic physicalName="TOPIC_A.07.Delivered" />
        <topic physicalName="TOPIC_A.07.Received" />
        <topic physicalName="TOPIC_A.08.Delivered" />
        <topic physicalName="TOPIC_A.08.Received" />
        <topic physicalName="TOPIC_A.09.Delivered" />
        <topic physicalName="TOPIC_A.09.Received" />
        <topic physicalName="TOPIC_A.10.Delivered" />
        <topic physicalName="TOPIC_A.10.Received" />
        <topic physicalName="TOPIC_A.11.Delivered" />
        <topic physicalName="TOPIC_A.11.Received" />
        <topic physicalName="TOPIC_A.12.Delivered" />
        <topic physicalName="TOPIC_A.12.Received" />
        <topic physicalName="TOPIC_A.13.Delivered" />
        <topic physicalName="TOPIC_A.13.Received" />
        <topic physicalName="TOPIC_A.14.Delivered" />
        <topic physicalName="TOPIC_A.14.Received" />
        <topic physicalName="TOPIC_A.15.Delivered" />
        <topic physicalName="TOPIC_A.15.Received" />
        <topic physicalName="TOPIC_A.16.Delivered" />
        <topic physicalName="TOPIC_A.16.Received" />
        <topic physicalName="TOPIC_A.17.Delivered" />
        <topic physicalName="TOPIC_A.17.Received" />
        <topic physicalName="TOPIC_A.18.Delivered" />
        <topic physicalName="TOPIC_A.18.Received" />
        <topic physicalName="TOPIC_A.19.Delivered" />
        <topic physicalName="TOPIC_A.19.Received" />
        <topic physicalName="TOPIC_A.20.Delivered" />
        <topic physicalName="TOPIC_A.20.Received" />
        <topic physicalName="TOPIC_A.21.Delivered" />
        <topic physicalName="TOPIC_A.21.Received" />
        <topic physicalName="TOPIC_A.22.Delivered" />
        <topic physicalName="TOPIC_A.22.Received" />
        <topic physicalName="TOPIC_A.23.Delivered" />
        <topic physicalName="TOPIC_A.23.Received" />
        <topic physicalName="TOPIC_A_ALL.Delivered" />
        <topic physicalName="TOPIC_A_ALL.Received" />
        
        </destinations>

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="false" memoryLimit="4096mb" enableAudit="false" expireMessagesPeriod="60000" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->

                    <deadLetterStrategy>
                        <sharedDeadLetterStrategy processExpired="false"/>
                    </deadLetterStrategy>
 
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>

                    <networkBridgeFilterFactory> 
                        <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> 
                    </networkBridgeFilterFactory>

                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

    <!--
        <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                    <virtualTopic name=">" prefix="TOPIC_A.*" selectorAware="false"/>
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>
     -->

    <networkConnectors>
        <networkConnector uri="static:(tcp://192.168.11.68:61616)" userName="admin" password="XXXXX" dynamicOnly="true" prefetchSize="1" />
    </networkConnectors>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="true"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="/home/activemq/kahadb"
                    ignoreMissingJournalfiles="true"
                    checkForCorruptJournalFiles="true"
                    checksumJournalFiles="true"
                    enableJournalDiskSyncs="false"
             />
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="80 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="40 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
        <sslContext>
            <sslContext 
                keyStore="file:${activemq.base}conf/broker1.ks" keyStorePassword="P@ssw0rd"
             />
        </sslContext>
         -->

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 1048576000 bytes (about 1000MB) -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576000"/>
            <transportConnector name="nio" uri="nio://0.0.0.0:61617?trace=true"/>
            <!-- <transportConnector name="ssl" uri="ssl://0.0.0.0:61618?trace=true&amp;transport.enabledProtocols=TLSv1.2"/> -->

            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.transformer=jms"/>
            <!-- <transportConnector name="amqp+ssl" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;transport.enabledProtocols=TLSv1.2"/> -->
            <!-- <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
            <!-- <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
            <!-- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

        <plugins>
            <!-- 86,400,000 ms = 1 day; 172,800,000 ms = 2 days -->
            <timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/>

            <jaasAuthenticationPlugin configuration="activemq" />

            <authorizationPlugin>
                <map>
                    <authorizationMap>
                        <authorizationEntries>
                            <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                            <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />

                            <authorizationEntry topic="TOPIC_A.>" read="mdmsusers" write="mdmsusers" />
                            <authorizationEntry topic="TOPIC_A.Delivered" read="pwsusers" />
                            <authorizationEntry topic="ActiveMQ.Advisory.>" read="mdmsusers,pwsusers" write="mdmsusers,pwsusers" admin="mdmsusers,pwsusers"/>
                        </authorizationEntries>
                        <tempDestinationAuthorizationEntry> 
                            <tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/> 
                        </tempDestinationAuthorizationEntry> 
                    </authorizationMap>
                </map>
            </authorizationPlugin>
        </plugins>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>

It's important to keep in mind that a message broker is not designed to store data the way a database is, for example. It's designed to be an integration point through which large amounts of data flow. Therefore, generally speaking, producing and consuming should be balanced. The broker can deal with a relatively small and short build-up of messages if consumption slows down for a time, but it's critical for consumption to resume and even increase to eliminate the backlog of messages. If you just keep sending messages to the broker with no consumption, you will potentially run into big problems. - Justin Bertram
@JustinBertram "Persistence days" means how long a message is kept before it expires if a consumer doesn't reconnect to retrieve it. I use the config below: <timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/> - wureka
Have you considered that a message broker might not be the right fit for your application's use-case? - Justin Bertram
@JustinBertram I will discuss other solutions with my customer. By the way, do you think ActiveMQ Artemis would perform better in the current use case? - wureka
ActiveMQ Artemis does have better performance and scalability than ActiveMQ 5.x, but it's still not designed to be a data store. What you really need is flow control on your producers, but I'm not sure if that fits your use-case. Flow control will prevent the broker from being overwhelmed, but it will slow down or even stop your producers from actually sending messages until space is cleared on the broker. - Justin Bertram

1 Answer


I suspect you are hitting some combination of flow control, fast producer, slow consumer, pending message limit, or invalid client registration issues that needs to be sorted out. It could even be a bug or a missing optimization in that version of ActiveMQ.

Suggested steps:

  1. Upgrade to the latest 5.15.x release. It contains a lot of fixes, and it does not make sense to troubleshoot against a build that old.

  2. Enable advisory messages with advisoryForFastProducers and advisoryForSlowConsumers on the destinations. The broker will then publish to ActiveMQ.Advisory topics whenever those scenarios occur.

  3. Investigate the clients and connections to make sure they are all properly registered with a clientId and subscriptionName to get a durable subscription. Remember: two connections cannot share the same clientId + subscriptionName.

  4. Consider moving to Virtual Topics, where messages are sent to a topic and consumers read from queues. This gives much more flexibility and visibility into what is going on with the flows. As a bonus, it makes multi-broker shared subscriptions straightforward.
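
As a rough illustration of steps 2 and 4, the fragments below could be merged into the existing <broker> element. This is only a sketch: the TOPIC_A.> wildcard and the Consumer.*. queue prefix are assumptions chosen to match the topic names in the posted activemq.xml, and the advisory attributes could equally be added to the existing topic=">" policy entry.

```xml
<!-- Sketch only: fragments to merge into the existing <broker> element. -->

<!-- Step 2: ask the broker to publish advisories for slow consumers and
     fast producers. They show up on topics such as
     ActiveMQ.Advisory.SlowConsumer.Topic.<name> and
     ActiveMQ.Advisory.FastProducer.Topic.<name>. -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="TOPIC_A.>"
                   advisoryForSlowConsumers="true"
                   advisoryForFastProducers="true"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

<!-- Step 4: treat topics matching TOPIC_A.> as virtual topics.
     Note that TOPIC_A_ALL.Delivered / TOPIC_A_ALL.Received do not match
     this wildcard and would need their own entry. -->
<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <virtualTopic name="TOPIC_A.>" prefix="Consumer.*." selectorAware="false"/>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>
```

With a setup like this, the producer keeps publishing to TOPIC_A.00.Delivered, while a consumer application (hypothetically named appA) would read from the queue Consumer.appA.TOPIC_A.00.Delivered instead of holding a durable topic subscription. Per-consumer backlog then becomes visible as ordinary queue depth.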