0
votes

I've noticed that when I run the RabbitMQ status command from the command line (rabbitmqctl status), all of the reported numbers are way out of whack with what I know to be reality. My reality is confirmed by what I see in the Management web portal.

Here's an output of the CLI status:

[{pid,26647},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.5.4"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.5.4"},
      {webmachine,"webmachine","1.10.3-rmq3.5.4-gite9359c7"},
      {mochiweb,"MochiMedia Web Server","2.7.0-rmq3.5.4-git680dba8"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.5.4"},
      {rabbit,"RabbitMQ","3.5.4"},
      {os_mon,"CPO  CXC 138 46","2.3"},
      {inets,"INETS  CXC 138 49","5.10.4"},
      {mnesia,"MNESIA  CXC 138 12","4.12.4"},
      {amqp_client,"RabbitMQ AMQP Client","3.5.4"},
      {xmerl,"XML parser","1.3.7"},
      {sasl,"SASL  CXC 138 11","2.4.1"},
      {stdlib,"ERTS  CXC 138 10","2.3"},
      {kernel,"ERTS  CXC 138 10","3.1"}]},
 {os,{unix,linux}},
 {erlang_version,
     "Erlang/OTP 17 [erts-6.3] [source] [64-bit] [smp:4:4] [async-threads:64] [kernel-poll:true]\n"},
 {memory,
     [{total,45994136},
      {connection_readers,101856},
      {connection_writers,54800},
      {connection_channels,165968},
      {connection_other,373008},
      {queue_procs,175376},
      {queue_slave_procs,0},
      {plugins,437024},
      {other_proc,13385792},
      {mnesia,131904},
      {mgmt_db,484216},
      {msg_index,53112},
      {other_ets,1119384},
      {binary,3890640},
      {code,20097289},
      {atom,711569},
      {other_system,4812198}]},
 {alarms,[]},
 {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,787190579},
 {disk_free_limit,50000000},
 {disk_free,778919936},
 {file_descriptors,
     [{total_limit,924},
      {total_used,13},
      {sockets_limit,829},
      {sockets_used,11}]},
 {processes,[{limit,1048576},{used,350}]},
 {run_queue,0},
 {uptime,911}]

The numbers of readers, writers, channels, etc. (basically every number) appear to be inflated several-thousand-fold.

The numbers I see in the management portal (screenshot below) are correct: 10 total connections, each with two channels.

actual number of connections

All of my queues are non-durable and I'm only sending non-persistent messages using fanout exchanges. As I understand it, this should mean nothing is ever persisted should something go wrong (which is fine for my needs).

I have noticed that whenever I spin up or down one of the modules that connects to the broker, the number of readers/writers goes up by ~17,000 on the command line, despite only going up/down by 1 in the portal.

Here's my broker configuration code for reference:

// Broker host; passed to ConnectionFactory.setHost() in connect().
private String endPoint;
// Broker port; passed to ConnectionFactory.setPort() in connect().
private int port;
// Optional credentials: connect() only applies them when both are non-null.
private String userName;
private String password;
// Exchange this handler publishes to, and the type it is declared with in connect().
private Exchange publisherExchange;
private ExchangeType publisherExchangeType;
// Exchanges the subscription queue is bound to, mapped to the type each is declared with.
private Map<Exchange, ExchangeType> subscriptionExchanges;

// Channel used to declare the publisher exchange and to publish outgoing messages.
private Channel publishChannel;
// Channel used to declare and bind the subscription queue and to consume from it.
private Channel subscriptionChannel;
// Delivery callback built by configureConsumer() and registered in subscribe().
private Consumer consumer;

/**
 * Copies the exchange configuration and connection settings out of the
 * builder, then immediately connects to the broker.
 *
 * @param builder source of the endpoint, credentials and exchange settings
 * @throws ConnectException propagated from {@code connect()} when the broker
 *         connection cannot be set up
 */
private BrokerHandler(BrokerHandlerBuilder builder) throws ConnectException {
    // Exchange configuration.
    subscriptionExchanges = builder.subscriptionExchanges;
    publisherExchangeType = builder.publisherExchangeType;
    publisherExchange = builder.publisherExchange;

    // Connection settings.
    password = builder.password;
    userName = builder.userName;
    port = builder.port;
    endPoint = builder.endPoint;

    // All settings are in place, so the connection can be opened right away.
    connect();
}

/**
 * Opens a connection to the broker, creates the publish and subscription
 * channels, installs the consumer, declares the publisher exchange and then
 * subscribes to any configured subscription exchanges.
 *
 * If any step after the connection has been opened fails, the connection is
 * closed again so that a failed attempt does not leave it open on the broker.
 *
 * @throws ConnectException if the connection, the channels or the publisher
 *         exchange cannot be set up; the underlying failure is attached as
 *         the cause
 */
private void connect() throws ConnectException{
    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost(this.endPoint);
    factory.setPort(this.port);

    if(this.userName != null && this.password != null){
        factory.setUsername(this.userName);
        factory.setPassword(this.password);

        // NOTE(review): automatic recovery is only switched on when credentials
        // are supplied. Confirm whether connections without credentials
        // should recover as well; behavior is left unchanged here.
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(RMQConstants.RABBITMQ_MAX_RETRY_DELAY);
    }

    // Tracks whether THIS call opened the connection, so the failure path only
    // closes a connection it created itself.
    boolean connectionOpened = false;
    try {
        log.info("Registering with broker on topic " + this.publisherExchange.toString() + " on " + this.endPoint + "...");
        connection = factory.newConnection();
        connectionOpened = true;
        publishChannel = connection.createChannel();
        subscriptionChannel = connection.createChannel();
        configureConsumer();

        publishChannel.exchangeDeclare(this.publisherExchange.toString(), this.publisherExchangeType.toString());

    } catch(Exception e){
        if(connectionOpened){
            try {
                // Closing the connection also closes any channels created on it.
                connection.close();
            } catch(Exception closeFailure){
                // The original failure is what the caller needs; keep the
                // close failure attached to it instead of losing it.
                e.addSuppressed(closeFailure);
            }
        }
        // Attach the root cause so callers can see why the connection failed.
        ConnectException failure = new ConnectException("Unable to connect to RabbitMQ broker.");
        failure.initCause(e);
        throw failure;
    }

    // Guard against a builder that never set the subscription map.
    if(this.subscriptionExchanges != null && !this.subscriptionExchanges.isEmpty()){
        subscribe(this.subscriptionExchanges);
    }
}
/**
 * Publishes a message to this handler's publisher exchange using an empty
 * routing key, so that it is broadcast to all listeners using a FANOUT strategy.
 * The text is encoded as UTF-8, matching the UTF-8 decoding done on the
 * consuming side. I/O failures during the publish are logged and not rethrown.
 *
 * @param msg the message text to publish
 * @throws ConnectException if the handler is not connected to the broker
 */
private void publishToBroker(String msg) throws ConnectException{
    // Checked outside the try block so the exception is not swallowed by the
    // IOException handler below.
    if (publishChannel == null) {
        throw new ConnectException("Not connected to RabbitMQ broker.");
    }

    try {
        // Explicit charset: the platform default may differ from the UTF-8
        // that consumers decode with.
        byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        publishChannel.basicPublish(this.publisherExchange.toString(), "", null, payload);
    }
    catch (IOException e) {
        log.error("Unable to write message to broker.", e);
    }
}

/**
 * Declares a queue with the no-argument {@code queueDeclare()}, binds the
 * returned queue name to every given exchange with an empty routing key, and
 * starts consuming from it with {@code true} as the second
 * {@code basicConsume} argument (autoAck in the RabbitMQ client API).
 *
 * Failures are logged, not propagated. A failure while declaring or binding
 * one exchange does not prevent the remaining exchanges from being processed.
 *
 * @param exchanges exchanges to bind to, mapped to the type each is declared with
 */
private void subscribe(Map<Exchange, ExchangeType> exchanges){
    try {
        final String boundQueue = subscriptionChannel.queueDeclare().getQueue();

        for (Map.Entry<Exchange, ExchangeType> binding : exchanges.entrySet()) {
            Exchange exchange = binding.getKey();
            ExchangeType type = binding.getValue();
            try {
                // Declare first so the bind cannot fail on a missing exchange.
                subscriptionChannel.exchangeDeclare(exchange.toString(), type.toString());
                subscriptionChannel.queueBind(boundQueue, exchange.toString(), "");
            } catch (Exception bindFailure) {
                log.error("Error declaring exchanges for exchange: " + exchange.toString(), bindFailure);
            }
        }

        subscriptionChannel.basicConsume(boundQueue, true, consumer);
    }
    catch(Exception queueFailure){
        log.error("Error declaring a queue for subscription channel", queueFailure);
    }
}

/**
 * Builds the delivery callback for the subscription channel and stores it in
 * {@code consumer}. Each delivered body is decoded as UTF-8 text and passed
 * to {@code handleMessage}.
 */
private void configureConsumer(){
    consumer = new DefaultConsumer(subscriptionChannel) {
        @Override
        public void handleDelivery(String tag, Envelope env,
                                   AMQP.BasicProperties props, byte[] payload) throws IOException {
            // Only the payload is used; tag, envelope and properties are ignored.
            handleMessage(new String(payload, "UTF-8"));
        }
    };
}

Clients use a builder pattern to instantiate a broker connection, at which point they specify their publishing exchange and any number of exchanges they wish to subscribe to. There are only 19 exchanges in total in this system.

Messages are being published and received properly, but I've been getting reports that the broker is bogging down the servers. I'll be monitoring it more closely but I'd really like to be able to explain these wacky results from the status call. I have tried stopping the app and resetting then reconfiguring the broker, which brings the connection counts back to 0, but once modules start reconnecting the numbers start going back up.

Thanks for taking the time to look through this. Any advice would be greatly appreciated!

1

1 Answers

1
votes

The connection_readers, connection_writers, connection_channels, etc. that you report are in the memory section, and the units are bytes, not number of connections. This is different data entirely to the section of the Management UI that you report.

To get data through the CLI about the number of connections, use the rabbitmqctl list_connections command.

channels
    Number of channels using the connection.

See also list_exchanges, list_queues, and list_consumers.