I am testing some edge cases in my software, so I have created a very simple test environment:

  • RabbitMQ server running on CentOS 7
  • Message consumer written against .NET Core 2.1 in C#, running under CentOS 7
  • Message sender written against .NET Core 2.1 in C#, running under CentOS 7

I send a simple text message every 5 seconds. The sender and receiver run on the same UNIX box, while the RabbitMQ server runs on a different machine on the network. So far, so good. Now I stop my RabbitMQ server with systemctl stop rabbitmq-server.

I get errors on both the sender and the receiver, which was expected.

I restart the RabbitMQ server using systemctl start rabbitmq-server.

And now the fun starts! The sender recovers and continues to send messages, but the consumer CANNOT recover and does not receive any messages. They accumulate on the RabbitMQ server!

Here are my log entries from the sender (which works as expected):

2019-01-22 21:18:25.628 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 10] Failed to connect to broker infraserver-tbws2, port 5672, vhost testvh
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable ---> System.AggregateException: One or more errors occurred. (Connection failed) ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed ---> System.Net.Internals.SocketExceptionFactory+ExtendedSocketException: Connection refused 172.16.63.239:5672
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
   at System.Net.Sockets.Socket.EndConnect(IAsyncResult asyncResult)
   at System.Net.Sockets.Socket.<>c.<ConnectAsync>b__272_0(IAsyncResult iar)
--- End of stack trace from previous location where exception was thrown ---
   at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
   at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
   at EasyNetQ.ConnectionFactoryWrapper.CreateConnection()
   at EasyNetQ.PersistentConnection.TryToConnect()
2019-01-22 21:18:25.632 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 10] Failed to connect to any Broker. Retrying in 00:00:05
2019-01-22 21:18:35.444 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 7.' | Num: 7 | Guid: 5345c7e4-61e6-4c79-8179-d4bef7864420'.
2019-01-22 21:18:40.452 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 8.' | Num: 8 | Guid: 3cd8635c-cdfa-45f3-8495-2acb0713d47b'.
2019-01-22 21:18:45.457 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 9.' | Num: 9 | Guid: 099462b8-cd66-40b9-ac10-89c3246819ec'.
2019-01-22 21:18:50.470 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 10.' | Num: 10 | Guid: c25139b2-8e45-4771-9544-830014382e0c'.
2019-01-22 21:18:55.515 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 11.' | Num: 11 | Guid: 90049d91-3805-4aaa-ac18-b61c09164afd'.
2019-01-22 21:19:00.526 +01:00 [INF] [] [ThreadId 1] Sucessfully sent Message 'Message: 'This is test message number 12.' | Num: 12 | Guid: 108ff318-6a34-4e64-94bd-dafa67aa6717'.

This shows the last error message, after which EasyNetQ recovers and delivers messages again.

The message consumer does NOT work! Here are my log entries:

2019-01-22 21:18:25.623 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 12] Failed to connect to broker infraserver-tbws2, port 5672, vhost testvh
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable ---> System.AggregateException: One or more errors occurred. (Connection failed) ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed ---> System.Net.Internals.SocketExceptionFactory+ExtendedSocketException: Connection refused 172.16.63.239:5672
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
   at System.Net.Sockets.Socket.EndConnect(IAsyncResult asyncResult)
   at System.Net.Sockets.Socket.<>c.<ConnectAsync>b__272_0(IAsyncResult iar)
--- End of stack trace from previous location where exception was thrown ---
   at RabbitMQ.Client.TcpClientAdapter.ConnectAsync(String host, Int32 port)
   at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, Int32 millisecondsTimeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, Int32 timeout)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout, AddressFamily family)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, Int32 connectionTimeout, Int32 readTimeout, Int32 writeTimeout)
   at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(String clientProvidedName)
   at EasyNetQ.ConnectionFactoryWrapper.CreateConnection()
   at EasyNetQ.PersistentConnection.TryToConnect()
2019-01-22 21:18:25.625 +01:00 [ERR] [EasyNetQ.PersistentConnection] [ThreadId 12] Failed to connect to any Broker. Retrying in 00:00:05

Here it sits and waits forever! It looks as if it is deadlocked somehow, while the messages keep accumulating on the RabbitMQ server:

[Screenshot: messages accumulating on the RabbitMQ server]

When I stop my consumer application and restart it, the messages are picked up.

I connect in both applications (sender and consumer) with the following code:

private static IBus SetupRabbitMqConnection(string rabbitServer, string rabbitVHost, ushort rabbitPort, string rabbitUser, string rabbitPwd)
{
    Log.Logger.Debug($"Creating a connection to RabbitMQ server '{rabbitServer}' on port {rabbitPort.ToString()} " +
                     $"using the EasyNetQ library....");

    try
    {
        var connStr = $"host={rabbitServer}:{rabbitPort.ToString()};virtualHost={rabbitVHost};username={rabbitUser};" +
            $"password={rabbitPwd};publisherConfirms=true;timeout=30;prefetchcount=1;requestedHeartbeat=30";
        var msgBus = RabbitHutch.CreateBus(connStr, x => { });
        if (!msgBus.IsConnected)
        {
            // The bus is still returned; EasyNetQ keeps retrying the connection in the background.
            var errMsg = $"Currently not connected to RabbitMQ server '{rabbitServer}'.";
            Log.Logger.Error(errMsg);
        }
        else
        {
            Log.Logger.Debug("Successfully connected to RabbitMQ server.");
        }
        return msgBus;
    }
    catch (Exception ex)
    {
        Log.Logger.Error($"Error establishing a connection to RabbitMQ server '{rabbitServer}'. Error: {ex}");
        throw;
    }
}

The consumer program registers the listeners as follows:

var msgBus = SetupRabbitMqConnection(rabbitServer, vhost, rabbitPort, rabbitUser, rabbitPwd);
RegisterMsgSubscriptions(msgBus);

private static void RegisterMsgSubscriptions(IBus msgBus)
{
    Log.Logger.Debug("Starting to register RabbitMQ message subscriptions...");
    try
    {
        #region Queue declarations

        var advancedBus = msgBus.Advanced;
        var testQueueOne = new EasyNetQ.Topology.Queue(TestQueueOneName, true);

        Log.Logger.Debug("Finished declaring queues.");

        #endregion

        #region Message Queue Handler registrations

        advancedBus.Consume(testQueueOne, registration => registration
            .Add<RabbitMessage<TestTextMessageDto>>(MessageProcessor.ProcessRabbitTestMessage));

        Log.Logger.Debug("Finished registration of message handlers for several queues.");

        #endregion
    }
    catch (Exception ex)
    {
        Log.Logger.Error($"Error registering message handler. Error: {ex}");
    }
}

Any idea what might be going wrong here? In production we have more than 100 servers consuming messages. These servers are on-premises and spread all over the country, while the RabbitMQ server is in the data center. So the consuming servers MUST recover if the connection is lost; otherwise this is not usable!


1 Answer


Just put the subscription logic in a handler for the Connected event (it is fired every time a connection to the broker is established).

advancedBus.Connected += (sender, args) => { 
   // subscribe logic
};

Of course, you should make sure the subscription logic is not applied twice when the Connected event fires more than once.
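To avoid the duplicate subscriptions mentioned above, one option is to keep the IDisposable returned by advancedBus.Consume(...) and dispose it before subscribing again, so that every Connected event leaves exactly one active consumer. The following is a minimal sketch under that assumption. ResubscribeOnConnect and ActionDisposable are hypothetical helpers, not EasyNetQ types, and the sketch deliberately has no EasyNetQ dependency so it can be exercised with a fake consumer.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of EasyNetQ): keeps exactly one active consumer
// across reconnects. `subscribe` is expected to start a consumer and return the
// IDisposable that stops it (assumed here to be what advancedBus.Consume(...) returns).
public sealed class ResubscribeOnConnect : IDisposable
{
    private readonly Func<IDisposable> _subscribe;
    private readonly object _gate = new object();
    private IDisposable _current;   // handle of the currently active consumer, if any
    private bool _disposed;

    public ResubscribeOnConnect(Func<IDisposable> subscribe)
    {
        _subscribe = subscribe ?? throw new ArgumentNullException(nameof(subscribe));
    }

    // Call from every Connected event (and once at startup if already connected).
    // Disposing the previous handle first prevents duplicate consumers.
    public void OnConnected()
    {
        lock (_gate)
        {
            if (_disposed) return;
            _current?.Dispose();
            _current = null;            // do not keep a stale handle if _subscribe throws
            _current = _subscribe();
        }
    }

    // Stops the active consumer; later OnConnected calls are ignored.
    public void Dispose()
    {
        lock (_gate)
        {
            _disposed = true;
            _current?.Dispose();
            _current = null;
        }
    }
}

// Minimal IDisposable that runs its callback at most once; used for fake consumers.
public sealed class ActionDisposable : IDisposable
{
    private Action _onDispose;
    public ActionDisposable(Action onDispose) => _onDispose = onDispose;
    public void Dispose() => Interlocked.Exchange(ref _onDispose, null)?.Invoke();
}
```

In the consumer above, subscribe would wrap the existing advancedBus.Consume(testQueueOne, ...) call, the handler would be attached with advancedBus.Connected += (sender, args) => resubscriber.OnConnected();, and OnConnected would also be called once directly if msgBus.IsConnected is already true, because the initial connection may happen before the handler is attached. A redundant call is harmless, since the previous consumer is disposed first. Whether this interacts cleanly with EasyNetQ's own consumer recovery depends on the library version, so it is worth verifying against the version in use.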