Is there a way to cause stale connections to time out in ActiveMQ Artemis? I have a situation where the connections are accumulating and then I get the "newSocketStream(..) failed: Too many open files" error, which I think is due to the connections.

How should I diagnose this problem?

2021-01-28 01:20:39,492 WARN  [io.netty.channel.DefaultChannelPipeline] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.: io.netty.channel.unix.Errors$NativeIoException: accept(..) failed: Too many open files

2021-01-28 01:20:39,656 WARN  [io.netty.channel.DefaultChannelPipeline] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.: io.netty.channel.unix.Errors$NativeIoException: accept(..) failed: Too many open files

2021-01-28 01:20:39,937 ERROR [org.apache.activemq.artemis.core.client] AMQ214016: Failed to create netty connection: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.epoll.EpollSocketChannel
    at io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:310) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector.createConnection(NettyConnector.java:818) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector.createConnection(NettyConnector.java:785) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.openTransportConnection(ClientSessionFactoryImpl.java:1076) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.createTransportConnection(ClientSessionFactoryImpl.java:1125) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.establishNewConnection(ClientSessionFactoryImpl.java:1336) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.getConnection(ClientSessionFactoryImpl.java:931) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.getConnectionWithRetry(ClientSessionFactoryImpl.java:820) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.connect(ClientSessionFactoryImpl.java:252) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl.connect(ClientSessionFactoryImpl.java:268) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl$StaticConnector$Connector.tryConnect(ServerLocatorImpl.java:1813) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl$StaticConnector.connect(ServerLocatorImpl.java:1682) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl.connect(ServerLocatorImpl.java:536) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl.connect(ServerLocatorImpl.java:524) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl$4.run(ServerLocatorImpl.java:482) [artemis-core-client-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.14.0.jar:2.14.0]
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65) [artemis-commons-2.14.0.jar:2.14.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [java.base:]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [java.base:]
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [artemis-commons-2.14.0.jar:2.14.0]
Caused by: java.lang.reflect.InvocationTargetException
    at jdk.internal.reflect.GeneratedConstructorAccessor17.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [java.base:]
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) [java.base:]
    at io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    ... 23 more
Caused by: io.netty.channel.ChannelException: io.netty.channel.unix.Errors$NativeIoException: newSocketStream(..) failed: Too many open files
    at io.netty.channel.unix.Socket.newSocketStream0(Socket.java:421) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.epoll.LinuxSocket.newSocketStream(LinuxSocket.java:319) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.epoll.LinuxSocket.newSocketStream(LinuxSocket.java:323) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    at io.netty.channel.epoll.EpollSocketChannel.<init>(EpollSocketChannel.java:45) [netty-all-4.1.48.Final.jar:4.1.48.Final]
    ... 27 more
Caused by: io.netty.channel.unix.Errors$NativeIoException: newSocketStream(..) failed: Too many open files

This problem looks similar: SocketException : TOO MANY OPEN FILES

As for my use case, I'm receiving orders from a website and processing them into an ERP, then transmitting status back to the website and other systems. Sending messages back to the website API is a bit slow, and near the time of the incident there were maybe 700 messages queued.

The website uses AMQP and my message routing is done with JMS.

Here is the ulimit for the user that runs the broker.

core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 63805
max locked memory       (kbytes, -l) 16384
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 63805
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

My JVM memory setting: -Xms1024M -Xmx8G

And here is my broker.xml

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>
      <persistence-enabled>true</persistence-enabled>
      <journal-type>NIO</journal-type>
      <paging-directory>/nfs/amqprod/data/paging</paging-directory>
      <bindings-directory>/nfs/amqprod/data/bindings</bindings-directory>
      <journal-directory>/nfs/amqprod/data/journal</journal-directory>
      <large-messages-directory>/nfs/amqprod/data/large-messages</large-messages-directory>
      <journal-datasync>true</journal-datasync>
      <journal-min-files>2</journal-min-files>
      <journal-pool-files>10</journal-pool-files>
      <journal-device-block-size>4096</journal-device-block-size>
      <journal-file-size>10M</journal-file-size>
      <journal-buffer-timeout>2628000</journal-buffer-timeout>
      <journal-max-io>1</journal-max-io>
      <disk-scan-period>5000</disk-scan-period>
      <max-disk-usage>90</max-disk-usage>
      <critical-analyzer>true</critical-analyzer>
      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
      <critical-analyzer-policy>HALT</critical-analyzer-policy>
      <page-sync-timeout>2628000</page-sync-timeout>
      <jmx-management-enabled>true</jmx-management-enabled>
      <global-max-size>2G</global-max-size>

      <acceptors>

<!-- keystores will be found automatically if they are on the classpath -->
         <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:5500?sslEnabled=true;keyStorePath={path}/keystore.ks;keyStorePassword={password};protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE</acceptor>

         <!-- Acceptor for every supported protocol -->
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>


      </acceptors>

      <!-- HA -->
      <connectors>
        <connector name="artemis">tcp://{Primary IP}:61616</connector>
        <connector name="artemis-backup">tcp://{Secondary IP}:61616</connector>
      </connectors>

      <cluster-user>activemq</cluster-user>
      <cluster-password>{cluster password}</cluster-password>

      <ha-policy>
        <shared-store>
          <master>
            <failover-on-shutdown>true</failover-on-shutdown>
          </master>
        </shared-store>
      </ha-policy>

      <cluster-connections>
        <cluster-connection name="cluster-1">
          <connector-ref>artemis</connector-ref>
          <!--<discovery-group-ref discovery-group-name="discovery-group-1"/>-->
          <static-connectors>
            <connector-ref>artemis-backup</connector-ref>
          </static-connectors>
        </cluster-connection>
       </cluster-connections>
      <!-- HA -->

      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
          </security-setting>
          <security-setting match="SiteCore.#">
            <!--<permission type="createDurableQueue" roles="ecom"/>
            <permission type="deleteDurableQueue" roles="ecom"/>
            <permission type="createAddress" roles="ecom"/>-->
            <permission type="consume" roles="ecom,amq"/>
            <permission type="browse" roles="ecom,amq"/>
            <permission type="send" roles="ecom,amq"/>
         </security-setting>
         <security-setting match="eCommerce.#">
            <!--<permission type="createDurableQueue" roles="ecom"/>
            <permission type="deleteDurableQueue" roles="ecom"/>
            <permission type="createAddress" roles="ecom"/>-->
            <permission type="consume" roles="ecom,amq"/>
            <permission type="browse" roles="ecom,amq"/>
            <permission type="send" roles="ecom,amq"/>
        </security-setting>
    
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
      </address-settings>

<addresses>
  <address name="DLQ">
    <anycast>
      <queue name="DLQ" />
    </anycast>
  </address>
  <address name="ExpiryQueue">
    <anycast>
      <queue name="ExpiryQueue" />
    </anycast>
  </address>
  <address name="test1.test">
    <multicast>
      <queue name="test1.test.A">
        <filter string="JMSType='A'" />
      </queue>
      <queue name="test1.test.B" />
    </multicast>
  </address>
  <address name="SiteCore.test.address">
    <multicast>
      <queue name="SiteCore.test.queue" />
    </multicast>
  </address>
  <address name="AX.User.Import.Topic">
    <multicast>
      <queue name="AX.User.Import.Queue" />
    </multicast>
  </address>
  <address name="Boomi.API.eCommerce.SOF.Order.Audit.Queue">
    <anycast>
      <queue name="Boomi.API.eCommerce.SOF.Order.Audit.Queue" />
    </anycast>
  </address>
  <address name="eCommerce.Customer.Information.Topic">
    <anycast>
      <queue name="eCommerce.Customer.Information.Queue" />
    </anycast>
  </address>
  <address name="MAO.SOF.Item.Delete.Topic">
    <multicast>
      <queue name="MAO.SOF.Item.Delete.Queue" />
    </multicast>
  </address>
  <address name="MAO.SOF.Item.Import.Topic">
    <multicast>
      <queue name="MAO.SOF.Item.Import.Queue" />
    </multicast>
  </address>
  <address name="MAO.SOF.Location.Import.Topic">
    <multicast>
      <queue name="MAO.SOF.Location.Import.Queue" />
    </multicast>
  </address>
  <address name="MAO.SOF.Order.Fulfillment.Status.Topic">
    <multicast>
      <queue name="MAO.SOF.Order.Fulfillment.Status.Inventory.Reserve.Queue" />
      <queue name="MAO.SOF.Order.Fulfillment.Status.Sitecore.Queue" />
    </multicast>
  </address>
  <address name="MAO.SOF.User.Import.Topic">
    <multicast>
      <queue name="MAO.SOF.User.Import.Queue" />
    </multicast>
  </address>
  <address name="Marketing.NRM.New.Neighbor.Topic">
    <multicast>
      <queue name="Marketing.NRM.New.Neighbor.Responsys.Queue" />
    </multicast>
  </address>  
  <address name="Pet.Services.Event.Topic">
    <multicast>
      
      <queue name="Pet.Services.Event.Appointment.Booked.Queue">
        <filter string="JMSType='appointment-booked'" />
      </queue>
      <queue name="Pet.Services.Event.Appointment.Canceled.Queue">
        <filter string="JMSType='appointment-canceled'" />
      </queue>
      <queue name="Pet.Services.Event.Appointment.Rescheduled.Queue">
        <filter string="JMSType='appointment-rescheduled'" />
      </queue>
      <queue name="Pet.Services.Event.Client.Created.Queue">
        <filter string="JMSType='client-created'" /> 
      </queue>
      <queue name="Pet.Services.Event.Client.Deleted.Queue">
        <filter string="JMSType='client-deleted'" />
      </queue>
      <queue name="Pet.Services.Event.Client.Updated.Queue">
        <filter string="JMSType='client-updated'" />
      </queue>
    </multicast>
  </address>
  <address name="Process.Tracking.General.Topic">
    <multicast>
      <queue name="Process.Tracking.General.DB.Writer.Queue" />
    </multicast>
  </address>
  <address name="PSP.Utilities.Email.Send.Queue">
    <multicast>
      <queue name="PSP.Utilities.Email.Send.Queue" />
    </multicast>
  </address>
  <address name="SiteCore.Sales.Order.Submission.Topic">
    <multicast>
      <queue name="SiteCore.Sales.Order.Submission.Queue" />
    </multicast>
  </address>
  <!--<address name="SiteCore.SOF.Order.Fulfillment.Submission.Error.Queue">
    <anycast>
      <queue name="SiteCore.SOF.Order.Fulfillment.Submission.Error.Queue" />
    </anycast>
  </address>-->
  <address name="SiteCore.SOF.Order.Fulfillment.Submission.Topic">
    <multicast>
      <queue name="SiteCore.SOF.Order.Fulfillment.Submission.ActiveOmni.Queue" />
      <queue name="SiteCore.SOF.Order.Fulfillment.Submission.Inventory.Reserve.Queue" />
    </multicast>
  </address>
</addresses>

      <!-- Uncomment the following if you want to use the standard LoggingActiveMQServerPlugin plugin to log events
      <broker-plugins>
         <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
            <property key="LOG_ALL_EVENTS" value="true"/>
            <property key="LOG_CONNECTION_EVENTS" value="true"/>
            <property key="LOG_SESSION_EVENTS" value="true"/>
            <property key="LOG_CONSUMER_EVENTS" value="true"/>
            <property key="LOG_DELIVERING_EVENTS" value="true"/>
            <property key="LOG_SENDING_EVENTS" value="true"/>
            <property key="LOG_INTERNAL_EVENTS" value="true"/>
         </broker-plugin>
      </broker-plugins>
      -->

   </core>
</configuration>

Here is the client code that I think is causing all the trouble.

using (new TimeMeasure("PSP.Commerce.Foundation.Common.Services.BoomiUserService:SendMessage"))
{
    _connection = await GetOrSetQueueConnection();
    var session = new Session(_connection);
    var sender = new SenderLink(session, typeof(T).Name, topicName);
    var serializedData = JsonConvert.SerializeObject(message, Formatting.None,
        new JsonSerializerSettings {NullValueHandling = NullValueHandling.Ignore});
    var serializedMessage = new Message(serializedData)
    {
        Properties = new Properties
        {
            CreationTime = DateTime.Now
        }
    };
    Log.Info($"Message with body {serializedData} sent to {topicName} during attempt {currentAttempt}/{maxNumberOfAttempts}", this);
    await sender.SendAsync(serializedMessage);
    await session.CloseAsync();
    await _connection.CloseAsync();  // This line was missing
    return true;
}
Hi Justin. I updated my post with more details. - BenW
It's one client on the AMQP side, and maybe a dozen JMS. I'm not sure exactly how these are implemented. I'll check the AMQP side. The JMS side is from a cloud integration platform, Dell Boomi. TotalMessageCount is 5500, and rising quickly. It's only been running for a few hours. ConnectionCount is 1365, and also rising. This is why I suspect "stale" connections. - BenW
You didn't ask about this, but looking through your configuration your address definitions look odd. It's technically possible to have addresses with statically defined multicast queues, but it's not common. Typically multicast queues are created on an address dynamically by the broker in response to a client's subscription. Unless your clients are using FQQN to access these queues they're likely to just fill up with messages and cause problems for your broker. - Justin Bertram
I created the queues this way so the consumer could go down and not lose the messages. Could this cause problems? - BenW
If you're not actually consuming messages from these queues then yes it could cause significant problems. Are your clients using FQQN to get direct access to these statically created multicast queues? - Justin Bertram

1 Answer

ActiveMQ Artemis already enforces a default connection timeout of 60 seconds for any AMQP client using an acceptor where amqpIdleTimeout is not set. See the documentation for more details on that. Therefore any "stale" connection should be removed in 60 seconds and you'll see log messages indicating that a connection was cleaned up.

It's worth noting that, aside from network problems that interrupt connections, the most common cause of stale connections is poorly written clients that do not manage their resources properly.

In general, I think a ulimit of 1024 for open files is quite low for a modern system. I recommend you raise this substantially.
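
To confirm that the broker is actually running into this limit, you can compare the number of descriptors it holds against the limit that applies to the running process. The following is only a rough sketch, assuming a Linux host with /proc mounted (the epoll transport in the stack trace suggests Linux); the function names are made up for illustration and are not part of Artemis.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for inspecting a running process on Linux.

# Count the file descriptors (sockets, journal files, etc.) that a
# process currently holds open. Prints 0 if the PID cannot be read.
count_open_fds() {
    local pid="$1"
    ls "/proc/${pid}/fd" 2>/dev/null | wc -l
}

# Print the soft "Max open files" limit in effect for a running process.
# This can differ from what `ulimit -n` shows in your login shell.
soft_nofile_limit() {
    local pid="$1"
    awk '/^Max open files/ {print $4}' "/proc/${pid}/limits"
}

# Example (replace 12345 with the broker's PID):
#   count_open_fds 12345
#   soft_nofile_limit 12345
```

If the first number climbs toward the second while ConnectionCount rises, leaked client connections are the likely cause. Reading /proc for another user's process may require running as that user or as root. Also note that if the broker is started by systemd, the limit comes from the unit's LimitNOFILE= setting rather than from the user's shell limits.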