I want to know if there's a way for a ZeroMQ socket to do only reading or only writing. Even though there are async/multithreading examples, it seems to me that every thread still uses a recv-then-send loop. What I want is a receiveMessage() that reads from a ZeroMQ socket and a sendMessage(msg) that writes to a ZeroMQ socket, with each of those methods running in a separate thread that is constructed IN ANOTHER class. Here's my code (I'm using jeromq from Scala):

trait ZmqProtocol extends Protocol {

  val context: ZContext = new ZContext(1)
  private val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
  private val backendSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)

  frontendSocket.bind("tcp://*:5555")
  backendSocket.bind("inproc://backend")


  new Thread(() => {

    println("Started receiving messages")
    // Connect backend to frontend via a proxy
    ZMQ.proxy(frontendSocket, backendSocket, null)

  }).start()


  override def receiveMessage(): (String, String) = {

    val inprocReadSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
    inprocReadSocket.connect("inproc://backend")

    //  The DEALER socket gives us the address envelope and message
    val msg = ZMsg.recvMsg(inprocReadSocket)

    // Message from client's REQ socket contains 3 frames: address + empty frame + request content
    // (payload)
    val address = msg.pop
    val emptyFrame = msg.pop
    val request = msg.pop

    assert(request != null)
    msg.destroy()

    println(s"RECEIVED: $request FROM: $address")

    (address.toString, request.toString)
  }

  override def sendMessage(address: String, response: String): Unit = {

    val inprocWriteSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
    inprocWriteSocket.connect("inproc://backend")

    val addressFrame = new ZFrame(address)
    val emptyFrame = new ZFrame("")
    val responseFrame = new ZFrame(response)

    addressFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
    // Sending empty frame because client expects such constructed message
    emptyFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
    responseFrame.send(inprocWriteSocket, ZFrame.REUSE)

    addressFrame.destroy()
    emptyFrame.destroy()
    responseFrame.destroy()
  }

}

And here's how I would use it:

class TrafficHandler(val requestQueue: LinkedBlockingQueue[(String, Message)],
                     val responseQueue: LinkedBlockingQueue[(String, String)])
  extends Protocol {

  def startHandlingTraffic(): Unit = {

    // Reader thread: blocks on the socket and feeds the request queue
    new Thread(() => {
      while (true) {
        val (address, message) = receiveMessage()
        requestQueue.put((address, message))
      }
    }).start()

    // Writer thread: drains the response queue and writes to the socket
    new Thread(() => {
      while (true) {
        val (address, response) = responseQueue.take()
        sendMessage(address, response)
      }
    }).start()
  }
}

While debugging, I noticed that the message was received and that the response was correctly taken from the response queue (a concurrent blocking queue) with the correct destination address, but the send silently failed. I dug a bit into the jeromq code, and it seems to have something to do with identity, because outPipe is null. I'm guessing it's because I don't have a correct recv-send loop.

EDIT after @user3666197's response: the code works! (Although if you start the server first, it takes some time to bind and connect the PUSH and PULL sockets.)
Here is the modified code that uses PUSH and PULL sockets:

trait ZmqProtocol extends Protocol {

  val context: ZContext = new ZContext(1)

  val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
  frontendSocket.bind("tcp://*:5555")

  val requestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
  requestQueueSocket.bind("inproc://requestQueueSocket")

  val responseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
  responseQueueSocket.bind("inproc://responseQueueSocket")

  val inprocRequestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
  inprocRequestQueueSocket.connect("inproc://requestQueueSocket")

  val inprocResponseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
  inprocResponseQueueSocket.connect("inproc://responseQueueSocket")

  new Thread(() => {

    println("Started receiving messages")

    while (true) {

      val msg = ZMsg.recvMsg(frontendSocket)

      // Message from client's REQ socket contains 3 frames: address + empty frame + request content
      // (payload)
      val reqAddress = msg.pop
      val emptyFrame = msg.pop
      val reqPayload = msg.pop

      assert(reqPayload != null)
      msg.destroy()

      println(s"RECEIVED: $reqPayload FROM: $reqAddress")

      requestQueueSocket.send(s"$reqAddress;$reqPayload")

      val responseMessage = new String(responseQueueSocket.recv(0))
      val respMessageSplit = responseMessage.split(";")

      val respAddress = respMessageSplit(0)
      val respPayload = respMessageSplit(1)

      val array = new BigInteger(respAddress, 16).toByteArray
      val respAddressFrame = new ZFrame(array)
      val respEmptyFrame = new ZFrame("")
      val respPayloadFrame = new ZFrame(respPayload)

      respAddressFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
      // Sending empty frame because client expects such constructed message
      respEmptyFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
      respPayloadFrame.send(frontendSocket, ZFrame.REUSE)

      respAddressFrame.destroy()
      respEmptyFrame.destroy()
      respPayloadFrame.destroy()

    }

  }).start()


  override def receiveMessage(): (String, String) = {

    val message = new String(inprocRequestQueueSocket.recv(0))
    val messageSplit = message.split(";")

    val address = messageSplit(0)
    val payload = messageSplit(1)

    (address, payload)
  }

  override def sendMessage(address: String, response: String): Unit = {

    inprocResponseQueueSocket.send(s"$address;$response")
  }
}
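
A caveat on the address round-trip above, offered as a hedged aside rather than a confirmed diagnosis: `$reqAddress` relies on the frame's string form, which as far as I can tell is a hex dump when the identity is not printable text, and `new BigInteger(hex, 16).toByteArray` does not always reproduce the original bytes, because it drops leading zero bytes and may prepend a sign byte. Auto-generated ROUTER identities typically start with a zero byte, so a reply can end up addressed to an identity that does not exist. A minimal sketch of a lossless hex codec, using only the standard library (`IdentityCodec`, `toHex` and `fromHex` are hypothetical names, not part of jeromq):

```scala
// Hypothetical helper, not part of jeromq: lossless hex encoding of identity bytes.
object IdentityCodec {

  // Two hex digits per byte, so leading zero bytes are preserved.
  def toHex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString

  // Inverse of toHex: every pair of hex digits becomes exactly one byte.
  def fromHex(hex: String): Array[Byte] = {
    require(hex.length % 2 == 0, "hex string must have an even length")
    hex.grouped(2).map(pair => Integer.parseInt(pair, 16).toByte).toArray
  }
}
```

With this, the server loop could send `IdentityCodec.toHex(reqAddress.getData)` instead of `$reqAddress` and rebuild the frame with `new ZFrame(IdentityCodec.fromHex(respAddress))`. Splitting on ";" remains fragile if a payload can itself contain that character.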

Here is the client if needed:

trait ZmqClientProtocol extends ClientProtocol {

  val context: ZMQ.Context = ZMQ.context(1)
  val socket: ZMQ.Socket = context.socket(ZMQ.REQ)

  println("Connecting to server")
  socket.connect("tcp://localhost:5555")

  override protected def send(message: String): String = {

    // ZeroMQ frames carry their own length, so no terminating 0 byte is needed
    val request = message.getBytes()

    // Send the message
    println(s"Sending request $message")
    socket.send(request, 0)

    //  Get the reply.
    val reply = socket.recv(0)

    s"$message=${new String(reply)}"
  }
}

1 Answer

Is there a way for a ZeroMQ socket to do only reading or only writing?

Yes, several ways.

a ) use a tandem of simplex archetypes: PUSH/PULL writes and PULL/PUSH reads
b ) use a tandem of simplex archetypes: (X)PUB/(X)SUB writes and (X)SUB/(X)PUB reads
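
Option a ) can be sketched roughly as follows. This is a minimal illustration, not a definitive implementation: it assumes jeromq 0.5 or newer (for the `SocketType` enum; older versions take the `ZMQ.PUSH` / `ZMQ.PULL` constants used in the question), and the object name `SimplexPair` and the endpoint `inproc://simplex-demo` are made up for the example.

```scala
import org.zeromq.{SocketType, ZContext}

// Hypothetical demo: one thread only writes (PUSH), another only reads (PULL).
object SimplexPair {

  def roundTrip(items: Seq[String]): Seq[String] = {
    val context = new ZContext()
    try {
      // Read-only end, owned by the calling thread.
      val pull = context.createSocket(SocketType.PULL)
      pull.bind("inproc://simplex-demo") // bind before anyone connects

      val writer = new Thread(() => {
        // Write-only end, created and used by the writer thread only.
        val push = context.createSocket(SocketType.PUSH)
        push.connect("inproc://simplex-demo")
        items.foreach(item => push.send(item))
      })
      writer.start()

      // One blocking recv per expected message; no send ever happens here.
      val received = items.map(_ => pull.recvStr())
      writer.join()
      received
    } finally context.close()
  }
}
```

Neither socket ever has to alternate between sending and receiving, which is the property the question asks for.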


... still uses .recv()-then-.send() loop.

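Well, this observation has more to do with the chosen socket archetype: some of them ( REP, for instance, which must alternate .recv() and .send(), and REQ, which must alternate .send() and .recv() ) indeed enforce a mandatory two-step sequencing, hardwired inside their internal FSA-s ( finite-state automata ).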


... but each of those methods would run in separate thread

Well, here the challenge starts: ZeroMQ has been designed since its inception as principally zero-sharing, so as to foster performance and independence. Zen-of-Zero is an interesting principle in design.

Yet, recent re-design efforts have shown in API 4.2+ a will to make ZeroMQ socket Access-points thread-safe ( which goes against the initial principle of share-nothing ), so if you experiment in this direction, you may arrive in territories that work, but at the cost of a departure from Zen-of-Zero.

ZeroMQ Socket Access-point(s) should never be shared, even if possible, because of design purity.

Better equip such a class with another pair of simplex PUSH/PULL sockets if you strive for a separation of OOP concerns. Be aware, though, that the head-ends ( the read-only-specialised and the write-only-specialised sockets ) see nothing of the native ZeroMQ socket, which stays principally isolated and hidden from both specialised classes. The "remote" class ( the one beyond the class boundary of abstraction, which owns the native socket ) therefore has to handle that socket archetype's FSA, its settings, performance tweaking and error states, and it also has to mediate all message transfers to/from the native ZeroMQ socket.

In any case, doable with due design care.
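
One way such a mediating class could look is sketched below. It is an illustration under stated assumptions, not the asker's or jeromq's prescribed design: `RouterBridge`, the endpoints `inproc://requests` and `inproc://responses`, and the 100 ms poll interval are all hypothetical, and jeromq 0.5 or newer is assumed for `SocketType`. A single thread owns the ROUTER socket and polls both it and the response pipe, so requests and responses no longer have to arrive in lockstep.

```scala
import org.zeromq.{SocketType, ZContext, ZMQ, ZMsg}

// Hypothetical sketch: one thread owns the ROUTER socket and relays whole
// multipart messages between it and two inproc pipes.
class RouterBridge(context: ZContext, frontend: String) {

  @volatile private var running = true

  // Bind everything up front so that head-ends can connect at any later time.
  private val router = context.createSocket(SocketType.ROUTER)
  private val requests = context.createSocket(SocketType.PUSH)  // towards readers
  private val responses = context.createSocket(SocketType.PULL) // from writers
  router.bind(frontend)
  requests.bind("inproc://requests")
  responses.bind("inproc://responses")

  // After start(), only this thread touches the three sockets above.
  private val worker = new Thread(() => {
    val poller = context.createPoller(2)
    val routerIdx = poller.register(router, ZMQ.Poller.POLLIN)
    val responsesIdx = poller.register(responses, ZMQ.Poller.POLLIN)
    while (running) {
      poller.poll(100) // wake up regularly to re-check the running flag
      if (poller.pollin(routerIdx)) {
        // Client request [identity, empty, payload] goes to the reading head-end.
        // Note: this send blocks while no PULL head-end is connected.
        ZMsg.recvMsg(router).send(requests)
      }
      if (poller.pollin(responsesIdx)) {
        // A response built by the writing head-end leaves through the ROUTER.
        ZMsg.recvMsg(responses).send(router)
      }
    }
  })

  def start(): Unit = worker.start()

  def stop(): Unit = { running = false; worker.join() }
}
```

A reading head-end would connect a PULL socket to inproc://requests and call ZMsg.recvMsg on it; a writing head-end would connect a PUSH socket to inproc://responses and send the frames [identity, empty, payload]. Because the identity travels as a raw frame, no string encoding of the address is needed. Each head-end socket is best created once and kept with its own thread.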


ZeroMQ resources are not cheaply composable / disposable trash

An idea of:

...
override def sendMessage( address:  String,
                          response: String
                          ): Unit = {

             val inprocWriteSocket: ZMQ.Socket  = context.createSocket( ZMQ.DEALER )
                 inprocWriteSocket.connect( "inproc://backend" )
                 ...

may seem easy in the source code, but it ignores the actual setup overheads. It should also respect the fact that no socket ( the inproc://-transport-class being a special case ) becomes RTO ( Ready-To-Operate ) in the very microsecond it was instantiated inside the Context(), let alone fully .connect()-ed and RTO after all the handshaking with the remote counterparty. So best set up the SIG/MSG-infrastructure well beforehand and keep it as a semi-persistent communication layer, rather than an ad-hoc / just-in-time initiated composable/disposable one ( Ecology of Resources ).


inproc://-transport-class has one more requirement pre-API 4.x:

Connecting a socket

When connecting a socket to a peer address using zmq_connect() with the inproc:// transport, the endpoint shall be interpreted as an arbitrary string identifying the name to connect to. Before version 4.0 the name must have been previously created by assigning it to at least one socket within the same ØMQ context as the socket being connected. Since version 4.0 the order of zmq_bind() and zmq_connect() does not matter just like for the tcp:// transport type.

So in cases where your deployment is unsure about the actual localhost API version, take care to enforce the proper order of .bind() / .connect(), otherwise the inproc:// pipes will not work for you.
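
A minimal sketch of enforcing that order across threads, assuming jeromq 0.5 or newer for `SocketType`; the object name `InprocOrdering`, the endpoint `inproc://ordered` and the use of a `CountDownLatch` as the readiness signal are illustrative choices, not something the ZeroMQ documentation prescribes.

```scala
import java.util.concurrent.CountDownLatch
import org.zeromq.{SocketType, ZContext}

// Hypothetical demo: the connecting thread waits until the bind has happened.
object InprocOrdering {

  def run(): String = {
    val context = new ZContext()
    val bound = new CountDownLatch(1)
    var received: String = null

    val binder = new Thread(() => {
      val pull = context.createSocket(SocketType.PULL)
      pull.bind("inproc://ordered")
      bound.countDown()          // signal: the inproc name now exists
      received = pull.recvStr()  // block until the first message arrives
    })
    binder.start()

    try {
      bound.await()              // never connect before the bind is done
      val push = context.createSocket(SocketType.PUSH)
      push.connect("inproc://ordered")
      push.send("ready")
      binder.join()              // join makes the binder's write visible here
      received
    } finally context.close()
  }
}
```

On 4.x-based stacks the latch is redundant, but it costs little and keeps the code working where the older rule still applies.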