I'm trying to implement a "file dispatcher" on ZeroMQ (actually JeroMQ, since I'd rather avoid JNI).
What I need is to load balance incoming files to processors:
- each file is handled by only one processor
- files are potentially large so I need to manage the file transfer
Ideally I would like something like https://github.com/zeromq/filemq, but:
- with a push/pull behaviour rather than publish/subscribe
- being able to handle the received file rather than writing it to disk
My idea is to use a mix of the taskvent/tasksink and asyncsrv samples.
Client side:
- one PULL socket to be notified of a file to be processed
- one DEALER socket to handle the (async) file transfer chunk by chunk
Server side:
- one PUSH socket to dispatch incoming file (names)
- one ROUTER socket to handle file requests
- a few DEALER workers managing the file transfers for clients and connected to the router via an inproc proxy
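To make the chunk-by-chunk transfer above concrete, here is a minimal sketch of how a worker might cut a file's bytes into frames. The `chunk`/`eof` header names, the `ChunkSketch` object and the `frames` helper are hypothetical, and the sketch is plain Scala with no jeromq calls, so it says nothing about how the sockets behave.

```scala
object ChunkSketch {
  // Hypothetical frame headers: "chunk" for an intermediate piece, "eof" for the last one.
  val Chunk = "chunk"
  val Eof = "eof"

  /** Splits `data` into (header, payload) pairs of at most `chunkSize` bytes.
    * The last pair is tagged "eof"; an empty input yields a single empty "eof" pair.
    */
  def frames(data: Array[Byte], chunkSize: Int): List[(String, Array[Byte])] = {
    require(chunkSize > 0, "chunkSize must be positive")
    val parts = data.grouped(chunkSize).toList
    if (parts.isEmpty) List(Eof -> Array.empty[Byte])
    else parts.init.map(p => Chunk -> p) ++ List(Eof -> parts.last)
  }
}
```

For example, `ChunkSketch.frames("abcdefg".getBytes("UTF-8"), 3)` yields the headers `chunk`, `chunk`, `eof` with payloads `abc`, `def` and `g`. Each pair would then go out as a two-frame message, with the client asking for the next one after each `chunk`.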
My first question is: does this seem like the right way to go? Anything simpler maybe?
My second question: my current implementation gets stuck when sending out the actual file data.
- Clients are notified by the server and issue a request.
- The server worker gets the request and writes the response back to the inproc queue, but the response never seems to leave the server (I can't see it in Wireshark), and the client stays blocked in poller.poll waiting for it.
It's not a matter of sockets being full and dropping data: I'm starting with very small files sent in one go.
Any insight?
Thanks!
==================
Following raffian's advice I simplified my code and removed the extra push/pull socket (it does make sense now that you say it).
I'm left with the "non-working" socket!
Here's my current code. It has many flaws that are out of scope for now (client ID, next chunk, etc.).
For now, I'm just trying to get both sides talking to each other, roughly in the sequence described above.
Server
    import org.zeromq.{ZMQ, ZMsg}

    object FileDispatcher extends App {
      val context = ZMQ.context(1)

      // server is the frontend that pushes filenames to clients and receives requests
      val server = context.socket(ZMQ.ROUTER)
      server.bind("tcp://*:5565")
      // backend handles clients' requests
      val backend = context.socket(ZMQ.DEALER)
      backend.bind("inproc://backend")

      // files to dispatch given in arguments
      args.toList.foreach { filepath =>
        println(s"publish $filepath")
        server.send("newfile".getBytes(), ZMQ.SNDMORE)
        server.send(filepath.getBytes(), 0)
      }

      // multithreaded server: router hands out requests to DEALER workers via an inproc queue
      val NB_WORKERS = 1
      val workers = List.fill(NB_WORKERS)(new Thread(new ServerWorker(context)))
      workers foreach (_.start)
      ZMQ.proxy(server, backend, null)
    }

    class ServerWorker(ctx: ZMQ.Context) extends Runnable {
      override def run() {
        val worker = ctx.socket(ZMQ.DEALER)
        worker.connect("inproc://backend")

        while (true) {
          val zmsg = ZMsg.recvMsg(worker)
          zmsg.pop // drop inner queue envelope (?)
          val cmd = zmsg.pop // cmd is used to continue/stop
          cmd.toString match {
            case "get" =>
              val file = zmsg.pop.toString
              println(s"clientReq: cmd: $cmd , file:$file")
              // 1- brute force: ignore cmd and send full file in one go!
              worker.send("eof".getBytes, ZMQ.SNDMORE) // header indicates this is the last chunk
              val bytes = io.Source.fromFile(file).mkString("").getBytes // dirty read, for testing only!
              worker.send(bytes, 0)
              println(s"${bytes.size} bytes sent for $file: " + new String(bytes))
            case x => println("cmd " + x + " not implemented!")
          }
        }
      }
    }
Client
    import org.zeromq.{ZMQ, ZMsg}

    object FileHandler extends App {
      val context = ZMQ.context(1)

      // client is notified of new files then fetches file from server
      val client = context.socket(ZMQ.DEALER)
      client.connect("tcp://*:5565")
      val poller = new ZMQ.Poller(1) // "poll" responses
      poller.register(client, ZMQ.Poller.POLLIN)

      while (true) {
        poller.poll
        val zmsg = ZMsg.recvMsg(client)
        val cmd = zmsg.pop
        val data = zmsg.pop
        // header is the command/action
        cmd.toString match {
          case "newfile" => startDownload(data.toString) // message content is the filename to fetch
          case "chunk" => gotChunk(data.toString, zmsg.pop.getData) // filename, chunk
          case "eof" => endDownload(data.toString, zmsg.pop.getData) // filename, last chunk
        }
      }

      def startDownload(filename: String) {
        println("got notification: start download for " + filename)
        client.send("get".getBytes, ZMQ.SNDMORE) // command header
        client.send(filename.getBytes, 0)
      }

      def gotChunk(filename: String, bytes: Array[Byte]) {
        println("got chunk for " + filename + ": " + new String(bytes)) // callback the user here
        client.send("next".getBytes, ZMQ.SNDMORE)
        client.send(filename.getBytes, 0)
      }

      def endDownload(filename: String, bytes: Array[Byte]) {
        println("got eof for " + filename + ": " + new String(bytes)) // callback the user here
      }
    }
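Regarding the `zmsg.pop` marked with "(?)" in the worker: as far as I understand it, a ROUTER socket prepends the sender's identity as the first frame of every message it receives, and on send it consumes the first frame as the destination identity, silently dropping the message by default if that identity is unknown. Below is a toy model of just that rule, which may help in reasoning about where a reply can disappear. `ToyRouter`, `receiveFrom` and the string identities are made up for illustration; this is not the jeromq API, and real identities are binary.

```scala
object RouterEnvelopeSketch {
  type Frames = List[String]

  /** Toy stand-in for a ROUTER socket; NOT the jeromq API. */
  final class ToyRouter(knownPeers: Set[String]) {
    // On receive, a ROUTER prepends the sender's identity to the message.
    def receiveFrom(peer: String, msg: Frames): Frames = peer :: msg

    // On send, a ROUTER takes the first frame as the destination identity.
    // Returns Some((peer, payload)) if routable, None if the message is dropped.
    def send(msg: Frames): Option[(String, Frames)] = msg match {
      case id :: payload if knownPeers(id) => Some(id -> payload)
      case _ => None // unknown or missing identity: dropped without an error
    }
  }
}
```

In this model, with `client-1` as the only known peer, `send(List("eof", "data"))` returns `None` because `eof` is taken as the identity, whereas `send("client-1" :: List("eof", "data"))` is delivered. The same rule would apply to the `newfile` frames sent from the ROUTER at startup, which carry no identity frame either.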