
I'm using ActiveMQ Artemis 2.16.0 as my broker and artemis-jms-client-2.16.0.jar as my JMS client. It seems that I'm losing a few messages at random, for reasons unknown to me. I've investigated my Java code and haven't found anything unusual yet.

I have the following listener method:

@JmsListener(destination = "${myQueue}", containerFactory = "jmsListenerContainerFactory")
@Override
public void process(Message message) {
    try {
        processMessage(message);
    } catch (Exception ex) {
        LOG.error("Error[...]", ex);
        responseSender.send(otherQueue, message, ex);
    }
}

The processMessage(Message message) method looks like this:

public void processMessage(Message message) throws JMSException, MyBusinessException {
    try {
        byte[] request = message.getBody(byte[].class);
        [...]
        if (!condition) {
            throw new MyBusinessException("error happened");
        }
        [...]
    } finally {
        MDC.remove(ID);
    } 
}

The listener container factory:

@Bean(name = "jmsListenerContainerFactoryTest")
@Primary
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    factory.setSessionTransacted(true);
    factory.setConnectionFactory(cachingConnectionFactory());
    return factory;
}

The custom exception:

public class MyBusinessException extends Exception {
    private int code;
    [...]
}

broker.xml:

<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>

      <persistence-enabled>true</persistence-enabled>
      
      <journal-type>NIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-device-block-size>4096</journal-device-block-size>

      <journal-file-size>10M</journal-file-size>
      
      <!--
       This value was determined through a calculation.
       Your system could perform 2,17 writes per millisecond
       on the current journal configuration.
       That translates as a sync write every 490000 nanoseconds.

       Note: If you specify 0 the system will perform writes directly to the disk.
             We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
      -->
      <journal-buffer-timeout>490000</journal-buffer-timeout>


      <!--
        When using ASYNCIO, this will determine the writing queue depth for libaio.
       -->
      <journal-max-io>1</journal-max-io>

      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->
      <max-disk-usage>90</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>

      
      <page-sync-timeout>460000</page-sync-timeout>

      <acceptors>
         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>

         <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>

         <!-- STOMP Acceptor. -->
         <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

         <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
         <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

      </acceptors>


      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq, admin"/>
         </security-setting>
      </security-settings>
      <connection-ttl-override>60000</connection-ttl-override>
      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
                     <!-- <config-delete-queues>FORCE</config-delete-queues>
                      <config-delete-addresses>FORCE</config-delete-addresses>-->
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
            <auto-delete-queues>false</auto-delete-queues>
         </address-setting>
      </address-settings>

      <addresses>
        <address name="MyQueue">
            <anycast>
               <queue name="MyQueue">
               </queue>
            </anycast>
         </address>
         <address name="MyOtherQueue">
            <anycast>
               <queue name="MyOtherQueue" />
            </anycast>
         </address>                                                 
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>

   </core>
</configuration>

If MyBusinessException is thrown, the idea is to catch the exception and send that very same message to MyOtherQueue. If sending that message fails (i.e. an exception is thrown), the message is redelivered, up to 10 times, and then sent to the DLQ. In essence that is what I see most of the time, but at random moments my logs show only one redelivery attempt, there is no message in the DLQ, and the receiving side complains that the message is missing. It seems the message simply vanished. I have examined MyOtherQueue very closely using both the Artemis Console and JmsToolbox, but I see nothing but an empty queue. There are no consumers on these queues.

The goal is not to route a failing message to the DLQ but to that other queue (MyOtherQueue) for later investigation. Only if the message cannot be delivered to that queue should it end up on the DLQ. That is how I've designed it.
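The intended routing can be written down as a small, broker-free model to make the expected outcomes explicit. This is a hypothetical sketch: `IntendedRouting`, `deliver`, and the two `IntPredicate` parameters (standing in for `processMessage` and the send to MyOtherQueue) are illustrative names, not Artemis or Spring APIs. It assumes the listener session is transacted, as in the factory above, so an exception escaping `process` rolls back and triggers redelivery; that the send to MyOtherQueue commits together with the consumption of the original message (which depends on how `responseSender` is configured); and that `max-delivery-attempts` is the Artemis default of 10, since broker.xml does not set it. Under those assumptions every message ends up processed, on MyOtherQueue, or on the DLQ, so a message found in none of those places is the anomaly worth tracing.

```java
import java.util.function.IntPredicate;

public class IntendedRouting {

    // Assumed: Artemis default max-delivery-attempts (not overridden in broker.xml)
    static final int MAX_DELIVERY_ATTEMPTS = 10;

    enum Outcome { PROCESSED, FORWARDED_TO_OTHER_QUEUE, SENT_TO_DLQ }

    /**
     * Hypothetical model of one message's lifecycle in a transacted listener.
     * processOk.test(attempt): true if processMessage succeeds on that attempt.
     * forwardOk.test(attempt): true if the send to MyOtherQueue succeeds on that attempt.
     */
    static Outcome deliver(IntPredicate processOk, IntPredicate forwardOk) {
        for (int attempt = 1; attempt <= MAX_DELIVERY_ATTEMPTS; attempt++) {
            if (processOk.test(attempt)) {
                return Outcome.PROCESSED;                 // commit: message consumed
            }
            if (forwardOk.test(attempt)) {
                return Outcome.FORWARDED_TO_OTHER_QUEUE;  // commit: consumed and forwarded
            }
            // The forward threw: the exception escapes the listener, the transaction
            // rolls back and the broker redelivers (next loop iteration).
        }
        return Outcome.SENT_TO_DLQ;                       // delivery attempts exhausted
    }

    public static void main(String[] args) {
        System.out.println(deliver(a -> false, a -> true));   // FORWARDED_TO_OTHER_QUEUE
        System.out.println(deliver(a -> false, a -> false));  // SENT_TO_DLQ
        System.out.println(deliver(a -> false, a -> a >= 2)); // FORWARDED_TO_OTHER_QUEUE
    }
}
```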

Still, every so often a very small number of messages go missing, and that is what I'm trying to understand. How should I investigate Artemis to determine whether any message loss has occurred? Where should I start, and what tools should I use?

miroana: Thank you for your assistance. I've updated the question.
miroana: Redelivery happens automatically (up to 10 times) after MyBusinessException is thrown (using JmsTemplate, Spring Boot).
Justin Bertram: I certainly think your process method should catch any exception and log it, but sending the message to MyOtherQueue is unnecessary. It should just re-throw the exception to trigger redelivery. This is exactly what redelivery and the DLQ are for: dealing with and inspecting messages that can't be delivered. Adding another step here needlessly complicates your application and adds another point of failure. Now you have 3 queues involved in a use-case where you only need 2.
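A broker-free sketch of the simpler two-queue approach suggested above: the listener logs the failure and re-throws, and the broker redelivers until `max-delivery-attempts` is exhausted, after which the message goes to the DLQ. `RethrowRouting`, `onMessage`, `deliver`, and the `IntPredicate` standing in for `processMessage` are hypothetical names for illustration, not Spring or Artemis APIs; the limit of 10 is assumed to be the Artemis default because broker.xml does not set it. A transient failure is retried and eventually processed, while a permanent one lands on the DLQ with every attempt logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

public class RethrowRouting {

    // Assumed: Artemis default max-delivery-attempts
    static final int MAX_DELIVERY_ATTEMPTS = 10;

    enum Outcome { PROCESSED, SENT_TO_DLQ }

    // Stand-in for the application log: one entry per failed attempt
    static final List<String> log = new ArrayList<>();

    /** Hypothetical listener body: log the failure, then re-throw so the transaction rolls back. */
    static void onMessage(int attempt, IntPredicate processOk) {
        try {
            if (!processOk.test(attempt)) {
                throw new IllegalStateException("business error");
            }
        } catch (RuntimeException ex) {
            log.add("attempt " + attempt + " failed: " + ex.getMessage());
            throw ex; // no forwarding step: let the broker handle redelivery
        }
    }

    /** Hypothetical broker side: redeliver after each rollback, then route to the DLQ. */
    static Outcome deliver(IntPredicate processOk) {
        for (int attempt = 1; attempt <= MAX_DELIVERY_ATTEMPTS; attempt++) {
            try {
                onMessage(attempt, processOk);
                return Outcome.PROCESSED; // commit
            } catch (RuntimeException rolledBack) {
                // rollback: the delivery count increases and the message is redelivered
            }
        }
        return Outcome.SENT_TO_DLQ;
    }

    public static void main(String[] args) {
        System.out.println(deliver(a -> a >= 3)); // PROCESSED after two logged failures
        System.out.println(deliver(a -> false));  // SENT_TO_DLQ after ten logged failures
        System.out.println(log.size());           // 12
    }
}
```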

1 Answer


I would start by putting a property in each message that will allow it to be uniquely identified and then logging that value so you can correlate client and broker logs later. If you're using JMS then you can use something like this:

String uuid = java.util.UUID.randomUUID().toString();
message.setStringProperty("UUID", uuid);
logger.info("Sending message with UUID: " + uuid);

Then of course you'll want to log this on the consumer as well, e.g.:

Message message = consumer.receive();
String uuid = message.getStringProperty("UUID");
logger.info("Received message with UUID: " + uuid);

On the broker, you should then activate audit logging or perhaps use the LoggingActiveMQServerPlugin.

Once you have all the logging in place, you simply have to wait until you think you've lost a message and then go through the logs to find the ID of the message that was sent but not received. Once you know that, you can go through the broker logs to see whether it was received by the broker properly, dispatched to the consumer, etc. That will help you narrow down where the issue lies.
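Finding the "sent but not received" IDs can be partly automated. This is a sketch that assumes the client logs contain the two message formats used above ("Sending message with UUID: " and "Received message with UUID: "); `LostMessageFinder` and `findMissing` are hypothetical helpers. Real log lines usually carry timestamps and logger prefixes, so it looks for the marker anywhere in a line and takes the first token after it. Redeliveries produce repeated "Received" lines, which the set simply deduplicates. Any UUID it returns is a candidate to look up in the broker's audit or plugin logs.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class LostMessageFinder {

    static final String SENT = "Sending message with UUID: ";
    static final String RECEIVED = "Received message with UUID: ";

    /** Collects the first whitespace-delimited token after the marker on every matching line. */
    static Set<String> extract(List<String> lines, String marker) {
        Set<String> ids = new LinkedHashSet<>(); // keeps first-seen order, ignores duplicates
        for (String line : lines) {
            int i = line.indexOf(marker);
            if (i >= 0) {
                ids.add(line.substring(i + marker.length()).trim().split("\\s+")[0]);
            }
        }
        return ids;
    }

    /** UUIDs logged by a producer but never logged by a consumer. */
    static Set<String> findMissing(List<String> producerLog, List<String> consumerLog) {
        Set<String> missing = extract(producerLog, SENT);
        missing.removeAll(extract(consumerLog, RECEIVED));
        return missing;
    }

    public static void main(String[] args) {
        List<String> producer = List.of(
                "INFO Sending message with UUID: a1",
                "INFO Sending message with UUID: b2",
                "INFO Sending message with UUID: c3");
        List<String> consumer = List.of(
                "INFO Received message with UUID: a1",
                "INFO Received message with UUID: a1", // redelivery
                "INFO Received message with UUID: c3");
        System.out.println(findMissing(producer, consumer)); // [b2]
    }
}
```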