- CentOS 7
- 4 cores, 16G RAM, 500GB SSD
- ActiveMQ 5.15.4
- OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.10+9)
Machines 69 and 68 are built with network brokers. The activemq.xml
of 68 and 69 are identical except for the relevant names such as broker names, hostname, IP, etc.
Topics in this architecture:
TOPIC_A_ALL_DELIVERED
TOPIC_A_ALL_RECEIVED
TOPIC_A_NN_DELIVERED
, NN=00-23TOPIC_A_NN_RECEIVED
, NN=00-23
Persistence days: Current is 2 days, but the expected target is 7 days. This is how long to wait before expiring the message if a consumer doesn't connect back to retrieve it. I use this config: <timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/>
.
Persistent store: kahadb, limit size is 200GB
Message format is XML, and each message size is about 1K-3K bytes.
Consumers
In addition to durably subscribing to TOPIC_A_ALL_XXXXXX
each consumer also durably subscribes to other topics based on its requirements, but I have no idea how fast the consumers can consume the data. Neither do I know whether they actually subscribe all topics the need. I only know sometime some consumers stop receiving data due to debug their code and then connect back.
Producer
The producer is scheduled to run every 30 minutes. Whenever the producer works it only puts data to one single MQ server. The target MQ server depends on the protocol of failover connection.
Every time the producer will put over 800K quantity XML messages to topics starting with TOPIC_A
. The XML contains a tag sit_id
(00-23) and tag direction
(DELIVERED
or RECEIVED
) so the producer will put each XML to its relevant topic based on the tag site_id
and direction
in the XML. The producer meanwhile put each XML into TOPIC_A_ALL_XXXXX
based on the tag direction
in each XML.
Based on the above data the average total quantity of message for each day is about 76,800,000.
Symptom
At beginning no messages are in kahadb. The speed for producer to put queue to a single MQ with one connection is up to 600~700 msgs/sec. As the amount of data increases the speed sending the message slows down. It slows to 3 msgs/sec and sometimes gets stuck. Whenever this situation happens the below situation can be observed:
- From htop, one activemq process keeps consuming 100% of CPU (sometime up to 200%)
- The free memory is normal (4~8GB at least)
- Consumers receive nothing from both MQ servers.
The above situations almost happens everyday.
If I just wait there for 4-6 hours the MQ server will come back. The producer can send at 4xx~5xx msgs/sec, and then consumers can receive data.
This cycle keeps everyday. I really have no idea how to improve this situation. Any suggestions?
activemq.xml
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-mdmcs02p-node1" dataDirectory="${activemq.data}" persistent="true" useJmx="true" populateJMSXUserID="true">
<destinations>
<topic physicalName="TOPIC_A.00.Delivered" />
<topic physicalName="TOPIC_A.00.Received" />
<topic physicalName="TOPIC_A.01.Delivered" />
<topic physicalName="TOPIC_A.01.Received" />
<topic physicalName="TOPIC_A.02.Delivered" />
<topic physicalName="TOPIC_A.02.Received" />
<topic physicalName="TOPIC_A.03.Delivered" />
<topic physicalName="TOPIC_A.03.Received" />
<topic physicalName="TOPIC_A.04.Delivered" />
<topic physicalName="TOPIC_A.04.Received" />
<topic physicalName="TOPIC_A.05.Delivered" />
<topic physicalName="TOPIC_A.05.Received" />
<topic physicalName="TOPIC_A.06.Delivered" />
<topic physicalName="TOPIC_A.06.Received" />
<topic physicalName="TOPIC_A.07.Delivered" />
<topic physicalName="TOPIC_A.07.Received" />
<topic physicalName="TOPIC_A.08.Delivered" />
<topic physicalName="TOPIC_A.08.Received" />
<topic physicalName="TOPIC_A.09.Delivered" />
<topic physicalName="TOPIC_A.09.Received" />
<topic physicalName="TOPIC_A.10.Delivered" />
<topic physicalName="TOPIC_A.10.Received" />
<topic physicalName="TOPIC_A.11.Delivered" />
<topic physicalName="TOPIC_A.11.Received" />
<topic physicalName="TOPIC_A.12.Delivered" />
<topic physicalName="TOPIC_A.12.Received" />
<topic physicalName="TOPIC_A.13.Delivered" />
<topic physicalName="TOPIC_A.13.Received" />
<topic physicalName="TOPIC_A.14.Delivered" />
<topic physicalName="TOPIC_A.14.Received" />
<topic physicalName="TOPIC_A.15.Delivered" />
<topic physicalName="TOPIC_A.15.Received" />
<topic physicalName="TOPIC_A.16.Delivered" />
<topic physicalName="TOPIC_A.16.Received" />
<topic physicalName="TOPIC_A.17.Delivered" />
<topic physicalName="TOPIC_A.17.Received" />
<topic physicalName="TOPIC_A.18.Delivered" />
<topic physicalName="TOPIC_A.18.Received" />
<topic physicalName="TOPIC_A.19.Delivered" />
<topic physicalName="TOPIC_A.19.Received" />
<topic physicalName="TOPIC_A.20.Delivered" />
<topic physicalName="TOPIC_A.20.Received" />
<topic physicalName="TOPIC_A.21.Delivered" />
<topic physicalName="TOPIC_A.21.Received" />
<topic physicalName="TOPIC_A.22.Delivered" />
<topic physicalName="TOPIC_A.22.Received" />
<topic physicalName="TOPIC_A.23.Delivered" />
<topic physicalName="TOPIC_A.23.Received" />
<topic physicalName="TOPIC_A_ALL.Delivered" />
<topic physicalName="TOPIC_A_ALL.Received" />
</destinations>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="false" memoryLimit="4096mb" enableAudit="false" expireMessagesPeriod="60000" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false"/>
</deadLetterStrategy>
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="TOPIC_A.*" selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
-->
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.68:61616)" userName="admin" password="XXXXX" dynamicOnly="true" prefetchSize="1" />
</networkConnectors>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="true"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="/home/activemq/kahadb"
ignoreMissingJournalfiles="true"
checkForCorruptJournalFiles="true"
checksumJournalFiles="true"
enableJournalDiskSyncs="false"
/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="80 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="40 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
<sslContext>
<sslContext
keyStore="file:${activemq.base}conf/broker1.ks" keyStorePassword="P@ssw0rd"
/>
</sslContext>
-->
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=1048576000"/>
<transportConnector name="nio" uri="nio://0.0.0.0:61617?trace=true"/>
<!-- <transportConnector name="ssl" uri="ssl://0.0.0.0:61618?trace=true&transport.enabledProtocols=TLSv1.2"/> -->
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.transformer=jms"/>
<!-- <transportConnector name="amqp+ssl" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600&transport.enabledProtocols=TLSv1.2"/> -->
<!-- <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> -->
<!-- <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> -->
<!-- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> -->
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
<plugins>
<!-- 86,400,000 ms = 1 day -->
<timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/>
<jaasAuthenticationPlugin configuration="activemq" />
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="TOPIC_A.>" read="mdmsusers" write="mdmsusers" />
<authorizationEntry topic="TOPIC_A.Delivered" read="pwsusers" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="mdmsusers,pwsusers" write="mdmsusers,pwsusers" admin="mdmsusers,pwsusers"/>
</authorizationEntries>
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/>
</tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>
<timeStampingBrokerPlugin ttlCeiling="172800000" zeroExpirationOverride="172800000"/>
– wureka