
I'm trying to implement a "file dispatcher" on ZeroMQ (actually JeroMQ; I'd rather avoid JNI).

What I need is to load-balance incoming files across processors:

  • each file is handled only by one processor
  • files are potentially large, so I need to manage the file transfer myself
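
Because the files can be large, the transfer has to be split into chunks. Below is a minimal sketch of the server side, assuming the worker reads one chunk at a given byte offset per client request (filemq-style). The `Chunks` object, the 64 KiB `ChunkSize` and the offset-based scheme are hypothetical choices for illustration, not part of ZeroMQ or filemq.

```scala
import java.io.{File, RandomAccessFile}

object Chunks {
  // Hypothetical chunk size, chosen only for illustration.
  val ChunkSize: Int = 64 * 1024

  /** Reads up to `size` bytes of `file` starting at `offset`.
    * Returns an empty array once `offset` is at or past the end of the file. */
  def readChunk(file: File, offset: Long, size: Int = ChunkSize): Array[Byte] = {
    val raf = new RandomAccessFile(file, "r")
    try {
      val remaining = math.max(0L, raf.length() - offset)
      val n = math.min(size.toLong, remaining).toInt
      val buf = new Array[Byte](n)
      raf.seek(offset)
      raf.readFully(buf) // reads exactly n bytes (no-op when n == 0)
      buf
    } finally raf.close()
  }

  /** True when a chunk of length `chunkLen` read at `offset` reaches the end of the file. */
  def isLast(file: File, offset: Long, chunkLen: Int): Boolean =
    offset + chunkLen >= file.length()
}
```

A worker could answer each request with a "chunk" or "eof" header (depending on `isLast`) followed by the filename and the bytes. Only one chunk is held in memory at a time, and the worker stays stateless if the client sends the offset it wants next.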

Ideally I would like something like https://github.com/zeromq/filemq, but:

  • with push/pull behaviour rather than publish/subscribe
  • able to handle the received file myself rather than having it written to disk

My idea is to combine the taskvent/tasksink and asyncsrv examples from the ZeroMQ Guide.

Client side:

  • one PULL socket to be notified of a file to be processed
  • one DEALER socket to handle the (async) file transfer chunk by chunk

Server side:

  • one PUSH socket to dispatch incoming file names
  • one ROUTER socket to handle file requests
  • a few DEALER workers that manage file transfers for clients, connected to the ROUTER via an inproc proxy

My first question is: does this seem like the right way to go? Is there anything simpler?

My second question is: my current implementation gets stuck when sending the actual file data.

  • clients are notified by the server and issue a request.
  • the server worker gets the request and writes the response back to the inproc queue, but the response never seems to leave the server (I can't see it in Wireshark), and the client is stuck in poller.poll waiting for the response.

It's not a matter of sockets being full and dropping data; I'm starting with very small files sent in one go.

Any insight?

Thanks!

==================

Following raffian's advice, I simplified my code and removed the extra PUSH/PULL sockets (it does make sense now that you say it).

I'm still left with the "non-working" socket!

Here's my current code. It has many flaws that are out of scope for now (client ID, next chunk, etc.).

For now, I'm just trying to get both sides talking to each other, roughly in this sequence:

  • Server

    import org.zeromq.{ZMQ, ZMsg}
    
    object FileDispatcher extends App
    {
      val context = ZMQ.context(1)
    
      // server is the frontend that pushes filenames to clients and receives requests
      val server = context.socket(ZMQ.ROUTER)
      server.bind("tcp://*:5565")
      // backend handles clients requests
      val backend = context.socket(ZMQ.DEALER)
      backend.bind("inproc://backend")
    
      // files to dispatch given in arguments
      args.toList.foreach { filepath =>
        println(s"publish $filepath")
        server.send("newfile".getBytes(), ZMQ.SNDMORE)
        server.send(filepath.getBytes(), 0)
      }
    
      // multithreaded server: router hands out requests to DEALER workers via a inproc queue
      val NB_WORKERS = 1
      val workers = List.fill(NB_WORKERS)(new Thread(new ServerWorker(context)))
      workers foreach (_.start)
      ZMQ.proxy(server, backend, null)
    }
    
    class ServerWorker(ctx: ZMQ.Context) extends Runnable 
    {
      override def run() 
      {
        val worker = ctx.socket(ZMQ.DEALER)
        worker.connect("inproc://backend")
    
        while (true) 
        {
          val zmsg = ZMsg.recvMsg(worker) 
          zmsg.pop // drop inner queue envelope (?)
          val cmd = zmsg.pop //cmd is used to continue/stop
          cmd.toString match {
            case "get" => 
              val file = zmsg.pop.toString
              println(s"clientReq: cmd: $cmd , file:$file")
              //1- brute force: ignore cmd and send full file in one go!
              worker.send("eof".getBytes, ZMQ.SNDMORE) //header indicates this is the last chunk
              val bytes = io.Source.fromFile(file).mkString("").getBytes //dirty read, for testing only!
              worker.send(bytes, 0)
              println(s"${bytes.size} bytes sent for $file: "+new String(bytes))
            case x => println("cmd "+x+" not implemented!")
          }
        }
      }
    }
    
  • Client

    import org.zeromq.{ZMQ, ZMsg}
    
    object FileHandler extends App
    {
      val context = ZMQ.context(1)
    
      // client is notified of new files then fetches file from server
      val client = context.socket(ZMQ.DEALER)
      client.connect("tcp://*:5565")
      val poller = new ZMQ.Poller(1) //"poll" responses
      poller.register(client, ZMQ.Poller.POLLIN)
    
      while (true) 
      {
        poller.poll
        val zmsg = ZMsg.recvMsg(client)
        val cmd = zmsg.pop
        val data = zmsg.pop
        // header is the command/action
        cmd.toString match {
          case "newfile" => startDownload(data.toString)// message content is the filename to fetch
          case "chunk" => gotChunk(data.toString, zmsg.pop.getData) //filename, chunk
          case "eof" => endDownload(data.toString, zmsg.pop.getData) //filename, last chunk
        }
      }
    
      def startDownload(filename: String) 
      {
        println("got notification: start download for "+filename)
        client.send("get".getBytes, ZMQ.SNDMORE) //command header
        client.send(filename.getBytes, 0) 
      }
    
      def gotChunk(filename: String, bytes: Array[Byte]) 
      {
        println("got chunk for "+filename+": "+new String(bytes)) //callback the user here
        client.send("next".getBytes, ZMQ.SNDMORE)
        client.send(filename.getBytes, 0) 
      }
    
      def endDownload(filename: String, bytes: Array[Byte]) 
      {
        println("got eof for "+filename+": "+new String(bytes)) //callback the user here
      }
    }
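The client's `match` has no default case, and it pops frames that may not be there (jeromq's `ZMsg.pop` returns null on an empty message), so a reply with an unexpected shape will likely throw rather than be reported. Below is a minimal sketch of a typed parser for the ad-hoc protocol used in this code. The names `ServerMsg`, `NewFile`, `Chunk`, `Eof` and the UTF-8 encoding are assumptions for illustration (the code above uses the platform default charset).

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical typed view of the ad-hoc protocol used above:
//   "newfile" <filename>
//   "chunk"   <filename> <bytes>
//   "eof"     <filename> <bytes>
sealed trait ServerMsg
final case class NewFile(filename: String) extends ServerMsg
final case class Chunk(filename: String, data: Array[Byte]) extends ServerMsg
final case class Eof(filename: String, data: Array[Byte]) extends ServerMsg

object ServerMsg {
  private def text(frame: Array[Byte]): String = new String(frame, UTF_8)

  /** Parses the frames of one message. Returns None for an unknown command
    * or an unexpected number of frames instead of throwing. */
  def parse(frames: List[Array[Byte]]): Option[ServerMsg] =
    frames match {
      case List(cmd, name) if text(cmd) == "newfile"     => Some(NewFile(text(name)))
      case List(cmd, name, data) if text(cmd) == "chunk" => Some(Chunk(text(name), data))
      case List(cmd, name, data) if text(cmd) == "eof"   => Some(Eof(text(name), data))
      case _                                             => None
    }
}
```

Since `ZMsg` is iterable over `ZFrame`s, the frames' `getData` bytes can be collected into a `List[Array[Byte]]` before parsing. With this parser, the two-frame "eof" reply the worker above sends (header and bytes, no filename frame) is reported as malformed rather than crashing the client.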
    

1 Answer


On the client, you don't need PULL with DEALER. DEALER is PUSH and PULL combined, so use DEALER only; your code will be simpler.

The same goes for the server: unless you're doing something special, you don't need PUSH with ROUTER, since ROUTER is bidirectional.
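Dropping PUSH also drops its automatic round-robin. With ROUTER, the server itself must choose which client receives each file, and the routing identity must be the first frame of every outgoing message. Below is a minimal sketch of that bookkeeping (the idea behind the ZeroMQ Guide's load-balancing broker). It assumes each client signals it is free with a hypothetical "ready" request and simplifies ROUTER identities (really byte frames) to Strings. `Dispatcher` is an illustrative name, not a library class.

```scala
import scala.collection.immutable.Queue

// Hypothetical bookkeeping for a ROUTER-based dispatcher: files waiting for a
// client, and clients (ROUTER identities, simplified to Strings) waiting for a file.
final case class Dispatcher(pending: Queue[String], ready: Queue[String]) {

  /** A new file path arrived. Returns the new state and, if a client was
    * waiting, the (clientId, file) pair to send through the ROUTER. */
  def fileArrived(path: String): (Dispatcher, Option[(String, String)]) =
    copy(pending = pending.enqueue(path)).assign

  /** A client asked for work (e.g. a hypothetical "ready" request). */
  def clientReady(clientId: String): (Dispatcher, Option[(String, String)]) =
    copy(ready = ready.enqueue(clientId)).assign

  // Pairs the oldest waiting file with the oldest waiting client, so each file
  // goes to exactly one client. Each event adds one item, so at most one pair forms.
  private def assign: (Dispatcher, Option[(String, String)]) =
    (pending.dequeueOption, ready.dequeueOption) match {
      case (Some((file, files)), Some((client, clients))) =>
        (Dispatcher(files, clients), Some(client -> file))
      case _ => (this, None)
    }
}

object Dispatcher {
  val empty: Dispatcher = Dispatcher(Queue.empty, Queue.empty)
}
```

For each pair returned, the server would send [clientId, "newfile", path] on the ROUTER. A file that arrives before any client is ready simply waits in `pending` instead of being dropped.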

> the server worker gets the request, and writes the response back to the inproc queue but the response never seems to go out of the server (can't see it in wireshark) and the client is stuck on the poller.poll awaiting the response.

Code Problems

In the server, you're dispatching files with args.toList.foreach before starting the proxy; this is probably why nothing is leaving the server. Start the proxy first, then use it. Also, once you call ZMQ.proxy(...), the call blocks indefinitely, so you'll need a separate thread to send the file paths. Keep in mind that ZeroMQ sockets are not thread-safe, so that thread should not share the ROUTER socket with the proxy.

The client may also have an issue with the poller. The typical polling pattern is:

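    ZMQ.Poller items = new ZMQ.Poller(1);
    items.register(receiver, ZMQ.Poller.POLLIN);
    while (true) {
      items.poll(TIMEOUT);           // wait up to TIMEOUT for an event
      if (items.pollin(0)) {         // only read if socket 0 is readable
        byte[] message = receiver.recv(0);
        // ... handle message ...
      }
    }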

In the code above, you 1) poll until the timeout, 2) check for messages, and, if one is available, 3) read it with receiver.recv(0). In your code, you poll and then drop into recv() without checking. Check whether the poller has a message for the polled socket before calling recv(); otherwise the receiver will hang if there is no message. This matters especially once you poll with a timeout: when poll returns because the timeout expired, there is nothing to read.