I'm losing messages only when I'm subscribing to topics.

Here's the scenario:
The subscriber subscribes to a specific topic and then starts the publisher in a different thread (passing it the same context and the same subscribed topic).
The publisher receives the subscribed topic and publishes a message with it.

When I run this scenario twice (meaning 2 subscriber threads and 2 publisher threads), only one of the subscriber threads receives its message, and which one varies from run to run.
I have no idea why I'm losing the second message.

Publisher thread:

void *publisher = zmq_socket(ptStruct->zContext, ZMQ_PUB);
assert(0 == zmq_bind(publisher, "inproc://#1"));
printf("Publishes to %d \n", ptStruct->iID);
assert(-1 != zmq_send(publisher, &(ptStruct->iID), sizeof(ptStruct->iID), 0));
zmq_close(publisher);

Subscriber thread:

void *subscriber = zmq_socket(ptStruct->zContext, ZMQ_SUB);
assert(0 == zmq_connect(subscriber, "inproc://#1"));
assert(0 == zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, &(ptStruct->iID), sizeof(ptStruct->iID)));
printf("Subscribed to %d \n", ptStruct->iID);

/* Now run the publisher in a different thread */
OSTHREAD_CreateThread(&ptThread, publishThread, ptStruct, NULL);

assert(-1 != zmq_recv(subscriber, acRec, 255, 0));
printf("Got %d \n", acRec[0]);
zmq_close(subscriber);

I run the subscriber thread twice and this is the output:

Subscribed to 1
Subscribed to 2
Publishes to 1
Got 1
Publishes to 2
1

1 Answer

You're creating two different publishers that both bind() to the same inproc endpoint, "#1". An endpoint can only be bound once, so the second publisher fails to bind() and then never sends its message.
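
One possible way around this (a sketch, not the only fix) is to give each publisher/subscriber pair its own endpoint, derived from the ID that is already being passed around. The helper name make_endpoint and the "inproc://pub-<id>" naming scheme below are made up for illustration; neither is part of ZeroMQ.

```c
#include <stdio.h>
#include <stddef.h>

/* Hypothetical helper (not a ZeroMQ function): writes "inproc://pub-<id>"
 * into buf. Returns 0 on success, -1 if buf is too small or formatting fails. */
static int make_endpoint(char *buf, size_t len, int id)
{
    int n = snprintf(buf, len, "inproc://pub-%d", id);
    return (n < 0 || (size_t)n >= len) ? -1 : 0;
}

/* Intended use, with the names from the question:
 *   publisher thread:  make_endpoint(ep, sizeof ep, ptStruct->iID); zmq_bind(publisher, ep);
 *   subscriber thread: make_endpoint(ep, sizeof ep, ptStruct->iID); zmq_connect(subscriber, ep);
 */
```

With this, ID 1 would use "inproc://pub-1" and ID 2 would use "inproc://pub-2", so the two bind() calls no longer compete for the same endpoint. The alternative is a single long-lived publisher socket that both IDs are sent through.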

Additionally, you'll probably want to add some delay between the publisher bind()-ing and send()-ing its first message, because of the slow joiner problem: the publisher could attempt to send, and then drop, your message before the publisher and subscriber have finished connecting, which would also cause you to lose it.