Hot questions on using ZeroMQ asynchronously


Question:

I'm using zmq to develop a distributed application with the following network topology: a client node that initiates requests and a server node that replies to them. Since the client is a Node.js application, I can't block after a send call to wait for the response, so the client may issue multiple send calls to the same endpoint. On the other side, the server is a mobile application that processes one request at a time in a single thread, blocking when there are no requests. If this configuration sounds odd, it's because I'm trying to build a sort of RPC toward the mobile device.

I thought of using a DEALER socket on the client side and a REP socket on the server side. From the ZeroMQ guide, about the DEALER/REP combination:

This gives us an asynchronous client that can talk to multiple REP servers. If we rewrote the "Hello World" client using DEALER, we'd be able to send off any number of "Hello" requests without waiting for replies.

Can this be applied to an asynchronous client that talks to one single server? And would it be a good choice? If not, which pattern should I use?


Answer:

Can this be applied to an asynchronous client that talks to one single server? And would it be a good choice?

  1. REQ/REP is not recommended for traffic going over the Internet; the socket can potentially get stuck in a bad state.
  2. DEALER/REP is for a DEALER client talking to multiple REP servers, so it does not apply to your use case.

If not, which pattern should I use?

In your case it seems to me that the traditional DEALER/ROUTER is the way to go. What I usually do is prepend my messages with a "tag frame", i.e. a frame that contains a UUID of some sort, which allows me to identify my requests (and their replies) at the application level.
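
For illustration, here is a minimal sketch of that tag-frame idea against the plain libzmq C API; the endpoint and the fixed "req-0001" tag are assumptions (a real application would generate a UUID), and the peer is assumed to be a ROUTER that echoes the tag back as the first frame of the reply:

#include <zmq.h>
#include <stdio.h>
#include <string.h>

int main (void)
{
    void *ctx    = zmq_ctx_new ();
    void *dealer = zmq_socket (ctx, ZMQ_DEALER);
    zmq_connect (dealer, "tcp://localhost:5555");   /* assumed endpoint */

    const char *tag     = "req-0001";               /* stand-in for a real UUID */
    const char *payload = "do_work";

    /* Frame 1: tag frame, Frame 2: application payload */
    zmq_send (dealer, tag,     strlen (tag),     ZMQ_SNDMORE);
    zmq_send (dealer, payload, strlen (payload), 0);

    /* The reply carries the same tag back as its first frame, so the
       application can correlate replies with outstanding requests.    */
    char rtag[64], reply[256];
    int tlen = zmq_recv (dealer, rtag,  sizeof rtag,  0);
    int rlen = zmq_recv (dealer, reply, sizeof reply, 0);
    printf ("reply for tag %.*s: %.*s\n", tlen, rtag, rlen, reply);

    zmq_close (dealer);
    zmq_ctx_destroy (ctx);
    return 0;
}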

Question:

I am currently working on a project that requires fast network management. To do so I chose 0MQ, but after reading the documentation and the examples it provides, there is something I hardly understand concerning the asynchronous part of 0MQ.

Is there any thread created for each request on a ROUTER or DEALER socket?

I often make the mistake of conflating asynchronous with multi-threaded. When I look at the man page of zmq_socket, I see that for a DEALER or ROUTER socket the incoming routing strategy is set to "fair-queued". From this I conclude that asynchronous means you can write to or read from the socket without waiting for an answer before sending another request (everything is queued and processed synchronously).

So here is the question,

Is there any thread created by 0MQ for each request? (I am not talking about the background thread 0MQ uses internally to manage message queueing.)


Answer:

ZeroMQ creates only one background thread by default. No additional thread is created per request or per socket.

The background thread does all the work, and the user thread communicates with it using queues and file descriptors.

The background thread uses epoll or kqueue to do the asynchronous magic.

You can actually control the number of background I/O threads, but usually one is enough.
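
If more I/O capacity is ever needed, the thread count can be set on the context before any socket is created. A minimal sketch (the count of 2 is just an example):

#include <zmq.h>
#include <assert.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();
    zmq_ctx_set (ctx, ZMQ_IO_THREADS, 2);   /* must happen before sockets exist */
    assert (zmq_ctx_get (ctx, ZMQ_IO_THREADS) == 2);
    /* ... create sockets and work as usual ... */
    zmq_ctx_destroy (ctx);
    return 0;
}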

Question:

This may appear to be a silly question, but I am really confused about the terminology of ZeroMQ regarding synchronous sockets like REQ and REP.

By my understanding, synchronous communication occurs when a client sends a message and then blocks until the response arrives. If ZeroMQ implemented synchronous communication, then a .send() method alone would be enough for a synchronous socket.

I think ZeroMQ's "synchronous socket" terminology refers only to the inability to send more messages until the response to the last message arrives, while the "sender" can still continue its processing (doing more stuff) asynchronously.

Is this true?

In that case, is there any straightforward way to implement synchronous communication using ZeroMQ?

EDIT: Synchronous communication makes sense when I want to invoke a method in a remote process (like RPC). If I want to execute a series of commands in a remote process, and each command needs the result of the previous one to do its job, then asynchronous communication is not the best option.


Answer:

To use ZMQ for implementing a synchronous framework, you can very nearly do it using just ZMQ: you can set the high-water mark to 1. Unfortunately that's not quite it; what you want is an outgoing queue length of 0. Even more unfortunately, setting the high-water mark to 0 is interpreted by ZMQ as infinity...

So the only option is to implement a synchronous transfer protocol on top of ZMQ. That's not very difficult to do. The conversation between the two ends will be something like "can I send?", "yes you can send now", "ok here it is", "ok I have received it", after which both ends return to their callers (or at least the programmatic version of that). This sets up what is called an execution rendezvous: both ends know that they have both reached a certain point of execution.

Technically speaking what you're doing is taking ZeroMQ (Actor Model) and turning it into something more like Communicating Sequential Processes.
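
As a rough illustration (not a complete implementation), the sender side of that rendezvous could look like the sketch below, assuming a REQ socket whose REP peer mirrors the same four steps with recv and send swapped:

#include <zmq.h>
#include <string.h>

/* Sender half of the rendezvous. Returns only once the peer has
   confirmed reception, so both ends meet at the same execution point. */
void synchronous_send (void *req, const void *data, size_t len)
{
    char ack[32];
    zmq_send (req, "can I send?", 11, 0);   /* ask permission          */
    zmq_recv (req, ack, sizeof ack, 0);     /* "yes you can send now"  */
    zmq_send (req, data, len, 0);           /* the actual payload      */
    zmq_recv (req, ack, sizeof ack, 0);     /* "ok I have received it" */
}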

RPC

Having said all that, from your edit I think you might like to consider Cap'n Proto. This is a C++ serialisation technology with a neat RPC trick: if the return from one RPC call is the input to another, you can chain those calls together in advance (see here).

Question:

I'm learning ZeroMQ and just went through the tutorial and a few examples. I'm using Node.js as my main environment (with Python eventually used to replace my workers).

I'm trying to sort out how I can create a fully asynchronous messaging system that will allow my API to push tasks (via a REQ socket) to a router, have a dealer pass the message to a worker that processes it, and send the results back up to my client (which is an Express route).

I believe the pattern for this would work something like the following (I haven't tested or properly implemented the code yet, so please take it as a conceptual outline):


router.js

const zmq = require('zmq');
const frontend = zmq.socket('router');
const backend = zmq.socket('dealer');

frontend.on('message', function() {
  var args = Array.apply(null, arguments);
  backend.send(args);
});
backend.on('message', function() {
  var args = Array.apply(null, arguments);
  frontend.send(args);
});

frontend.bindSync('tcp://*:5559');
backend.bindSync('tcp://*:5560');

client.js

var zmq = require('zmq');
var express = require('express');
var app = express();

app.post('/send', function(req, res) {
  var client = zmq.socket('req');
  // listen for responses from the server 
  client.on('message', function(data) {  
     console.log(data);
     client.close();
  }); 
  // connect to the server port 
  client.connect('tcp://0.0.0.0:5454');  
  client.send('Request from ' + process.pid);
});
app.listen('80');

worker.js

var zmq = require('zmq');
var server = zmq.socket('rep');

server.on('message', function(d){  
  server.send('Response from ' + process.pid); 
}); 
// bind to port 5454 
server.bind('tcp://0.0.0.0:5454', function(err){  
  if (err){ 
    console.error("something bad happened"); 
    console.error( err.message ); 
    console.error( err.stack ); 
    process.exit(0); 
  } 
});

What I'm not fully understanding is whether the ROUTER/DEALER pair will handle sending the worker's response back to the correct client. Also, in this case the dealer handles the fair queueing, as I want my work distributed evenly amongst the workers.

My clients could be distributed amongst many different boxes (load-balanced API servers), my router will be on its own server, and the workers will be distributed amongst multiple boxes as well.


Answer:

Forget REQ/REP in any production-grade app; it can fall into a mutual deadlock.

You might find this subject covered in many other posts on the high risk of mutual FSM-FSM deadlocking in the REQ/REP formal scalable communication pattern.
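
One common mitigation, sketched below against the libzmq 4.x C API, is to put timeouts on the REQ socket so it can never block forever on a dead peer; the option names are real, the 2500 ms value is just an example:

#include <zmq.h>

void make_req_safer (void *req)
{
    int timeout_ms = 2500;
    zmq_setsockopt (req, ZMQ_RCVTIMEO, &timeout_ms, sizeof timeout_ms);
    zmq_setsockopt (req, ZMQ_SNDTIMEO, &timeout_ms, sizeof timeout_ms);
    /* ZMQ_REQ_RELAXED (4.0+) additionally lets a REQ re-send after a
       failure instead of staying stuck in the "awaiting reply" state. */
}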


Be sure: XREQ/XREP == DEALER/ROUTER (already so since 2011)

The source code removes all the hidden magic behind this: XREQ == DEALER and XREP == ROUTER.

+++b/include/zmq.h
...
-#define ZMQ_XREQ 5
-#define ZMQ_XREP 6
+#define ZMQ_DEALER 5
+#define ZMQ_ROUTER 6
...
+#define ZMQ_XREQ ZMQ_DEALER        /*  Old alias, remove in 3.x     */
+#define ZMQ_XREP ZMQ_ROUTER        /*  Old alias, remove in 3.x     */

Question:

I have never used ZeroMQ and first heard of it an hour ago. But from the guide (this guide) it sounds like it offers asynchronous I/O.

It also happens that there is a Nim port: this one.

So I was wondering: does the async magic have something to do with async/await, keywords that are not present in the Nim port (which is just c2nim)? Or is it just something internal to ZMQ that the API doesn't have to bother about?

I thought async/await was the kind of thing that has to bubble up to the upper-most main loop (the framework loop), so the API would have to be async-aware.

Is this a complete misconception on my part ?


Answer:

The native ZeroMQ API supports both blocking and non-blocking I/O.

For this purpose there are flags: zmq.NOBLOCK (ZMQ_DONTWAIT in the C API) can be added so as to achieve a non-blocking mode of operation.
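
In the C API the same idea looks like the sketch below; zmq_recv() returns -1 with errno set to EAGAIN when no message is ready yet:

#include <zmq.h>
#include <errno.h>
#include <stdio.h>

void try_recv (void *socket)
{
    char buf[256];
    int n = zmq_recv (socket, buf, sizeof buf, ZMQ_DONTWAIT);
    if (n >= 0)
        printf ("got %d bytes\n", n);
    else if (errno == EAGAIN)
        printf ("nothing ready yet, go do other work\n");
}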


The respective language-wrapper functionality decides what is actually exposed:

Reading the Nim ZeroMQ wrapper that you mentioned above, it seems to me that there is a hardcoded blocking version of both the send() and recv() function wrappers.

The wrapper also seems not to support correct wire-line message sizing for the case where a Nim-based node of a distributed system meets another node running ZeroMQ version 2.1.x, which is still common in heterogeneous distributed systems.

ZeroMQ also has a poll() method, equipped with a timeout parameter, so your multiplexed I/O operations can yield all the desired ways of operating multiple I/O channels under soft real-time constraints.
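
A minimal sketch of that multiplexing in the C API, with an assumed pair of already-created sockets and a 100 ms timeout:

#include <zmq.h>

void poll_two (void *sock_a, void *sock_b)
{
    zmq_pollitem_t items [] = {
        { sock_a, 0, ZMQ_POLLIN, 0 },
        { sock_b, 0, ZMQ_POLLIN, 0 },
    };
    int rc = zmq_poll (items, 2, 100);        /* timeout in milliseconds */
    if (rc > 0) {
        if (items [0].revents & ZMQ_POLLIN) { /* sock_a has a message */ }
        if (items [1].revents & ZMQ_POLLIN) { /* sock_b has a message */ }
    }
}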

Question:

After installing zmq and czmq with brew, I tried to compile and run the Asynchronous Majordomo Pattern, but it did not work as it requires czmq v3. As far as I understood, I tried to update it to v4, using zactor, because

zthread is deprecated in favor of zactor http://czmq.zeromq.org/czmq3-0:zthread

So right now the following code looks fine to me as an updated async-majordomo pattern, but it does not work as expected: it does not create any threads when I run it from my terminal.

//  Round-trip demonstrator
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. The client task signals to
//  main when it's ready.

#include "czmq.h"
#include <stdlib.h>

void dbg_write_in_file(char * txt, int nb_request) {
    FILE * pFile;
    pFile = fopen ("myfile.txt","a");

    if (pFile!=NULL)
    {
        fputs (txt, pFile);

        char str_nb_request[12];
        sprintf(str_nb_request, "%d", nb_request);
        fputs (str_nb_request, pFile);

        fputs ("\n", pFile);
        fclose (pFile);
    }
}

static void
client_task (zsock_t *pipe, void *args)
{
    zsock_t *client = zsock_new (ZMQ_DEALER);
    zsock_connect (client, "tcp://localhost:5555");
    printf ("Setting up test...\n");
    zclock_sleep (100);

    printf("child 1: parent: %i\n\n", getppid());
    printf("child 1: my pid: %i\n\n", getpid());

    int requests;
    int64_t start;

    printf ("Synchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");

        // stuck here /!\

        char *reply = zstr_recv (client);
        zstr_free (&reply);

        // check if it does something
        dbg_write_in_file("sync round-trip requests : ", requests);
        // end check
    }
    printf (" %d calls/second\n",
        (1000 * 10000) / (int) (zclock_time () - start));

    printf ("Asynchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++) {
        zstr_send (client, "hello");

        // check if it does something
        dbg_write_in_file("async round-trip send requests : ", requests);
        // end check
    }
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        zstr_free (&reply);

        // check if it does something
        dbg_write_in_file("async round-trip rec requests : ", requests);
        // end check
    }
    printf (" %d calls/second\n",
        (1000 * 100000) / (int) (zclock_time () - start));

    zstr_send (pipe, "done");
}

//  Here is the worker task. All it does is receive a message, and
//  bounce it back the way it came:

static void
worker_task (zsock_t *pipe, void *args)
{
    printf("child 2: parent: %i\n\n", getppid());
    printf("child 2: my pid: %i\n\n", getpid());

    zsock_t *worker = zsock_new (ZMQ_DEALER);
    zsock_connect (worker, "tcp://localhost:5556");

    while (true) {
        zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);    
    }
    zsock_destroy (&worker);
}

//  Here is the broker task. It uses the zmq_proxy function to switch
//  messages between frontend and backend:

static void
broker_task (zsock_t *pipe, void *args)
{
    printf("child 3: parent: %i\n\n", getppid());
    printf("child 3: my pid: %i\n\n", getpid());

    //  Prepare our sockets
    zsock_t *frontend = zsock_new (ZMQ_DEALER);
    zsock_bind (frontend, "tcp://localhost:5555");
    zsock_t *backend = zsock_new (ZMQ_DEALER);
    zsock_bind (backend, "tcp://localhost:5556");
    zmq_proxy (frontend, backend, NULL);

    zsock_destroy (&frontend);
    zsock_destroy (&backend);
}

//  Finally, here's the main task, which starts the client, worker, and
//  broker, and then runs until the client signals it to stop:

int main (void)
{
    //  Create threads
    zactor_t *client = zactor_new (client_task, NULL);
    assert (client);    
    zactor_t *worker = zactor_new (worker_task, NULL);
    assert (worker);
    zactor_t *broker = zactor_new (broker_task, NULL);
    assert (broker);

    //  Wait for signal on client pipe
    char *signal = zstr_recv (client);
    zstr_free (&signal);

    zactor_destroy (&client);
    zactor_destroy (&worker);
    zactor_destroy (&broker);
    return 0;
}

When I run it, it looks like the program is stuck at the comment

// stuck here /!\

Then, when I kill it (as it does not finish or print anything at all), I need to press Ctrl+C (^C) five times. Only then does the console get more verbose, as if it had indeed been running. => Note that I deleted all my printf() steps' outputs, as they were really messy to read.

While it runs, it does not write anything to the file used by the dbg_write_in_file() function; it does so only after the five Ctrl+C (^C).

The client, worker and broker tasks all report the same getppid number (my terminal) and the same getpid as the program itself.

I use gcc trippingv4.c -o trippingv4 -L/usr/local/lib -lzmq -lczmq to compile.

When I try to kill it:

./trippingv4
Setting up test...
child 1: parent: 60967

child 1: my pid: 76853

Synchronous round-trip test...
^Cchild 2: parent: 60967

child 2: my pid: 76853

^Cchild 3: parent: 60967

child 3: my pid: 76853

^C^C^CE: 18-02-28 00:16:37 [76853]dangling 'PAIR' socket created at src/zsys.c:471
E: 18-02-28 00:16:37 [76853]dangling 'DEALER' socket created at trippingv4.c:29
E: 18-02-28 00:16:37 [76853]dangling 'PAIR' socket created at src/zsys.c:471
E: 18-02-28 00:16:37 [76853]dangling 'DEALER' socket created at trippingv4.c:89

Update

Thanks for the detailed answer @user3666197. For the first part, the compiler does not compile the assert calls, so I just printed the values instead and compared them visually; they are the same.

int czmqMAJOR,
czmqMINOR,
czmqPATCH;

zsys_version ( &czmqMAJOR, &czmqMINOR, &czmqPATCH );
printf( "INF: detected CZMQ ( %d, %d, %d ) -version\n",
         czmqMAJOR,
         czmqMINOR,
         czmqPATCH
         );

printf( "INF: CZMQ_VERSION_MAJOR %d, CZMQ_VERSION_MINOR %d, CZMQ_VERSION_PATCH %d\n",
         CZMQ_VERSION_MAJOR,
         CZMQ_VERSION_MINOR,
         CZMQ_VERSION_PATCH
         );

Output :

INF: detected CZMQ ( 4, 1, 0 ) -version
INF: CZMQ_VERSION_MAJOR 4, CZMQ_VERSION_MINOR 1, CZMQ_VERSION_PATCH 0

The zsys_info call compiles but does not show anything on the terminal, even with an fflush(stdout) just in case, so I just used printf:

INF: This system's Context() limit is 65535 ZeroMQ sockets
INF: current state of the global Context()-instance has:
     ( 1 )-IO-threads ready
     ( 1 )-ZMQ_BLOCKY state

Then I changed the global context's thread count with zsys_set_io_threads(2) and/or zmq_ctx_set (aGlobalCONTEXT, ZMQ_BLOCKY, false); it still blocked. It looks like zactor does not work with system threads the way zthread did, or does not give a similar behavior. Given my experience in ZeroMQ (also zero), I'm probably trying something that can't be achieved.

Update: solved, but not properly

My main error was not having properly initialized the zactor instances:

An actor function MUST call zsock_signal (pipe) when initialized and MUST listen to pipe and exit on $TERM command.

And not having blocked the zactor proxy's execution before it called zactor_destroy (&proxy);.

I leave the final code below, but you still need to exit at the end with Ctrl+C because I did not figure out how to handle the $TERM signal properly. Also, zactor still appears not to use system threads. It's probably designed like this, but I don't know how it works under the hood.

//  Round-trip demonstrator
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. The client task signals to
//  main when it's ready.

#include <czmq.h>

static void
client_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Client"));
    zsock_signal (pipe, 0);

    zsock_t *client = zsock_new (ZMQ_DEALER);
    zsock_connect (client, "tcp://127.0.0.1:5555");

    printf ("Setting up test...\n");
    zclock_sleep (100);

    int requests;
    int64_t start;

    printf ("Synchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");

        zmsg_t *msgh = zmsg_recv (client);
        zmsg_destroy (&msgh);

    }
    printf (" %d calls/second\n",
        (1000 * 10000) / (int) (zclock_time () - start));

    printf ("Asynchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++) {
        zstr_send (client, "hello");
    }
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        zstr_free (&reply);
    }
    printf (" %d calls/second\n",
        (1000 * 100000) / (int) (zclock_time () - start));

    zstr_send (pipe, "done");
    printf("send 'done' to pipe\n");
}

//  Here is the worker task. All it does is receive a message, and
//  bounce it back the way it came:

static void
worker_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Worker"));
    zsock_signal (pipe, 0);

    zsock_t *worker = zsock_new (ZMQ_DEALER);
    zsock_connect (worker, "tcp://127.0.0.1:5556");

    bool terminated = false;
    while (!terminated) {
        zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);
        // zstr_send (worker, "hello back"); // Gives better perf, I don't know why

    }
    zsock_destroy (&worker);
}

//  Here is the broker task. It uses the zmq_proxy function to switch
//  messages between frontend and backend:

static void
broker_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Task"));
    zsock_signal (pipe, 0);

    //  Prepare our proxy and its sockets
    zactor_t *proxy = zactor_new (zproxy, NULL);
    zstr_sendx (proxy, "FRONTEND", "DEALER", "tcp://127.0.0.1:5555", NULL);
    zsock_wait (proxy);
    zstr_sendx (proxy, "BACKEND", "DEALER", "tcp://127.0.0.1:5556", NULL);
    zsock_wait (proxy);

    bool terminated = false;
    while (!terminated) {
        zmsg_t *msg = zmsg_recv (pipe);
        if (!msg)
            break;              //  Interrupted
        char *command = zmsg_popstr (msg);

        if (streq (command, "$TERM")) {
            terminated = true;
            printf("broker received $TERM\n");
        }

        freen (command);
        zmsg_destroy (&msg);
    }

    zactor_destroy (&proxy);
}

//  Finally, here's the main task, which starts the client, worker, and
//  broker, and then runs until the client signals it to stop:

int main (void)
{

    //  Create threads
    zactor_t *client = zactor_new (client_task, "Hello, Client");
    assert (client);
    zactor_t *worker = zactor_new (worker_task, "Hello, Worker");
    assert (worker);
    zactor_t *broker = zactor_new (broker_task, "Hello, Task");
    assert (broker);

    char *signal = zstr_recv (client);
    printf("signal %s\n", signal);
    zstr_free (&signal);

    zactor_destroy (&client);
    printf("client done\n");
    zactor_destroy (&worker);
    printf("worker done\n");
    zactor_destroy (&broker);
    printf("broker done\n");

    return 0;
}

Answer:

Let's diagnose the as-is state, going step by step:

int czmqMAJOR,
    czmqMINOR,
    czmqPATCH;

zsys_version ( &czmqMAJOR, &czmqMINOR, &czmqPATCH );
printf( "INF: detected CZMQ( %d, %d, %d )-version\n",
         czmqMAJOR,
         czmqMINOR,
         czmqPATCH
         );
assert ( czmqMAJOR == CZMQ_VERSION_MAJOR && "Major: does not match\n" );
assert ( czmqMINOR == CZMQ_VERSION_MINOR && "Minor: does not match\n" );
assert ( czmqPATCH == CZMQ_VERSION_PATCH && "Patch: does not match\n" );

If this matches your expectations, you may trust that the DLL versions match and are found in the proper locations.


Next:

One might test the whole circus in a non-blocking mode, to prove there is no other blocker. As briefly inspected, I have not found such an option exposed in the CZMQ API; the native API allows one to flag a NOBLOCK option on { _send() | _recv() }-operations, which prevents them from remaining blocked (which may be the case for a DEALER socket instance on _send()-s, when there is not yet any counterparty with a POSACK-ed .bind()/.connect() state).

Here I did not find CZMQ tools to do this as directly as in the native API. Maybe you will have more luck going through this.


Test whether the global Context() instance is present and ready:

Add the following self-reporting row before the first socket instantiation, so as to be sure we are ahead of any and all socket creation and their respective _bind()/_connect() operations:

 zsys_info ( "INF: This system's Context() limit is %zu ZeroMQ sockets",
              zsys_socket_limit ()
              );

One may also enforce the Context() instantiation manually:

This makes sure the global Context() instance is up and running before any higher-abstraction instances ask it to implement additional internals (sockets, counters, handlers, port management, etc.):

//  Initialize CZMQ zsys layer; this happens automatically when you create
//  a socket or an actor; however this call lets you force initialization
//  earlier, so e.g. logging is properly set-up before you start working.
//  Not threadsafe, so call only from main thread. Safe to call multiple
//  times. Returns global CZMQ context.
CZMQ_EXPORT void *
    zsys_init (void);

//  Optionally shut down the CZMQ zsys layer; this normally happens automatically
//  when the process exits; however this call lets you force a shutdown
//  earlier, avoiding any potential problems with atexit() ordering, especially
//  with Windows dlls.
CZMQ_EXPORT void
    zsys_shutdown (void);

and possibly tune I/O performance better, using this right at the initialisation stage:

//  Configure the number of I/O threads that ZeroMQ will use. A good
//  rule of thumb is one thread per gigabit of traffic in or out. The
//  default is 1, sufficient for most applications. If the environment
//  variable ZSYS_IO_THREADS is defined, that provides the default.
//  Note that this method is valid only before any socket is created.
CZMQ_EXPORT void
    zsys_set_io_threads (size_t io_threads);

This manual instantiation gives an additional benefit: having the instance-handle void pointer, one can inspect its current state and shape with the zmq_ctx_get() tools:

void *aGlobalCONTEXT = zsys_init();

printf( "INF: current state of the global Context()-instance has:\n" );
printf( "     ( %d )-IO-threads ready\n", zmq_ctx_get( aGlobalCONTEXT,
                                                       ZMQ_IO_THREADS
                                                       )
        );
printf( "     ( %d )-ZMQ_BLOCKY state\n", zmq_ctx_get( aGlobalCONTEXT,
                                                       ZMQ_BLOCKY
                                                       )
        ); // may generate -1 in case DLL is << 4.2+
...


If unhappy with the signal handling, one may design and use another handler:

//  Set interrupt handler; this saves the default handlers so that a
//  zsys_handler_reset () can restore them. If you call this multiple times
//  then the last handler will take affect. If handler_fn is NULL, disables
//  default SIGINT/SIGTERM handling in CZMQ.
CZMQ_EXPORT void
    zsys_handler_set (zsys_handler_fn *handler_fn);

where

//  Callback for interrupt signal handler
typedef void (zsys_handler_fn) (int signal_value);

Question:

I would like to use ZMQ to implement (in Python) a broker and a client that handle request-reply to addressed entities asynchronously. The client contains functionality for making both requests and replies (the only thing missing is the exact socket type/pattern).

The request side can be blocking, but the reply side needs to be able to handle parallel (threaded) requests as they come in (i.e. a REP socket is not good enough, since it requires a send before the next receive).

It needs to go through a broker, since there will be many possible entities making requests and replies and I only want to bind a fixed number of ports (not one per entity).

Entity1                Broker                    Entity2
  REQ ------------- ROUTER ?????? -------------- ??????

Entity1 will know the ID of Entity2 and use it to make sure the request is made to Entity2 specifically. There can be any number of entities, but all entities that should answer requests will register IDs.

I've tried DEALER on the right side of the broker above, but that one seems to only send requests round-robin.

So does anyone know a good pattern/set of sockets I could use in order to address a specific entity asynchronously?

Summary:

  • Blocking on the request-side
  • Broker/Proxy for binding a fixed number of ports
  • The replying socket should be specifically addressed by the requester
  • Threaded replies (Reply-side can receive and handle parallel requests)

I've been reading the ZMQ manual quite extensively, but I haven't found any really good pattern yet for addressing specific sockets through a broker, so any help is greatly appreciated.


Answer:

After some further research and testing, I found a pattern that seems to satisfy all my requirements.

Pattern

Requester               Broker                   Replier
  REQ ------------- ROUTER ROUTER -------------- DEALER
                (requests) (replies)

Requester

The request side of the client simply connects to the request router on the broker, sends a request and starts reading the socket for a reply:

reqSocket.connect(self._reqAddress)
reqSocket.send_multipart([repId, message])
reply = reqSocket.recv_multipart()[0]

The replier ID is included as the first part of the message, for example:

Outgoing message: ['replierId', 'requestMsg']

Request Router

if self.reqRouterSocket in socketEvents:
    multipart = self.reqRouterSocket.recv_multipart()
    multipart = [multipart[-2]] + multipart
    del multipart[-2]
    self.repRouterSocket.send_multipart(multipart)

I.e., the request router just takes the first payload frame (the replierId) and puts it first in the address stack:

Incoming message: ['reqSocketAddr', '', 'replierId', 'requestMsg']
Outgoing message: ['replierId', 'reqSocketAddr', '', 'requestMsg']

The outgoing message is sent out through the reply router. Since the replier has its socket identity set to 'replierId' and is connected to the reply router, that router recognizes the address and is able to deliver the request successfully.

Replier

The replier needs to set its own socket identity to some known value in order to be directly addressable as described above.

NOTE: You have to set the socket identity of the DEALER socket BEFORE you connect to the reply router. To set the identity of the socket:

self.dealerSocket.setsockopt(zmq.IDENTITY, 'replierId')

Otherwise the router won't know the identity and will drop the messages.

The replier listens for incoming requests. In my case this is all threaded and requests are handled asynchronously, which is the reason for using a DEALER socket instead of a regular REP; in the synchronous case a REP would be much easier. The DEALER socket can receive further requests without having to answer the first one first, which a REP socket must do. A simplified version of what is done on the replier side:

multipart = self.dealerSocket.recv_multipart()
returnRoute = multipart[:-1]
requestMsg = multipart[-1]
reply = someFunction(requestMsg)
self.dealerSocket.send_multipart(returnRoute + [reply])

I.e., the replier just returns what it got, with the request replaced by a reply:

Incoming message: ['replierId', 'reqSocketAddr', '', 'request']
Outgoing message: ['replierId', 'reqSocketAddr', '', 'reply']

This outgoing message is then sent back to the reply router.

Reply Router

A ROUTER is chosen on this side of the broker purely because it needs to address a specific socket among the many connected ones.

if self.repRouterSocket in socketEvents:
    multipart = self.repRouterSocket.recv_multipart()
    self.reqRouterSocket.send_multipart(multipart[1:])

I.e., it just pops the first address off the address stack and sends the message back over to the requesting side:

Incoming message: ['replierId', 'reqSocketAddr', '', 'reply']
Outgoing message: ['reqSocketAddr', '', 'reply']

The request router recognizes this address and sends the reply back to the requester, which receives:

Incoming list: ['reply']

This pattern seems to fulfill all the requirements in my question. I hope it can be of use to others as well.

Question:

I'm using czmq for interprocess communication.

There are 2 processes :

  • The server, receiving requests and sending replies but also sending events.
  • The client, sending requests and receiving replies but also listening to the events.

I have already successfully implemented the request/reply pattern with REQ/REP (details below).

Now I want to implement the notification mechanism. I want my server to send its events without caring whether anyone receives them and without being blocked in any way. The client listens to those events, but should it crash, it mustn't have any impact on the server. I believe PUB/SUB is the most appropriate pattern, but if not, do not hesitate to enlighten me.

Here's my implementation (cleaned of checks and logs):

The server publishes the events

Server::eventIpcPublisher = zsock_new_pub("@ipc:///tmp/events.ipc");

void Server::OnEvent(uint8_t* eventData, size_t dataSize) {
  if (Server::eventIpcPublisher != nullptr) {
    int retCode = zsock_send(Server::eventIpcPublisher, "b", eventData, dataSize);
  }
}

The client listens to them in a dedicated thread

void Client::RegisterToEvents(const std::function<void(uint8_t*, size_t)>& callback) {
  zsock_t* eventIpcSubscriber = zsock_new_sub(">ipc:///tmp/events.ipc", "");
  listening = true;
  while (listening) {
    byte* receptionBuffer;
    size_t receptionBufferSize;
    int retCode = zsock_recv(eventIpcSubscriber, "b", &receptionBuffer, &receptionBufferSize);
    --> NEVER REACHED <--
    if (retCode == 0) {
      callback(static_cast<uint8_t*>(receptionBuffer), receptionBufferSize);
    }
  }
  zsock_destroy(&eventIpcSubscriber);
}

It doesn't work:

  • The server sends with return code 0, as if everything is ok,
  • The client doesn't receive anything (blocked on receive).

Help would be much appreciated, thanks in advance!

Chris.

PS: here is the REQ/REP code that I have already implemented successfully (no help needed here, just for comprehension).

The client sends a request and then waits for the answer.

uint8_t* MulticamApi::GetDatabase(size_t& sizeOfData) {
  zsock_t* requestSocket = zsock_new_req(">ipc:///tmp/requests.ipc");
  if (requestSocket == nullptr)
    return nullptr;
  byte* receptionBuffer;
  size_t receptionBufferSize;
  int retCode = zsock_send(requestSocket, "i", static_cast<int>(IpcComm_GetClipDbRequest));
  if (retCode != 0) {
    sizeOfData = 0;
    return nullptr;
  }
  retCode = zsock_recv(requestSocket, "b", &receptionBuffer, &receptionBufferSize);
  databaseData.reset(new MallocGuard(static_cast<void*>(receptionBuffer)));
  sizeOfData = receptionBufferSize;
  return static_cast<uint8_t*>(databaseData->Data());
}

A dedicated thread in the server listens to requests, processes them and replies. (don't worry, delete is handled somewhere else)

U32 Server::V_OnProcessing(U32 waitCode) {
  protocolIpcWriter = zsock_new_rep("@ipc:///tmp/requests.ipc");
  while (running) {
    int receptionInt = 0;
    int retCode = zsock_recv(protocolIpcWriter, "i", &receptionInt);
    if ((retCode == 0) && (receptionInt == static_cast<int>(IpcComm_GetClipDbRequest))) {
      GetDatabase();
    }
    sleep(1);
  }
  zsock_destroy(&protocolIpcWriter);
  return 0;
}

void Server::GetDatabase() {
  uint32_t dataSize = 10820 * 340;
  uint8_t* data = new uint8_t[dataSize];
  uint32_t nbBytesWritten = DbHelper::SaveDbToBuffer(data, dataSize);
  int retCode = zsock_send(protocolIpcWriter, "b", data, nbBytesWritten);
}

Answer:

I know my question's old, but for the record: I switched from czmq to the base zmq API and everything went smoothly. A colleague of mine also had issues with the czmq layer and switched to zmq to fix them, so that's definitely what I recommend.
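
For the record, here is an illustrative sketch (not the author's actual code) of what the same PUB side looks like against the base libzmq C API; note how czmq's "@" bind prefix becomes an explicit zmq_bind():

#include <zmq.h>
#include <string.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();
    void *pub = zmq_socket (ctx, ZMQ_PUB);
    zmq_bind (pub, "ipc:///tmp/events.ipc");     /* same endpoint as above  */

    const char *event = "event-payload";         /* stand-in for eventData  */
    zmq_send (pub, event, strlen (event), 0);    /* fire-and-forget publish */

    zmq_close (pub);
    zmq_ctx_destroy (ctx);
    return 0;
}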

Question:

After reading through the ZMQ manual about the load-balancing broker, I thought it would be great to implement it in my own code. So I did, adding some additional touches to make it more responsive. One performance enhancement I was looking to add was the ability to dispatch multiple long-running work jobs concurrently. I think I'm right about this, though I could be wrong, so consider the following with respect to just the lbbroker code that's in the manual:

Two workers (clients) simultaneously request work, each being given a long-running job (by a manager). In the current code it's good, because it's not round-robining the work to different recipients; it's selecting FCFS. But there's also a problem, in that a reply is first needed from the first worker that got through before work can be dispensed to the second worker.

Basically, I want to dole work out as fast as there are workers ready to receive it, FCFS style and concurrently as well. At the same time, I don't want to lose the model where manager A gets through to worker B and worker B's reply gets back to manager A. Keeping this, which is facilitated by the request-reply pattern, while at the same time allowing worker B to receive the manager's second work job while the first is still being processed, is very much desired.

How can I most easily go about achieving this? Preferably by modifying my current lbbroker implementation, which isn't too different from lbbroker in the manual.

Thanks in advance.


Answer:

As it turns out, my difficulties stemmed from an insufficiently specific understanding of the load-balancing broker example: it is not a broker with REP sockets that must alternate a receive between each work request, so the asynchrony issue does not exist at all.

Basically, a ROUTER prepends an identity frame to each message, and by forwarding that along consistently you can avoid the issue entirely; the router is free to connect other manager-worker pairs while N concurrent workers work.
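
To make that mechanism concrete, below is a hedged sketch of such an identity-forwarding loop in the libzmq C API, assuming DEALER peers that send single-frame payloads (REQ peers would add an empty delimiter frame that must also be forwarded) and an illustrative endpoint:

#include <zmq.h>

int main (void)
{
    void *ctx    = zmq_ctx_new ();
    void *router = zmq_socket (ctx, ZMQ_ROUTER);
    zmq_bind (router, "tcp://*:5555");            /* assumed endpoint */

    while (1) {
        zmq_msg_t identity, payload;
        zmq_msg_init (&identity);
        zmq_msg_init (&payload);
        zmq_msg_recv (&identity, router, 0);      /* frame 1: peer identity */
        zmq_msg_recv (&payload,  router, 0);      /* frame 2: the request   */

        /* Forward the identity frame unchanged: the ROUTER uses it to
           route the reply to the exact peer, so no lock-step is needed. */
        zmq_msg_send (&identity, router, ZMQ_SNDMORE);
        zmq_msg_send (&payload,  router, 0);
    }

    zmq_close (router);
    zmq_ctx_destroy (ctx);
    return 0;
}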