We have a Java application which is acting as a server. Client applications (written in C#) are communicating with it using ZeroMQ. We are (mostly) following the Lazy Pirate pattern.
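For readers unfamiliar with it, the Lazy Pirate pattern from the ZeroMQ guide is a client-side retry loop. The client sends a request, waits a bounded time for the reply, and on timeout abandons the REQ socket and retries with a fresh one, giving up after a fixed number of attempts. The sketch below is a toy model of that loop, not the asker's code and not ZeroMQ code. `LazyPirateSketch` and the `Attempt` callback are hypothetical names; in a real client, one `Attempt` would create a REQ socket, send, poll for the reply with a timeout, and close the socket.

```java
import java.util.Optional;

// Toy model of the Lazy Pirate client loop (not ZeroMQ code).
final class LazyPirateSketch {

    // Hypothetical callback: one attempt = open a fresh REQ socket, send the request,
    // wait up to timeoutMs for a reply, then close the socket.
    @FunctionalInterface
    interface Attempt {
        Optional<String> sendAndAwait(String request, long timeoutMs);
    }

    // Returns the first reply received, or Optional.empty() after maxAttempts timeouts.
    static Optional<String> request(Attempt attempt, String request, int maxAttempts, long timeoutMs) {
        for (int i = 0; i < maxAttempts; i++) {
            Optional<String> reply = attempt.sendAndAwait(request, timeoutMs);
            if (reply.isPresent()) {
                return reply;
            }
            // Timed out: a REQ socket that has sent without receiving cannot send again,
            // so Lazy Pirate abandons it and the next attempt starts from a new socket.
        }
        return Optional.empty(); // give up; treat the server as unavailable
    }
}
```

This per-request open/close cycle is why a Lazy Pirate client naturally ends up creating and destroying sockets often.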

The server has a Router socket, implemented as follows (using JeroMQ):

ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.bind("tcp://*:5555");

The clients connect and send messages like this:

ZContext context = ZContext.Create();
ZSocket socket = ZSocket.Create(context, ZSocketType.REQ);
socket.Identity = Encoding.UTF8.GetBytes("Some identity");
socket.Connect("tcp://my_host:5555");
socket.Send(new ZFrame("request data"));

We have experienced lost messages when multiple clients are sending messages at the same time. With a single client, there doesn't appear to be any problem.

Are we implementing this the right way for a multiple-client-single-server setup?

Update: Example client and server exhibiting this behaviour:

Server:

import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class SimpleServer
{
    public static void main(String[] args) throws InterruptedException
    {
        ZContext context = new ZContext();
        Socket socket = context.createSocket(ZMQ.ROUTER);
        socket.setRouterMandatory(true);
        socket.bind("tcp://*:5559");

        PollItem pollItem = new PollItem(socket, Poller.POLLIN);

        int messagesReceived = 0;
        int pollCount = 0;

        while ((pollCount = ZMQ.poll(new PollItem[]{pollItem}, 3000)) > -1)
        {
            messagesReceived += pollCount;

            for (int i = 0 ; i < pollCount ; i++)
            {
                ZMsg msg = ZMsg.recvMsg(socket);
                System.out.println(String.format("Received message: %s. Total messages received: %d", msg, messagesReceived));
            }

            if (pollCount == 0)
            {
                System.out.println(String.format("No messages on socket. Total messages received: %d", messagesReceived));
            }
        }
    }
}

Client:

using NetMQ;
using System;
using System.Text;

namespace SimpleClient
{
    class Program
    {
        static byte[] identity = Encoding.UTF8.GetBytes("id" + DateTime.UtcNow.Ticks);

        static void Main(string[] args)
        {
            for (int i = 0; i < 100; i++)
            {
                SendMessage();
            }
        }

        private static void SendMessage()
        {
            using (NetMQContext context = NetMQContext.Create())
            {
                using (NetMQSocket socket = context.CreateRequestSocket())
                {
                    socket.Options.Identity = identity;
                    socket.Connect("tcp://localhost:5559");
                    socket.Send(Encoding.UTF8.GetBytes("hello!"));
                }
            }
        }
    }
}

If I run the server and a single client, all 100 messages arrive. If I run, say, 5 clients simultaneously, only around 200 to 300 messages arrive instead of the full 500. As an aside, closing the socket in the client appears to briefly stop the router socket on the server from receiving messages, although this is just a theory.

Could you provide more code? How does your server handle requests? Have you tried implementing a simple 'hello world' server with your architecture? Where does your requester listen for its response? If you're using REQ, you have to do a synchronous receive for the response. - antiduh
@antiduh - Thanks, I've updated the question with a simple client/server that behaves like this. I haven't actually got on to processing the responses yet; for now I'm just verifying that all the messages are delivered to the server. - Jamie Poole
Have you tried spinning up a single context/socket that's used throughout the lifetime of the client, rather than spinning them up and tearing them down for each message? That'd be my first debugging suggestion. The default ZMQ_LINGER setting should prevent any issues from doing what you're doing, but who knows. - Jason
@Jason - I have considered it, although it's not really an option for me at the moment. I need to get to the bottom of the issue with this connection-per-request model. - Jamie Poole
What I'm asking is: if you make that change in your example code, does it start behaving, or does it still show the same issue? If it does start behaving, that points to a definite issue with having that much turbulence in your ZMQ context. - Jason

3 Answers

2
votes

Part 1 - poll may return more than one event

ZMQ.poll() returns the number of events that were found:

int rc = ZMQ.poll(new PollItem[]{pollItem}, 3000);

You currently assume that one return from poll is one event. Instead, you should call ZMsg msg = ZMsg.recvMsg(socket); in a loop, once for each event indicated by the return value of ZMQ.poll().

From the source of JeroMQ:

/**
 * Polling on items. This has very poor performance.
 * Try to use zmq_poll with selector
 * CAUTION: This could be affected by jdk epoll bug
 *
 * @param items
 * @param timeout
 * @return number of events
 */
public static int zmq_poll(PollItem[] items, long timeout)
{
    return zmq_poll(items, items.length, timeout);
}
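Because the question's server registers a single PollItem, poll() can return at most 1 there; it counts ready items, not queued messages. A variant that does not depend on the count at all is to keep receiving without blocking until the socket is empty. This is a minimal sketch, not JeroMQ code: `DrainSketch` is a hypothetical name, and `tryRecv` stands in for a non-blocking receive that we assume returns null when nothing is queued (we believe `ZMsg.recvMsg(socket, ZMQ.DONTWAIT)` behaves this way in JeroMQ).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch: after poll() reports the socket readable, drain everything that is queued.
final class DrainSketch {

    // tryRecv is assumed to return null as soon as no message is waiting.
    static <T> List<T> drain(Supplier<T> tryRecv) {
        List<T> received = new ArrayList<>();
        for (T msg = tryRecv.get(); msg != null; msg = tryRecv.get()) {
            received.add(msg);
        }
        return received;
    }
}
```

In the question's server loop, this would replace the inner for loop: after poll reports the item readable, call drain with a non-blocking receive and add the size of the returned list to the counter.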

Part 2 - ZMsg.recvMsg() may return multiple frames

When you receive a ZMsg with ZMsg msg = ZMsg.recvMsg(socket);, the ZMsg may contain multiple ZFrames, each carrying part of the message.

From the comments of the ZMsg class in JeroMQ's source:

 * // Receive message from ZMQSocket "input" socket object and iterate over frames
 * ZMsg receivedMessage = ZMsg.recvMsg(input);
 * for (ZFrame f : receivedMessage) {
 *     // Do something with frame f (of type ZFrame)
 * }

Part 3 - messages can be split across multiple ZFrames

From ZFrame's source in JeroMQ:

 * The ZFrame class provides methods to send and receive single message
 * frames across 0MQ sockets. A 'frame' corresponds to one underlying zmq_msg_t in the libzmq code.
 * When you read a frame from a socket, the more() method indicates if the frame is part of an
 * unfinished multipart message.

If I'm understanding this correctly, then for each event you may get multiple frames, and one client message may map to 1..N frames (for example, if the client sends a multipart message).

So to summarize:

  • One return from poll may indicate multiple events.
  • One event, and thus one ZMsg.recvMsg(), may contain multiple frames.
  • One frame could contain one complete client message or only part of a client message; one client message maps to 1..N frames.
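To make Part 2 concrete: when a REQ client talks to a ROUTER, the REQ socket adds an empty delimiter frame and the ROUTER socket prepends a frame holding the peer's identity. So each ZMsg in the question's server should hold three frames: the identity, an empty frame, and "hello!". The sketch models frames as plain byte arrays rather than JeroMQ ZFrame objects; `RouterEnvelope` is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Splits a request as seen by a ROUTER socket fronting REQ clients:
// frame 0 = peer identity (added by ROUTER), frame 1 = empty delimiter (added by REQ),
// frames 2..N = application payload.
final class RouterEnvelope {
    final byte[] identity;
    final List<byte[]> payload;

    private RouterEnvelope(byte[] identity, List<byte[]> payload) {
        this.identity = identity;
        this.payload = payload;
    }

    static RouterEnvelope parse(List<byte[]> frames) {
        if (frames.size() < 3 || frames.get(1).length != 0) {
            throw new IllegalArgumentException("expected [identity][empty][payload...]");
        }
        return new RouterEnvelope(frames.get(0), frames.subList(2, frames.size()));
    }

    String identityAsString() {
        return new String(identity, StandardCharsets.UTF_8);
    }
}
```

To reply, the server sends the identity frame and the empty delimiter back in front of the reply payload so the ROUTER can route it to the right client.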
2
votes

Unfortunately we couldn't solve this particular issue, and have moved away from using ZeroMQ for this interface. In case it helps anyone else, the only thing we established for certain is that rapidly opening and closing the request sockets caused undesirable behaviour (lost messages) at the router socket end. The problem was made worse by a slow server CPU, and didn't appear at all when the server ran on a fast multi-core machine.

1
vote

Unfortunately I was not yet working with ZMQ when this question was active. But I had the same problem today and found this page, and your answer (not using ZMQ) was not satisfying for me. So I searched a bit more and finally found out what to do.

Just as a reminder: this works with the "POLLER" in ZMQ [1].

If you use a "PAIR" connection you will certainly NOT lose any files, BUT sending and receiving take approximately the same time. So you cannot speed things up, which made it no solution for me.

Solution:

  • with zmq_setsockopt (Python: sock.setsockopt) you can set ZMQ_HWM (zmq.SNDHWM, zmq.RCVHWM) to 0 [2]

    • in Python: sock.setsockopt(zmq.SNDHWM, 0) on the sender and sock.setsockopt(zmq.RCVHWM, 0) on the receiver

    • note: the single HWM option was split into SNDHWM/RCVHWM in ZeroMQ 3.x

    • HWM = 0 means there is "NO limit" on the number of queued messages (so be careful; maybe set a (very high) limit instead)

  • there are also ZMQ_SNDBUF/ZMQ_RCVBUF (Python: zmq.SNDBUF/zmq.RCVBUF) which you can set as well, i.e. sock.setsockopt(zmq.RCVBUF, 0) resp. ..... [2]

    • setting this to 0 makes the socket use the operating system's default "SO_RCVBUF" (here my knowledge ends)

    • setting this parameter or not did NOT make a difference in my case, but I think it might in others
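To illustrate what "HWM = 0 means no limit" implies, here is a toy model of a queue with a high-water mark. It is plain Java, not ZeroMQ's implementation, and `HwmQueue` is a hypothetical name. In a real socket, what happens at the limit depends on the socket type (it may block or drop), so `offer()` here just reports false and leaves the decision to the caller. In JeroMQ we believe the corresponding options are set with `Socket.setSndHWM(int)` and `Socket.setRcvHWM(int)`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (not ZeroMQ code) of a message queue with a high-water mark.
// hwm == 0 means "no limit", mirroring the ZMQ_SNDHWM/ZMQ_RCVHWM convention.
final class HwmQueue<T> {
    private final int hwm;
    private final Deque<T> queue = new ArrayDeque<>();

    HwmQueue(int hwm) {
        if (hwm < 0) {
            throw new IllegalArgumentException("hwm must be >= 0");
        }
        this.hwm = hwm;
    }

    // Returns false when the limit is reached; a real socket would block or drop instead.
    boolean offer(T msg) {
        if (hwm != 0 && queue.size() >= hwm) {
            return false;
        }
        queue.addLast(msg);
        return true;
    }

    T poll() {
        return queue.pollFirst();
    }

    int size() {
        return queue.size();
    }
}
```

With hwm = 0, offer() never refuses a message, which is exactly why the queue can keep growing until RAM runs out, as the answer warns.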

Performance:

With this I could "send" 100'000 files of 98 kB each (~10 GB) in ~8 s. This will fill your RAM (once it is full, I think your program will slow down); see also the picture.

Meanwhile, I "received" and saved the files in about 118 s, freeing the RAM again.

Also, with this setup I have NEVER lost a file so far (you might if you hit the limits of your PC).
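The figures above work out roughly as follows, assuming decimal units (1 GB = 10^6 kB) and that the ~8 s and ~118 s each cover all 100'000 files:

```latex
\begin{aligned}
100\,000 \times 98\ \text{kB} &= 9.8 \times 10^{6}\ \text{kB} \approx 9.8\ \text{GB} \\
\text{send rate} &\approx \frac{9.8\ \text{GB}}{8\ \text{s}} \approx 1.2\ \text{GB/s} \\
\text{receive-and-save rate} &\approx \frac{9.8\ \text{GB}}{118\ \text{s}} \approx 83\ \text{MB/s}
\end{aligned}
```

The gap between the two rates is what has to sit in RAM while the receiver catches up.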

Data loss can be "GOOD":

  • if you really NEED all the data, you should use this method

  • if some losses are acceptable (e.g. live plotting: as long as your FPS is above ~50 you will see the plots smoothly and won't care if you lose something)

  • --> you can save RAM and avoid blocking your whole PC!

Hope this post helps the next person coming by.

[1]: https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/multisocket/zmqpoller.htm

[2]: http://api.zeromq.org/2-1:zmq-setsockopt

Picture of the RAM usage: the RAM fills up in about 8 s; afterwards the disk saves the files from the buffer.