1
votes

I have some problem with NetMQ 4.0.0.1 on Mono 4.8 on Debian Wheezy.

The Dealer socket does not send any messages until I stop asking it to send new ones. When I put Thread.Sleep(1000) between creating the tasks, everything is OK. I should add that everything works on Windows with .NET Framework 4.5 and on .NET Core 1.1 without any Thread.Sleep().

I have pattern like this: enter image description here

I have added debug messages and I can see that I am creating 100 REQ sockets in Tasks in a loop, and the Router is getting the requests in a queue and then sending them through the Dealer, but nothing happens on the other side of the TCP connection until I stop calling send on the REQ sockets. A simple Thread.Sleep() after every 5 tasks makes it work. It looks like a Poller bug or a Dealer bug, or I am doing something wrong.

Here is the code of the middle box:

/// <summary>
/// Forwards multipart messages between a front-end ROUTER socket and a back-end
/// DEALER socket. Both sockets and their poller are created, polled and disposed
/// on a dedicated worker thread.
/// </summary>
public class CollectorDevice : IDisposable
{
    // Assigned on the worker thread and read by Stop() on the caller's thread.
    private volatile NetMQPoller _poller;
    private RouterSocket _frontendSocket;
    private DealerSocket _backendSocket;
    private readonly string _backEndAddress;
    private readonly string _frontEndAddress;
    private readonly int _expectedFrameCount;
    private readonly ManualResetEvent _startSemaphore = new ManualResetEvent(false);
    private readonly Thread _localThread;
    private static readonly Logger _logger = LogManager.GetCurrentClassLogger();
    private bool _disposed;

    /// <summary>
    /// Creates the device. No sockets are opened until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="backEndAddress">Address passed to the DEALER socket constructor.</param>
    /// <param name="frontEndAddress">Address passed to the ROUTER socket constructor.</param>
    /// <param name="expectedFrameCount">Frame-count hint passed to every multipart receive.</param>
    public CollectorDevice(string backEndAddress, string frontEndAddress, int expectedFrameCount)
    {
        _expectedFrameCount = expectedFrameCount;

        _backEndAddress = backEndAddress;
        _frontEndAddress = frontEndAddress;

        _localThread = new Thread(DoWork) { Name = "IPC Collector Device Thread" };
    }

    /// <summary>
    /// Starts the worker thread and blocks until it has either registered both
    /// sockets with the poller or failed during startup (the failure is logged).
    /// </summary>
    /// <exception cref="ObjectDisposedException">The device has already been disposed.</exception>
    public void Start()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }

        _localThread.Start();
        _startSemaphore.WaitOne();
    }

    /// <summary>
    /// Asks the poller to stop. Does nothing if the device was never started.
    /// </summary>
    public void Stop()
    {
        NetMQPoller poller = _poller;
        if (poller == null)
        {
            // Start() was never called, so there is nothing to stop.
            return;
        }

        try
        {
            poller.Stop();
        }
        catch (ObjectDisposedException)
        {
            // NOTE(review): presumably raised once the worker has already left its
            // using-block and disposed the poller; in that case the device is
            // already stopped. Confirm against the NetMQ version in use.
        }
    }

    #region Implementation of IDisposable

    /// <summary>
    /// Stops the poller, waits for the worker thread to finish disposing the
    /// sockets, and releases the start signal. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        Stop();

        // When called from the worker thread itself (e.g. inside a ReceiveReady
        // handler) we can neither join it nor dispose a handle it will still set.
        if (Thread.CurrentThread != _localThread)
        {
            // IsAlive is false for a thread that was never started; Join() on such
            // a thread would throw ThreadStateException.
            if (_localThread.IsAlive)
            {
                _localThread.Join();
            }

            _startSemaphore.Dispose();
        }
    }

    #endregion


    #region Private Methods

    /// <summary>
    /// Worker-thread body: creates the poller and both sockets, wires the
    /// forwarding handlers, and runs the poller until it is stopped.
    /// </summary>
    private void DoWork()
    {
        try
        {
            using (_poller = new NetMQPoller())
            using (_frontendSocket = new RouterSocket(_frontEndAddress))
            using (_backendSocket = new DealerSocket(_backEndAddress))
            {
                _backendSocket.ReceiveReady += OnBackEndReady;
                _frontendSocket.ReceiveReady += OnFrontEndReady;

                _poller.Add(_frontendSocket);
                _poller.Add(_backendSocket);

                // NOTE(review): a Stop() that arrives between this Set() and Run()
                // is not handled here; the window is small but not closed.
                _startSemaphore.Set();

                _poller.Run();
            }
        }
        catch (Exception e)
        {
            _logger.Error(e);
        }
        finally
        {
            // If construction failed before the Set() above, Start() would
            // otherwise wait forever. Setting an already-set event is harmless.
            _startSemaphore.Set();
        }
    }

    /// <summary>Forwards one multipart message from the back end to the front end.</summary>
    private void OnBackEndReady(object sender, NetMQSocketEventArgs e)
    {
        NetMQMessage message = _backendSocket.ReceiveMultipartMessage(_expectedFrameCount);
        _frontendSocket.SendMultipartMessage(message);
    }

    /// <summary>Forwards one multipart message from the front end to the back end.</summary>
    private void OnFrontEndReady(object sender, NetMQSocketEventArgs e)
    {
        NetMQMessage message = _frontendSocket.ReceiveMultipartMessage(_expectedFrameCount);
        _backendSocket.SendMultipartMessage(message);
    }

    #endregion
}

Here is a client side:

/// <summary>
/// Client: starts the in-process collector device and fires 100 concurrent
/// REQ/REP round trips through it.
/// </summary>
class Program
{
    private static readonly Logger _logger = LogManager.GetCurrentClassLogger();

    private static void Main(string[] args)
    {
        Console.WriteLine("Client. Please enter message for server. Enter 'QUIT' to turn off server");
        Console.ReadKey();


        using (var collectorDevice = new CollectorDevice(">tcp://localhost:5556", "inproc://broker", 3))
        {
            collectorDevice.Start();

            List<Task> tasks = new List<Task>();
            for (int i = 0; i < 100; i++)
            {
                Console.WriteLine(i);
                int j = i; // copy so each closure captures its own value

                // Each request blocks synchronously in ReceiveFrameString(). Queuing
                // 100 such tasks on the thread pool starves the pool, and the socket
                // completions that would unblock them cannot run. LongRunning hints
                // the scheduler to run each task off the pool instead. (A Thread.Sleep
                // between task creations was observed to mask the same problem.)
                Task t = Task.Factory.StartNew(() => SendRequest(j), TaskCreationOptions.LongRunning);
                tasks.Add(t);
            }

            Task.WaitAll(tasks.ToArray());
        }

    }

    /// <summary>
    /// Performs one request/response round trip over a fresh REQ socket and logs
    /// both directions. Exceptions are logged rather than propagated so one failed
    /// request does not fault the whole batch.
    /// </summary>
    /// <param name="clientIndex">Loop index of the request, used only in log output.</param>
    private static void SendRequest(int clientIndex)
    {
        try
        {
            using (var req = new RequestSocket("inproc://broker"))
            {
                // Build the text once; it is both the payload and the log line.
                string request = String.Format("Request client: {0} id: {1}", clientIndex, Task.CurrentId);
                req.SendFrame(request);
                _logger.Debug(request);
                Console.WriteLine(request);

                string responseMessage = req.ReceiveFrameString();
                string response = String.Format("Response from server: {0} id: {1} message: {2}", clientIndex, Task.CurrentId, responseMessage);
                _logger.Debug(response);
                Console.WriteLine(response);
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            _logger.Error(e);
        }
    }
}

And server side:

/// <summary>
/// Server: binds a ROUTER socket on TCP port 5556 and answers every request
/// with a fixed acknowledgement.
/// </summary>
class Program
{
    private static readonly Logger _logger = LogManager.GetCurrentClassLogger();

    // The handler reads frames [0], [1] and [3], so anything shorter is unusable.
    private const int MinimumFrameCount = 4;

    static void Main(string[] args)
    {
        try
        {
            using (var routerSocket = new RouterSocket("@tcp://*:5556"))
            using (var poller = new NetMQPoller()) // disposed too, not only the socket
            {
                routerSocket.ReceiveReady += RouterSocketOnReceiveReady;
                poller.Add(routerSocket);
                poller.Run();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }

        Console.ReadKey();
    }

    /// <summary>
    /// Receives one multipart request and replies to the same two address frames.
    /// Requests that time out or have too few frames are reported and dropped.
    /// </summary>
    private static void RouterSocketOnReceiveReady(object sender, NetMQSocketEventArgs netMqSocketEventArgs)
    {
        NetMQMessage clientMessage = new NetMQMessage();
        bool result = netMqSocketEventArgs.Socket.TryReceiveMultipartMessage(new TimeSpan(0, 0, 0, 5),
            ref clientMessage, 5);

        if (result == false)
        {
            Console.WriteLine("Something went wrong?!");
            // Nothing was received, so there are no frames to index below.
            return;
        }

        if (clientMessage.FrameCount < MinimumFrameCount)
        {
            Console.WriteLine(String.Format("Ignoring message with {0} frame(s); expected at least {1}",
                clientMessage.FrameCount, MinimumFrameCount));
            return;
        }

        // Frames [0] and [1] are echoed back as the leading reply frames; frame [3]
        // carries the request text. Frame [2] is presumably the empty delimiter,
        // since the reply below puts an empty frame in that position.
        var address = clientMessage[0];
        var address2 = clientMessage[1];
        var clientMessageString = clientMessage[3].ConvertToString();

        //_logger.Debug("Message from client received: '{0}'", clientMessageString);
        Console.WriteLine(String.Format("Message from client received: '{0}'", clientMessageString));

        netMqSocketEventArgs
            .Socket.SendMoreFrame(address.Buffer)
            .SendMoreFrame(address2.Buffer)
            .SendMoreFrameEmpty()
            .SendFrame("I have received your message");
    }
}

Anybody has any idea?

I thought that maybe I was using a socket from different threads, so I wrapped it in a ThreadLocal structure, but that did not help. Then I read about some problems with NetMQ in Unity, so I added 'AsyncIO.ForceDotNet.Force();' before every socket constructor call, and that did not help either. Then I updated Mono from 4.4 to 4.8 and it still behaves the same.

I have checked that Thread.Sleep(100) between tasks fixes the problem. It is strange.

1
Any chance you are using any socket from multiple threads? – somdoron
Also, can you share the code of the request socket? – somdoron
I have put the socket into a ThreadLocal structure to be sure that I am not making that mistake. I have added more code as you asked. By the way, thanks for the fast response. – bzyku
I'm looking into this; I just want to figure out: if on the server side you block forever (instead of the 5-second timeout), do you get the messages eventually? – somdoron
I have changed TryReceiveMultipartMessage in the server to ReceiveMultipartMessage, and it is still the same. Only Thread.Sleep(100) between Tasks fixes the problem. – bzyku

1 Answer

0
votes

I tested the code; it does take a lot of time, but eventually the server receives all the messages (it takes around a minute).

The problem is the number of threads: every async operation that should complete on an I/O completion port thread takes a long time when there are 100 threads. I was able to reproduce it without NetMQ with the following code:

    /// <summary>
    /// NetMQ-free reproduction: starts 100 tasks that block on an event, then
    /// performs one asynchronous accept/connect on the loopback interface. The
    /// tasks are only released from the accept completion callback, so the time
    /// until "all tasks completed" is printed shows how long that callback was delayed.
    /// </summary>
    public static void Main(string[] args)
    {
        // Released only from the accept callback below.
        ManualResetEvent resetEvent = new ManualResetEvent(false);

        List<Task> tasks = new List<Task>();

        // Start 100 tasks that do nothing but block until the event is set.
        for (int i = 0; i < 100; i++)
        {
            tasks.Add(Task.Run(() =>
            {
                resetEvent.WaitOne();
            }));
        }

        // Presumably gives the tasks time to start blocking before the socket work begins.
        Thread.Sleep(100);

        // Listening side: TCP port 5556 on all interfaces.
        Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        listener.Bind(new IPEndPoint(IPAddress.Any, 5556));
        listener.Listen(1);

        SocketAsyncEventArgs args1 = new SocketAsyncEventArgs();
        args1.Completed += (sender, eventArgs) =>
        {
            Console.WriteLine($"Accepted {args1.SocketError}");
            // Unblocks every waiting task; nothing else in this method sets the event.
            resetEvent.Set();
        };
        listener.AcceptAsync(args1);

        // Connecting side: async connect to the listener over loopback.
        SocketAsyncEventArgs args2 = new SocketAsyncEventArgs();
        args2.RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, 5556);
        args2.Completed += (sender, eventArgs) => Console.WriteLine("Connected");
        Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        client.ConnectAsync(args2);

        // Returns only after the accept callback has run and released the tasks.
        Task.WaitAll(tasks.ToArray());

        Console.WriteLine("all tasks completed");
    }

You can see that it also takes around a minute. With only 5 threads it completes immediately.

Anyway, you might want to start fewer threads and/or report a bug to the Mono project.