0
votes

I'm using inproc zmq sockets for multi-threaded data sharing. I have multiple thread clients to one master thread; each client has a PUSH socket and the master has a PULL socket acting as a sink for all clients. For the most part each client is independent, but I do have some modest order requirements since one client thread is rather special.

Here is some code that illustrates a variation on my problem:

import threading
import zmq

context = zmq.Context()

pull = context.socket(zmq.PULL)
pull.bind('inproc://my-socket')

def slave():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket')

    for x in 'one two three'.split(' '):
        push.send('>>> '+x)
    #push.send('END')
    push.close()

def master():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket')

    for x in 'one two three'.split(' '):
        push.send(x)

    x = threading.Thread(target=slave)
    x.start()

    while x.is_alive():
        pass

    push.send('END')
    push.close()

thread = threading.Thread(target=master)
thread.start()

while True:
    if pull.poll():
        x = pull.recv()
        if x == 'END':
            print 'END - exiting'
            break
        print x

The fact that the slave thread is started entirely after the master thread has sent it's main payload led me to expect all of master's data before slave's data. However that is not consistently the case. Consider the following output (in reality, the order is not consistent, but I really did get this order):

$ python zmq_threads.py 
one
two
>>> one
three
>>> two
END - exiting

I wanted the following order reliably and I believe this order is forced by the master/slave arrangement as far as send goes

$ python zmq_threads.py 
one
two
three
>>> one
>>> two
>>> three
END - exiting

Thinking on it, I can see that multiple socket clients would not promise this sort of synchronization. However, it feels like I should be able to flush something somehow to force the order of recv (especially with the inproc transport). Any ideas?

1

1 Answers

0
votes

I've resolved this by have the slave thread use a different socket which is received in the master thread and sent on to the main program. In theory, I think a STREAMER device could be used here connecting inproc://my-socket-2 to inproc://my-socket, but I don't think I have any more guarantees of order I simply throw another thread in the mix to host the STREAMER.

Here is another rendition of my question code with the technique illustrated. In all cases I find the need for an 'END' beacon rather annoying, but I don't know if there is any better zmq/thread mechanism.

import threading
import zmq

context = zmq.Context()

pull = context.socket(zmq.PULL)
pull.bind('inproc://my-socket')

def slave():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket-2')

    for x in 'one two three'.split(' '):
        push.send('>>> '+x)
    push.send('END')
    push.close()

def master():
    global context
    push = context.socket(zmq.PUSH)
    push.connect('inproc://my-socket')

    for x in 'one two three'.split(' '):
        push.send(x)

    slaved_pull = context.socket(zmq.PULL)
    slaved_pull.bind('inproc://my-socket-2')
    x = threading.Thread(target=slave)
    x.start()

    while True:
        if slaved_pull.poll():
            x = slaved_pull.recv()
            if x == 'END':
                #print 'END - exiting'
                break
            push.send(x)

    push.send('END')
    push.close()

thread = threading.Thread(target=master)
thread.start()

while True:
    if pull.poll():
        x = pull.recv()
        if x == 'END':
            print 'END - exiting'
            break
        print x