
I'm currently working on a reasonably large data-processing task and need to split up and distribute the processing. I have a fully working pipeline prototype using ZeroMQ's PUSH/PULL mechanism - the "processing" data is sent and completes OK.

However, once the data has been fanned out and processing is complete, I need to instruct the workers (running in a while/true loop) to terminate. I thought it wise to use ZMQ's PUB/SUB mechanism for this. However, none of the control messages are being received by the workers.

Frustratingly, if I switch the control channel over to PUSH/PULL, the workers receive the message and terminate correctly.

I've copied some code from the prototype here:

http://pastebin.com/myZyWQ1E

You can see that the producer (the bit that sends data to the worker) creates and binds a control socket (line 19), which the worker connects to (line 53). Once the work is complete, the producer sends a terminate command through the control socket, while the worker continues to loop and check for control messages. Unfortunately, line 75 never tests true, and the worker process continues to loop.

// Control socket create / bind from producer
$this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB);
$this->controlSocket->bind('tcp://172.0.0.1:51001');

// Control socket create / subscribe / connect from worker
$control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
$control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
$control->connect('tcp://172.0.0.1:51001');

1 Answer


Yeah, you're probably hitting an instance of the "slow joiner" issue. PUB doesn't block, so it silently drops any message for which it has no connected subscriber at the moment of sending. Because you create the socket and immediately send, the subscribers are still connecting - this takes a non-zero amount of time, so the message gets lost. Try creating the control socket as soon as you start, which should give the connections a chance to stabilise (alternatively, put a short sleep between the bind() and the send() on the control socket). See the ZGuide for a more detailed discussion: http://zguide.zeromq.org/page:all (search "slow joiner").
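
For illustration, here is a minimal sketch of the producer side under those two suggestions. The function names, the 'TERMINATE' payload and the settle delay are my own assumptions, not taken from your pastebin; only the php-zmq calls (getSocket, bind, send) are the real API.

```php
<?php
// Sketch only: openControlSocket(), sendTerminate() and the 'TERMINATE'
// payload are hypothetical names chosen for this example.

/**
 * Create and bind the PUB control socket. Call this at producer start-up,
 * before any work is fanned out, so subscribers have time to connect.
 */
function openControlSocket(ZMQContext $context, string $endpoint): ZMQSocket
{
    $control = $context->getSocket(ZMQ::SOCKET_PUB);
    $control->bind($endpoint);

    return $control;
}

/**
 * Publish the terminate command. The optional delay is the crude
 * "sleep between bind() and send()" workaround; it reduces the chance of
 * losing the message but does not guarantee delivery.
 */
function sendTerminate(ZMQSocket $control, int $settleMicroseconds = 0): void
{
    if ($settleMicroseconds > 0) {
        usleep($settleMicroseconds);
    }
    $control->send('TERMINATE');
}
```

In the producer you would call openControlSocket() first thing, do all the PUSH work, and only then call sendTerminate(). If the work itself takes a noticeable amount of time, the early bind alone is usually enough and the delay can stay at 0.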

One other thing I noticed is that you're checking the control socket outside the poll. The aim of poll is to let you do exactly that kind of thing - add the control socket as another POLL_IN entry in the pollset you've already created, then check whether each socket that poll() hands back as readable is === (note, triple equals!) the control socket or the data socket. You'll get better response times that way.
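
Roughly, the worker loop could look like the sketch below. It assumes the worker already has a connected PULL data socket and a subscribed SUB control socket; runWorkerLoop(), the $handle callback and the 'TERMINATE' payload are hypothetical names for this example, not something from your prototype.

```php
<?php
// Sketch only: runWorkerLoop(), $handle and 'TERMINATE' are assumptions.

function runWorkerLoop(
    ZMQSocket $data,
    ZMQSocket $control,
    callable $handle,
    int $timeout = 1000
): void {
    // One pollset watches both sockets for incoming messages.
    $poll = new ZMQPoll();
    $poll->add($data, ZMQ::POLL_IN);
    $poll->add($control, ZMQ::POLL_IN);

    $readable = $writable = [];

    while (true) {
        // Timeout is in milliseconds on libzmq 3.x and later builds.
        $events = $poll->poll($readable, $writable, $timeout);
        if ($events === 0) {
            continue; // nothing arrived before the timeout, poll again
        }

        foreach ($readable as $socket) {
            if ($socket === $control) {
                // Identity check: this is the control socket itself.
                if ($socket->recv() === 'TERMINATE') {
                    return;
                }
            } elseif ($socket === $data) {
                $handle($socket->recv());
            }
        }
    }
}
```

Note that this returns as soon as the terminate command is read, so any work still queued on the data socket at that point is left unprocessed. If that matters for your pipeline, you would need to drain the data socket before returning.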