I want to know if there's a way for a ZeroMQ socket to do only reading or only writing. Because, it seems to me that, even though there are async/multithreading examples, every thread still uses recv-then-send loop. The problem I have is, I want to have receiveMessage()
that reads from ZeroMQ socket and sendMessage(msg)
that writes to ZeroMQ socket. But each of those methods would run in separate thread that is constructed IN ANOTHER class. Here's my code (I'm using jeromq from Scala):
trait ZmqProtocol extends Protocol {
val context: ZContext = new ZContext(1)
private val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
private val backendSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
frontendSocket.bind("tcp://*:5555")
backendSocket.bind("inproc://backend")
new Thread(() => {
println("Started receiving messages")
// Connect backend to frontend via a proxy
ZMQ.proxy(frontendSocket, backendSocket, null)
}).start()
override def receiveMessage(): (String, String) = {
val inprocReadSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
inprocReadSocket.connect("inproc://backend")
// The DEALER socket gives us the address envelope and message
val msg = ZMsg.recvMsg(inprocReadSocket)
// Message from client's REQ socket contains 3 frames: address + empty frame + request content
// (payload)
val address = msg.pop
val emptyFrame = msg.pop
val request = msg.pop
assert(request != null)
msg.destroy()
println(s"RECEIVED: $request FROM: $address")
(address.toString, request.toString)
}
override def sendMessage(address: String, response: String): Unit = {
val inprocWriteSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
inprocWriteSocket.connect("inproc://backend")
val addressFrame = new ZFrame(address)
val emptyFrame = new ZFrame("")
val responseFrame = new ZFrame(response)
addressFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
// Sending empty frame because client expects such constructed message
emptyFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
responseFrame.send(inprocWriteSocket, ZFrame.REUSE)
addressFrame.destroy()
emptyFrame.destroy()
responseFrame.destroy()
}
}
And here's how I would use it:
class TrafficHandler(val requestQueue: LinkedBlockingQueue[(String, Message)],
val responseQueue: LinkedBlockingQueue[(String, String)])
extends Protocol {
def startHandlingTraffic(): Unit = {
new Thread(() => {
while (true) {
val (address, message) = receiveMessage()
requestQueue.put((address, message))
}
}).start()
new Thread(() => {
while (true) {
val (address, response) = responseQueue.take()
sendMessage(address, response)
}
}).start()
}
During debugging, I've noticed I received message, correctly took it from response queue (concurrent blocking queue) with the correct destination address, but silently failed to send it. I've dived a bit in a jeromq code and it seems to me it has something to do with identity because outPipe is null. I'm guessing it's because I don't have correct recv-send loop.
EDIT AFTER @user3666197 response The code works! (although if you starting server first, it takes time to bind and connect to PUSH
and PULL
sockets)
Here is modified code that uses PUSH
and PULL
sockets:
trait ZmqProtocol extends Protocol {
val context: ZContext = new ZContext(1)
val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
frontendSocket.bind("tcp://*:5555")
val requestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
requestQueueSocket.bind("inproc://requestQueueSocket")
val responseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
responseQueueSocket.bind("inproc://responseQueueSocket")
val inprocRequestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
inprocRequestQueueSocket.connect("inproc://requestQueueSocket")
val inprocResponseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
inprocResponseQueueSocket.connect("inproc://responseQueueSocket")
new Thread(() => {
println("Started receiving messages")
while (true) {
val msg = ZMsg.recvMsg(frontendSocket)
// Message from client's REQ socket contains 3 frames: address + empty frame + request content
// (payload)
val reqAddress = msg.pop
val emptyFrame = msg.pop
val reqPayload = msg.pop
assert(reqPayload != null)
msg.destroy()
println(s"RECEIVED: $reqPayload FROM: $reqAddress")
requestQueueSocket.send(s"$reqAddress;$reqPayload")
val responseMessage = new String(responseQueueSocket.recv(0))
val respMessageSplit = responseMessage.split(";")
val respAddress = respMessageSplit(0)
val respPayload = respMessageSplit(1)
val array = new BigInteger(respAddress, 16).toByteArray
val respAddressFrame = new ZFrame(array)
val respEmptyFrame = new ZFrame("")
val respPayloadFrame = new ZFrame(respPayload)
respAddressFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
// Sending empty frame because client expects such constructed message
respEmptyFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
respPayloadFrame.send(frontendSocket, ZFrame.REUSE)
respAddressFrame.destroy()
respEmptyFrame.destroy()
respPayloadFrame.destroy()
}
}).start()
override def receiveMessage(): (String, String) = {
val message = new String(inprocRequestQueueSocket.recv(0))
val messageSplit = message.split(";")
val address = messageSplit(0)
val payload = messageSplit(1)
(address, payload)
}
override def sendMessage(address: String, response: String): Unit = {
inprocResponseQueueSocket.send(s"$address;$response")
}
}
Here is the client if needed:
trait ZmqClientProtocol extends ClientProtocol {
val context: ZMQ.Context = ZMQ.context(1)
val socket: ZMQ.Socket = context.socket(ZMQ.REQ)
println("Connecting to server")
socket.connect("tcp://localhost:5555")
override protected def send(message: String): String = {
// Ensure that the last byte of message is 0 because server is expecting a 0-terminated string
val request = message.getBytes()
// Send the message
println(s"Sending request $request")
socket.send(request, 0)
// Get the reply.
val reply = socket.recv(0)
new String(s"$message=${new String(reply)}")
}
}