I'm currently working on a reasonably large data process task and need to split up and distribute the processing. I have a fully working pipeline prototype using ZeroMQ's PUSH / PULL mechanism - the "processing" data is being sent and completes OK.
However, once the data has been fanned out, and processing is complete, I need to instruct the workers (running in a while/true loop) to terminate. I thought it wise to use ZMQ's PUB/SUB mechanism for this. However none of the control messages are being received by the workers.
Frustratingly, if I switch over to a PUSH/PULL mechanism the workers receive the message and terminate correctly.
I've copied some code from the prototype here:
You can see the producer (the bit that sends data to the worker) creates and binds to a control socket (line 19), which the worker also binds too (line 53). Once the work is complete from the producer it sends a terminate command through the control socket, the worker continues to loop and check for control messages. Unfortunately line 75 never tests true, and the process continues to loop.
// Control socket create / bind from producer
$this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB);
$this->controlSocket->bind('tcp://172.0.0.1:51001');
// Control socket create / bind from worker
$control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
$control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
$control->connect('tcp://172.0.0.1:51001');