I'm trying to get an asynchronous client/server model going in C using the libzmq C API.
I'm using ZeroMQ 3.2.2 on a Linux platform and the pattern I am trying to get working looks as below:
>Client -> DEALER
>
>Router -> ROUTER
>--- proxy ---
>Dealer -> DEALER
>
>Workers-> DEALER
I need the clients to be non-blocking, yet still able to receive responses to their messages. From the examples I have seen, my understanding is that using ROUTER-DEALER with zmq_proxy should return the message to the initial client.
However, by attaching a capture socket to the zmq_proxy it appears that the response is not routed back.
When I change the client to REQ and the worker to REP, everything works as expected. Any feedback as to where I am going wrong or what I am misunderstanding would be welcome.
The 3 components (client, broker and worker) are below.
TestClient
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
int main (void)
{
printf ("Connecting to hello world broker...\n");
void *ctx = zmq_ctx_new ();
void *requester = zmq_socket (ctx, ZMQ_DEALER);
//void *requester = zmq_socket (ctx, ZMQ_REQ);
zmq_connect (requester, "tcp://10.1.1.31:5555");
printf ("Sending Request : HELLO \n");
int rc = zmq_send (requester, "HELLO", 6, 0);
if (rc > 0) {
printf ("Success : Sent size ... %d!\n",rc);
} else {
printf("Error: %s\n", zmq_strerror(errno));
}
printf ("Wait for response ..\n");
char buffer [256];
zmq_recv (requester, buffer, 256, 0);
printf ("Response Received : %s\n",&buffer);
zmq_close (requester);
zmq_ctx_destroy (ctx);
return 0;
}
TestBroker
#define _MULTI_THREADED
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <zmq.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
#define ZFRAME_MORE 1
#define ZFRAME_REUSE 2
#define ZFRAME_DONTWAIT 4
static void *proxy_capture (void *ctx)
{
int zerr = 0 ;
int rRes;
void *worker = zmq_socket (ctx, ZMQ_DEALER);
zerr = zmq_connect (worker, "ipc://capture.ipc");
if (zerr != 0)
{
printf ("\n-------------- > proxy_capture bind error : %s\n", zmq_strerror(errno));
return 0;
}
while (1) {
char buf [256];
int rc = zmq_recv (worker, buf, 256, 0);
assert (rc != -1);
printf ("Capture value : %s !\n", &buf);
}
}
int main(int argc, char *argv[])
{
int zerr = 0 ;
int rc = 0 ;
int rRes;
// Frontend socket talks to clients over TCP Port
void *ctx = zmq_ctx_new ();
void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
zerr = zmq_bind (frontend, "tcp://10.1.1.31:5555");
if (zerr != 0)
{
printf ("\nFrontend bind error : %s\n", zmq_strerror(errno));
return 0;
}
// Backend socket talks to workers
void *backend = zmq_socket (ctx, ZMQ_DEALER);
zerr = zmq_bind (backend, "tcp://10.1.1.31:6555");
if (zerr != 0)
{
printf ("\nBackend bind error : %s\n", zmq_strerror(errno));
return 0;
}
void *capture = zmq_socket (ctx, ZMQ_DEALER);
zerr = zmq_bind (capture, "ipc://capture.ipc");
if (zerr != 0)
{
printf ("\nCapture bind error : %s\n", zmq_strerror(errno));
return 0;
}
pthread_t capworker;
rc = pthread_create(&capworker, NULL, proxy_capture, ctx);
zmq_proxy (frontend, backend, capture);
while (1) {
printf ("Broker loop …\n");
sleep(1);
}
sleep(1);
zmq_ctx_destroy (&ctx);
printf ("\nEnd server…\n");
return 0;
}
TestWorker
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <zmq.h>
#include <unistd.h>
#include <assert.h>
#define ZFRAME_MORE 1
#define ZFRAME_REUSE 2
#define ZFRAME_DONTWAIT 4
int main(int argc, char *argv[])
{
int zerr = 0 ;
int rc = 0 ;
int rRes;
void *ctx = zmq_ctx_new ();
//void *worker = zmq_socket (ctx, ZMQ_DEALER);
void *worker = zmq_socket (ctx, ZMQ_REP);
zerr = zmq_connect (worker, "tcp://10.1.1.31:6555");
sleep(1);
if (zerr != 0)
{
printf ("Worker connect error : %s\n", zmq_strerror(errno));
return 0;
}
while (1) {
char buf [256];
rc = zmq_recv (worker, buf, 256, 0);
assert (rc != -1);
printf ("Received : %s !\n", &buf);
printf ("Responding to Client... !\n");
rc = zmq_send(worker, "WORLD", 6, 0);
if (rc > 0) {
printf ("Success : Sent size ... %d!\n",rc);
//break;
} else {
printf("Error: %s\n", zmq_strerror(errno));
}
}
zmq_close (worker);
zmq_ctx_destroy (ctx);
return 0;
}
Non working output (client DEALER and worker DEALER)
TestClient
Connecting to hello world broker...
Sending Request : HELLO
Success : Sent size ... 6!
Wait for response ..
TestBroker
Capture value : !
Capture value : HELLO ! <-- Req from client
Capture value : WORLD ! <-- Resp from worker
Capture value : WORLD !
TestWorker
Received : !
Responding to Client... !
Success : Sent size ... 6!
Received : HELLO !
Responding to Client... !
Success : Sent size ... 6!
So it appears the worker responds, but the response is lost or incorrectly routed back to the client by the router?
Thanks for any assistance.