0
votes

STOMP consumer architecture diagram

As demonstrated in the diagram, out of n messages published on the queue - /queue/msgs, the distribution among the consumers (STOMP-Consumer 1 and STOMP-Consumer 2) is uneven. I could observe, STOMP-Consumer 2 only received one message out of n messages.

Exact same STOMP headers are passed by both the consumers. Those are as follows -

STOMP CONNECT Headers

  • client-id: app

STOMP SUBSCRIBE Headers

  • durable-subscription-name: app-subscription
  • auto-delete: false
  • ack: client-individual
  • destination: /queue/msgs

Broker.xml

    <?xml version="1.0"?>
    <configuration
        xmlns="urn:activemq"
        xmlns:xi="http://www.w3.org/2001/XInclude"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
        <core
            xmlns="urn:activemq:core" xsi:schemaLocation="urn:activemq:core ">
            <name>activemq-558f6696fc-2kx8q</name>
            <persistence-enabled>true</persistence-enabled>
            <journal-type>ASYNCIO</journal-type>
            <paging-directory>data/paging</paging-directory>
            <bindings-directory>data/bindings</bindings-directory>
            <journal-directory>data/journal</journal-directory>
            <large-messages-directory>data/large-messages</large-messages-directory>
            <journal-datasync>true</journal-datasync>
            <journal-min-files>2</journal-min-files>
            <journal-pool-files>10</journal-pool-files>
            <journal-device-block-size>4096</journal-device-block-size>
            <journal-file-size>10M</journal-file-size>
            <journal-buffer-timeout>24000</journal-buffer-timeout>
            <journal-max-io>4096</journal-max-io>
            <disk-scan-period>5000</disk-scan-period>
            <max-disk-usage>90</max-disk-usage>
            <critical-analyzer>true</critical-analyzer>
            <critical-analyzer-timeout>120000</critical-analyzer-timeout>
            <critical-analyzer-check-period>60000</critical-analyzer-check-period>
            <critical-analyzer-policy>HALT</critical-analyzer-policy>
            <page-sync-timeout>368000</page-sync-timeout>
            <acceptors>
                <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
                <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
                <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
                <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
                <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
            </acceptors>
            <security-settings>
                <security-setting match="#">
                    <permission type="createNonDurableQueue" roles="amq"/>
                    <permission type="deleteNonDurableQueue" roles="amq"/>
                    <permission type="createDurableQueue" roles="amq"/>
                    <permission type="deleteDurableQueue" roles="amq"/>
                    <permission type="createAddress" roles="amq"/>
                    <permission type="deleteAddress" roles="amq"/>
                    <permission type="consume" roles="amq"/>
                    <permission type="browse" roles="amq"/>
                    <permission type="send" roles="amq"/>
                    <!-- we need this otherwise ./artemis data imp wouldn't work -->
                    <permission type="manage" roles="amq"/>
                </security-setting>
            </security-settings>
            <message-expiry-scan-period>30000</message-expiry-scan-period>
            <connection-ttl-override>60000</connection-ttl-override>
            <address-settings>
                <address-setting match="activemq.management#">
                    <dead-letter-address>DLQ</dead-letter-address>
                    <expiry-address>ExpiryQueue</expiry-address>
                    <redelivery-delay>0</redelivery-delay>
                    <!-- with -1 only the global-max-size is in use for limiting -->
                    <max-size-bytes>-1</max-size-bytes>
                    <message-counter-history-day-limit>10</message-counter-history-day-limit>
                    <address-full-policy>PAGE</address-full-policy>
                    <auto-create-queues>true</auto-create-queues>
                    <auto-create-addresses>true</auto-create-addresses>
                    <auto-create-jms-queues>true</auto-create-jms-queues>
                    <auto-create-jms-topics>true</auto-create-jms-topics>
                </address-setting>
                <address-setting match="/queue/#">
                    <default-address-routing-type>ANYCAST</default-address-routing-type>
                    <default-queue-routing-type>ANYCAST</default-queue-routing-type>
                </address-setting>
                <address-setting match="/topic/#">
                    <default-address-routing-type>MULTICAST</default-address-routing-type>
                    <default-queue-routing-type>MULTICAST</default-queue-routing-type>
                </address-setting>
                <address-setting match="#">
                    <dead-letter-address>DLQ</dead-letter-address>
                    <expiry-address>ExpiryQueue</expiry-address>
                    <auto-delete-queues>false</auto-delete-queues>
                    <auto-delete-jms-queues>false</auto-delete-jms-queues>
                    <auto-delete-jms-topics>false</auto-delete-jms-topics>
                    <auto-delete-addresses>false</auto-delete-addresses>
                    <max-size-bytes>-1</max-size-bytes>
                    <message-counter-history-day-limit>10</message-counter-history-day-limit>
                    <address-full-policy>PAGE</address-full-policy>
                    <auto-create-queues>true</auto-create-queues>
                    <auto-create-addresses>true</auto-create-addresses>
                    <auto-create-jms-queues>true</auto-create-jms-queues>
                    <auto-create-jms-topics>true</auto-create-jms-topics>
                    <redelivery-delay-multiplier>1</redelivery-delay-multiplier>
                    <redelivery-collision-avoidance-factor>0.15</redelivery-collision-avoidance-factor>
                    <max-redelivery-delay>50000</max-redelivery-delay>
                    <default-consumer-window-size>0</default-consumer-window-size>
                </address-setting>
            </address-settings>
            <addresses>
                <address name="DLQ">
                    <anycast>
                        <queue name="DLQ"/>
                    </anycast>
                </address>
                <address name="ExpiryQueue">
                    <anycast>
                        <queue name="ExpiryQueue"/>
                    </anycast>
                </address>
            </addresses>
            <wildcard-addresses>
                <routing-enabled>true</routing-enabled>
                <delimiter>/</delimiter>
                <any-words>#</any-words>
                <single-word>*</single-word>
            </wildcard-addresses>
        </core>
    </configuration>

The acceptor in use is the artemis acceptor with port 61616

1
@JustinBertram Kindly excuse me, it was a late-night post :). I wanted to write ANYCAST there.Arshal Jain
@JustinBertram I have added the complete broker.xmlArshal Jain
@JustinBertram I have added the requested details.Arshal Jain

1 Answers

1
votes

Use the acceptor URL parameter stompConsumerCredits. The default value for this is 10240 bytes which means that the broker will dispatch 10KB worth of messages to each client. If the first client processes these messages quickly enough then other consumers can starve. Assuming that your STOMP consumers are using stomp acceptor then you could try something like this:

<acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true;stompConsumerCredits=1</acceptor>

Keep in mind that dispatching messages to consumers in batches based on stompConsumerCredits is a performance optimization which prevents lots of network round-trips between the client and the broker to fetch messages. I recommend you test different values for stompConsumerCredits to find the optimum performance for your use-case as using a small value for 2 consumers may actually reduce overall message throughput vs. a large value for 1 consumer.

Also, it's worth noting that in ActiveMQ Artemis 2.16.0 (not yet released) you'll be able to use a custom header (i.e. consumer-window-size) on the STOMP SUBSCRIBE frame to tune this per client instead of having it set at the acceptor level for all consumers.