2
votes

I'm trying to get an Asynchronous client/server in C model going using the libzmq C API.

I'm using ZeroMQ 3.2.2 on a Linux platform and the pattern I am trying to get working looks as below:

>Client -> DEALER  
>
>Router -> ROUTER  
>---   proxy   ---  
>Dealer -> DEALER  
>
>Workers-> DEALER  

I need to clients to be non blocking, yet to receive response to messages. From the examples I have seen, my understanding is that using ROUTER-DEALER with zmq_proxy should return the message to the initial client.

However, by attaching a capture socket to the zmq_proxy it appears that the response is not routed back.

When I change the client to REQ and the worker to REP then all works as expected. Any feedback to where I am going wrong or misunderstanding would be welcome.

The 3 components (client, broker and worker) are below.

TestClient

#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>

int main (void)
{
    printf ("Connecting to hello world broker...\n");

    void *ctx = zmq_ctx_new ();
    void *requester = zmq_socket (ctx, ZMQ_DEALER);
     //void *requester = zmq_socket (ctx, ZMQ_REQ);
    zmq_connect (requester, "tcp://10.1.1.31:5555");

    printf ("Sending Request  : HELLO \n");
    int rc = zmq_send (requester, "HELLO", 6, 0);

    if (rc > 0) {
        printf ("Success : Sent size ... %d!\n",rc);
    } else {
        printf("Error: %s\n", zmq_strerror(errno)); 
    }

    printf ("Wait for response ..\n");

    char buffer [256];
    zmq_recv (requester, buffer, 256, 0);

    printf ("Response Received : %s\n",&buffer);

    zmq_close (requester);
    zmq_ctx_destroy (ctx);
    return 0;
}

TestBroker

#define _MULTI_THREADED
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <zmq.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>

#define ZFRAME_MORE     1
#define ZFRAME_REUSE    2
#define ZFRAME_DONTWAIT 4

static void *proxy_capture (void *ctx)
{
    int zerr = 0 ;
    int rRes;
    void *worker = zmq_socket (ctx, ZMQ_DEALER);
    zerr = zmq_connect (worker, "ipc://capture.ipc");

    if (zerr != 0)
    {
        printf ("\n-------------- > proxy_capture bind error : %s\n", zmq_strerror(errno));
        return 0;
    }

    while (1) {
        char buf [256];
        int rc = zmq_recv (worker, buf, 256, 0); 
        assert (rc != -1);
        printf ("Capture value : %s !\n", &buf);
    }
}

int main(int argc, char *argv[])
{
    int zerr = 0 ;
    int rc = 0 ;
    int rRes;

    // Frontend socket talks to clients over TCP Port
    void *ctx = zmq_ctx_new ();
    void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
    zerr = zmq_bind (frontend, "tcp://10.1.1.31:5555");

    if (zerr != 0)
    {
        printf ("\nFrontend bind error : %s\n", zmq_strerror(errno));
        return 0;
    }

    // Backend socket talks to workers 
    void *backend = zmq_socket (ctx, ZMQ_DEALER);
    zerr = zmq_bind (backend, "tcp://10.1.1.31:6555");

    if (zerr != 0)
    {
        printf ("\nBackend bind error : %s\n", zmq_strerror(errno));
        return 0;
    }

    void *capture = zmq_socket (ctx, ZMQ_DEALER);
    zerr = zmq_bind (capture, "ipc://capture.ipc");

    if (zerr != 0)
    {
        printf ("\nCapture bind error : %s\n", zmq_strerror(errno));
        return 0;
    }

    pthread_t capworker;
    rc = pthread_create(&capworker, NULL, proxy_capture, ctx);

    zmq_proxy (frontend, backend, capture);

    while (1) {
        printf ("Broker loop …\n");
        sleep(1);
    }

    sleep(1);
    zmq_ctx_destroy (&ctx);
    printf ("\nEnd server…\n");

    return 0;
 }

TestWorker

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <zmq.h>
#include <unistd.h>
#include <assert.h>

#define ZFRAME_MORE     1
#define ZFRAME_REUSE    2
#define ZFRAME_DONTWAIT 4

int main(int argc, char *argv[])
{
    int zerr = 0 ;
    int rc = 0 ;
    int rRes;

    void *ctx = zmq_ctx_new ();
    //void *worker = zmq_socket (ctx, ZMQ_DEALER);
    void *worker = zmq_socket (ctx, ZMQ_REP);
    zerr = zmq_connect (worker, "tcp://10.1.1.31:6555");

    sleep(1);

    if (zerr != 0)
    {
        printf ("Worker connect error : %s\n", zmq_strerror(errno));
        return 0;
    }

    while (1) {
        char buf [256];
        rc = zmq_recv (worker, buf, 256, 0); 
        assert (rc != -1);
        printf ("Received : %s !\n", &buf);
        printf ("Responding to Client... !\n");

        rc = zmq_send(worker, "WORLD", 6, 0);

        if (rc > 0) {
            printf ("Success : Sent size ... %d!\n",rc);
            //break;
        } else {
            printf("Error: %s\n", zmq_strerror(errno)); 
        }
    }
    zmq_close (worker);
    zmq_ctx_destroy (ctx);

    return 0;
 }

Non working output (client DEALER and worker DEALER)

TestClient

Connecting to hello world broker...
Sending Request : HELLO
Success : Sent size ... 6!
Wait for response ..

TestBroker

Capture value : !
Capture value : HELLO ! <-- Req from client Capture
value : WORLD ! <-- Resp from worker
Capture value : WORLD !

TestWorker

Received : !
Responding to Client... !
Success : Sent size ... 6!
Received : HELLO !
Responding to Client... !
Success : Sent size ...
6!

So it appears the worker responds, but that the response is lost or incorrectly directed to the client by the router ?

Tks for any assistance

1

1 Answers

0
votes

I stumbled upon your answer for my question for the zmq_proxy on the capture part which I'm trying to understand.

For your question, you wanted client to be asynchronous and instead of using REQ, you used DEALER.

The below is how I make it asynchronous. My zeromq version is 4.2.1.

        //  Socket to talk to server
       void *context = zmq_ctx_new();

        void *requester = zmq_socket (context, ZMQ_REQ);
        int timeout = 5000; //Timeout of 5 seconds to make sure not having it hang either while sending or receving...
        int linger = 0;
        zmq_setsockopt (requester, ZMQ_LINGER, &linger, sizeof(int));
        zmq_setsockopt (requester, ZMQ_SNDTIMEO, &timeout, sizeof(int));
        zmq_setsockopt (requester, ZMQ_RCVTIMEO, &timeout, sizeof(int));
        int connection_status = zmq_connect (requester, "tcp://localhost:5559");

You can either use linger by setting it to 0 or you can use the flag ZMQ_DONTWAIT on sending over to the proxy from REQ