0
votes

The server loops through a list of objects, the data on those objects changes in real time. Every millisecond the server publishes all of the new data of those objects. i.e. ['Carrot', 'Banana', 'Mango', 'Eggplant']

The Client can subscribe to specific objects via their name. self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, 'Carrot')
On a thread the client polls these data in realtime as well:

while True:
    sockets = dict(self.poller.poll(poll_timeout))
    if self.sub_socket in sockets and sockets[self.sub_socket] == zmq.POLLIN:
        msg = self.sub_socket.recv_string(zmq.DONTWAIT)

        // do something with the msg...

The problem is when I subscribe to multiple objects let's say, Carrot, Eggplant & Banana. I only receive the changes from Carrot, sometimes Banana, and so rare on Eggplant. I think this is because from the order of looping of the server, like maybe when the client polls, receives Carrot, process the data, then polls again but the server is already done with the publishing through the list and just publishing Carrot again then client polls receives just Carrot because of that.

So I thought of creating individual sockets for each subscription? Is that a solution? I'm pretty new with ZMQ.

1

1 Answers

1
votes

Q : "Is that a solution?" ... creating individual sockets for each subscription?

No. Unless motivated by some, not known to me, other reasons.


While ZeroMQ message passing infrastructure provides Zero Warranty of each message delivery, that does not mean that messages are evaporating or lost anywhere after being sent. It just says, expect Zero Warranty for each one being delivered and if one needs, one can add such wanted warranty-mechanism overheads, that others need not pay if they can work without them. Losing 1-in-1.000.000? 1-in-1.000.000.000? That depends on many factors, yet loosing a message is not a common or random state of a distributed computing system ( and has some internal reasons, details of which go beyond the scope of this post ).


Still In Doubts ?

Make a test.

Design a simple test - PUB-side sending a uniformly distributed trivial messages

SAMPLEs  = int( 1E6 )
aMsgSIZE = 2048
TOPICs   = [ r'Carrot', r'Banana', r'Mango', r'Eggplant', r'' ]
MASK     = "{0:}" + aMsgSIZE * "_"

for i in range( SAMPLEs ):
    PUB.send( MASK.format( TOPICs[np.random.randint( len( TOPICs ) - 1 ) ) )
    time.sleep( 1E-3 )

Using this test, you shall receive the uniformly distributed sample with a same amount of each of the subscribed TOPICs ( if all were subscribed ).

Growing the aMsgSIZE may ( under default Context()- and Socket()-instances ) create some messaged to get "lost", but again, this ought be uniformly distributed. If not, there would be some trouble to dig deeper.

The amount of messages uniformly not delivered from the SAMPLEs amount will demonstrate how big is the need to tweak the Context() and Socket()-instances' parameters so as to provide resources enough to safely enqueue that amount of data-flow. Yet having more Socket()-s for individual subscribed Topic-strings will not solve this resources management bottleneck, if present.

Do not hesitate to post the test-results, if the uniformly distributed mix of topics was or was not skewed and how big fraction was not received at the end.

Add platform details + ZeroMQ version, all that matters, as always :o)