Hot questions for Using ZeroMQ in distributed computing


Question:

While reading the zeromq guide, I came across client code which sends 100k requests in a loop, and then receives the replies in a second loop.

#include "../include/mdp.h"
#include <time.h>


int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdp_client_t *session = mdp_client_new ("tcp://localhost:5555", verbose);
    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        mdp_client_send (session, "echo", &request);
    }
    printf("sent all\n");

    for (count = 0; count < 100000; count++) {
        zmsg_t *reply = mdp_client_recv (session,NULL,NULL);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  Interrupted by Ctrl-C
        printf("reply received:%d\n", count);
    }
    printf ("%d replies received\n", count);
    mdp_client_destroy (&session);
    return 0;
}

I have added a counter to count the number of replies that the worker (test_worker.c) sends to the broker, and another counter in mdp_broker.c to count the number of replies the broker sends to a client. Both of these count up to 100k, but the client is receiving only around 37k replies.

If the number of client requests is set to around 40k, then it receives all the replies. Can someone please tell me why packets are lost when the client sends more than 40k asynchronous requests?

I tried setting the HWM to 100k for the broker socket, but the problem persists:

static broker_t *
s_broker_new (int verbose)
{
    broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
    int hwm = 100000;           //  ZMQ_SNDHWM / ZMQ_RCVHWM take an int, not int64_t, in libzmq 3.x+
    //  Initialize broker state
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    zmq_setsockopt(self->socket, ZMQ_SNDHWM, &hwm, sizeof(hwm));

    zmq_setsockopt(self->socket, ZMQ_RCVHWM, &hwm, sizeof(hwm));
    self->verbose = verbose;
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}

Answer:

Without setting the HWM and using the default TCP settings, packet loss was being incurred with just 50k messages.

The following helped to mitigate the packet loss at the broker:

  1. Setting the HWM for the zeromq socket.
  2. Increasing the TCP send/receive buffer size.

This helped only up to a certain point. With two clients, each sending 100k messages, the broker was able to manage fine. But when the number of clients was increased to three, they stopped receiving all the replies.
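For reference, a minimal pyzmq sketch of those two mitigations applied to a broker-style socket ( the port and buffer values are illustrative; the same options exist in the C API as ZMQ_SNDHWM / ZMQ_RCVHWM / ZMQ_SNDBUF / ZMQ_RCVBUF ):

import zmq

ctx  = zmq.Context()
sock = ctx.socket(zmq.ROUTER)                  #  broker-side socket, as in the question

sock.setsockopt(zmq.SNDHWM, 100000)            #  1. raise the high-water marks
sock.setsockopt(zmq.RCVHWM, 100000)
sock.setsockopt(zmq.SNDBUF, 4 * 1024 * 1024)   #  2. enlarge the kernel TCP send/receive buffers
sock.setsockopt(zmq.RCVBUF, 4 * 1024 * 1024)

sock.bind("tcp://*:5555")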

Finally, what has helped me to ensure no packet loss is to change the design of the client code in the following way:

  1. A client can send up to N messages at once. The client's RCVHWM and the broker's SNDHWM should be sufficiently high to hold a total of N messages.
  2. After that, for every reply received, the client sends two further requests (see the sketch below).
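A minimal pyzmq sketch of that self-clocking client, using a plain DEALER socket in place of the Majordomo mdp_client ( ENDPOINT, TOTAL and WINDOW are illustrative assumptions, and the peer is assumed to echo every request back ):

import zmq

ENDPOINT = "tcp://localhost:5555"   #  illustrative
TOTAL    = 100000                   #  total requests to send
WINDOW   = 1000                     #  N: requests allowed in flight at once

ctx  = zmq.Context()
sock = ctx.socket(zmq.DEALER)       #  stands in for the Majordomo client here
sock.connect(ENDPOINT)

sent = received = 0
while sent < min(WINDOW, TOTAL):    #  1. prime the pipeline with up to N requests
    sock.send(b"Hello world")
    sent += 1

while received < TOTAL:             #  2. every reply releases two further requests
    sock.recv()
    received += 1
    for _ in range(2):
        if sent < TOTAL:
            sock.send(b"Hello world")
            sent += 1

print(received, "replies received")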

Question:

Is it possible for a publisher to publish to multiple clients on the same machine using ZeroMQ? I'd like a set of clients, each of which can make standard Request/Response calls using SocketType.REQ and SocketType.REP, but which can also receive notifications using SocketType.SUB and SocketType.PUB.

I've tried to implement this topology, taken from here, although my version only has one publisher.

Here is my publisher:

public class ZMQServerSmall 
{   
    public static void main(String[] args)
    {
        try (ZContext context = new ZContext()) 
        {           
            ZMQ.Socket rep = context.createSocket(SocketType.REP);
            rep.bind("tcp://*:5555");

            ZMQ.Socket pub = context.createSocket(SocketType.PUB);
            pub.bind("tcp://*:7777");           

            while (!Thread.currentThread().isInterrupted()) 
            {                   
                String req = rep.recvStr(0);
                rep.send(req + " response");

                pub.sendMore("Message header");
                pub.send("Message body");;          
            }
        }
    }
}

Here is my proxy (I included a Listener to try to see what's going on):

public class ZMQForwarderSmall 
{
    public static void main(String[] args) 
    {       
        try 
        (
            ZContext context = new ZContext();   
        )
        {
            ZMQ.Socket frontend = context.createSocket(SocketType.XSUB);            
            frontend.connect("tcp://*:7777");

            ZMQ.Socket backend = context.createSocket(SocketType.XPUB);
            backend.bind("tcp://*:6666");

            IAttachedRunnable runnable = new Listener();
            Socket listener = ZThread.fork(context, runnable);

            ZMQ.proxy(frontend, backend, listener);
        }
        catch (Exception e) 
        {
            System.err.println(e.getMessage());
        } 
    }

    private static class Listener implements IAttachedRunnable 
    {    
        @Override
        public void run(Object[] args, ZContext ctx, Socket pipe) 
        {
            while (true) 
            {
                ZFrame frame = ZFrame.recvFrame(pipe);
                if (frame == null)
                    break; // Interrupted
                System.out.println(frame.toString());
                frame.destroy();
            }
        }
    }
}

Here is my Subscriber:

public class ZMQClientSmall
{   
    public static void main(String[] args) throws IOException
    {
        String input;

        try 
        (
            ZContext context = new ZContext();
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))
        ) 
        { 
            ZMQ.Socket reqSocket = context.createSocket(SocketType.REQ);
            reqSocket.connect("tcp://localhost:5555");

            ZMQ.Socket subSocket = context.createSocket(SocketType.SUB);
            subSocket.connect("tcp://localhost:6666");

            subSocket.subscribe("".getBytes(ZMQ.CHARSET));

            while ((input = stdIn.readLine()) != null)
            {
                reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
                String response = reqSocket.recvStr(0);

                String address = subSocket.recvStr(ZMQ.DONTWAIT);
                String contents = subSocket.recvStr(ZMQ.DONTWAIT);
                System.out.println("Notification received: " + address + " : " + contents);
            }
        }   
    }   
}

Here is the test. I open four terminals: one publisher, one proxy, and two clients. When I make a request in either of the two client terminals, I expect to see a notification in both, but instead I only see the notification in the terminal that made the request. I know that both clients are using the same address (localhost:6666), but I'd hoped that the proxy would solve that problem.

Can anyone see anything obviously wrong here?


Answer:

Q : Is it possible for a publisher to publish to multiple clients on the same machine using ZeroMQ?

Oh sure, it is. No doubts about that.


Check the code. The responsibility for the order-of-execution is there. In distributed-systems this is always so.

Once the [Client]-No1 instance gets a plausible .readLine()-ed input, it will jump in:

        while ((input = stdIn.readLine()) != null)
        {
            reqSocket.send(input.getBytes(ZMQ.CHARSET), 0);
            String response = reqSocket.recvStr(0);

            String address = subSocket.recvStr(ZMQ.DONTWAIT);
            String contents = subSocket.recvStr(ZMQ.DONTWAIT);
            System.out.println(           "Notification received: "
                              + address + " : "
                              + contents
                                );
        }

Next it .send()-s over REQ and blocks, awaiting the REP response.

Given the [Client]-No2 instance also gets a manual .readLine()-ed input, it enters the same while(){...} loop, yet it gets no farther than its own blocking wait for a REP response. That response cannot be .recv()-ed until No1 has been served by the REP side, so while No1 may already have left its blocking .recv(), No2 is still hanging inside its own blocking .recv(), waiting for the next REP-side response ( which may come, but need not ). Meanwhile No1 has already moved on to the PUB/SUB .recv(), receives the published message ( No2 never does ), and then rushes into the next blocking .readLine() input, and so on, ad infinitum.

So these in-loop sequences, a (REQ)-part followed by a (SUB)-part, in whatever number N > 1 of [Client]-instances, effectively form an EXCLUSIVE Tick-Tock-Tick-Tock clock machine: the clients mutually block one another, so the PUB-ed messages can only be delivered to them in an N-interleaved order ( not to mention the manual, .readLine()-driven, blocking step ).

The ZMQServerSmall is not aware of anything being wrong: it .send()-s, in order, to whichever counterparty it has .recvStr()-ed from over REQ/REP, and it PUB-s to all counterparties. Those counterparties, however, do not read autonomously; each one reads only after it has been unblocked by a manual .readLine() and only then, after its episodic ( potentially infinitely blocked ) REQ/REP step, may it .recv() its next, so-far-unread message part. ( There is also no code here that explicitly handles the presence or absence of the multipart flag on the SUB-side .recv() operations. )

        while (!Thread.currentThread().isInterrupted()) 
        {                   
            String req = rep.recvStr(0);
            rep.send(req + " response");

            pub.sendMore("Message header");
            pub.send("Message body");;          
        }

The ZMQServerSmall in the meantime keeps sending ( N - 1 )-times more messages down the PUB-broadcast lane, so the Tick-Tock-Tick-Tock MUTEX REQ/SUB loop-blocking "pendulum" is not 2-state but N-state on the receiving side: all clients receive the same flow of PUB-ed messages, yet interleaved by the N steps of the REQ/REP MUTEX stepping.
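One way out of that mutual blocking is to stop serialising the SUB read behind the blocking REQ/REP and .readLine() steps and instead wait on both sockets at once. A sketch of that structure, written in pyzmq for brevity ( the same shape maps onto JeroMQ's ZMQ.Poller; the endpoints follow the question's layout, and the manual stdin step is replaced by an automatic next request ):

import zmq

ctx = zmq.Context()

req = ctx.socket(zmq.REQ)
req.connect("tcp://localhost:5555")

sub = ctx.socket(zmq.SUB)
sub.connect("tcp://localhost:6666")
sub.setsockopt(zmq.SUBSCRIBE, b"")

poller = zmq.Poller()
poller.register(sub, zmq.POLLIN)
poller.register(req, zmq.POLLIN)

req.send(b"hello")                       #  exactly one request in flight
while True:
    events = dict(poller.poll())         #  wake on EITHER a notification or the reply
    if sub in events:
        print("Notification:", sub.recv_multipart())
    if req in events:
        print("Reply:", req.recv())
        req.send(b"hello")               #  only after the reply may REQ send again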

Question:

Struggling to properly shut down a simple pyzmq based client when the server is not available. Below are 2 snippets.

First the server. This is more or less the pyzmq example. No special code here:

import zmq
import json

port = 5555

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:{0}".format(port))

while True:
    message = socket.recv_json()
    print(json.dumps(message))
    socket.send_json({'response': 'Hello'})

Next the client.

import zmq

ip = 'localhost'
port = 5555
addr ="tcp://{0}:{1}".format(ip, port)
message = {'value': 10}

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(addr)

if socket.poll(timeout=1000, flags=zmq.POLLOUT) != 0:
    socket.send_json(message, flags=zmq.NOBLOCK)        
    if socket.poll(timeout=1000, flags=zmq.POLLIN) != 0:
        response = socket.recv_json()
        print(response)

socket.disconnect(addr)
socket.close(linger=0)
context.term()

Here I've tried to enhance the default client with the ability to time out if the server is not available. The code above uses the poll method, although I've also tried setting a receive timeout on the socket.

If the server is running, the client sends and receives a response and exits cleanly.

If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call. My understanding, from searching, is that this will hang if there are sockets that have not been closed, which doesn't seem to be the case here.

Any help is much appreciated.


Answer:

On "ability to timeout if the server is not available"

A timeout is possible, yet it will not let the hard-wired REQ/REP two-step dance survive, much less continue in a proper manner, if one side times out an otherwise mandatory step in the distributed Finite State Automaton scheme ( a dFSA cannot take one-sided shortcuts; it is a dual-sided dFSA ).


Hypothesis:

If the server is not running, the client passes immediately through the first socket.poll call (since zmq just buffers the message internally). It blocks for 1 second on the second socket.poll call and correctly skips the recv_json block. It then hangs on the context.term() call.

Validation:

Let's review the code in a step-by-step manner

def  Test( SetImmediate = False ):
     ##################################################################################
     import zmq, json, time;                                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "import-s: DONE... VER: " ), zmq.zmq_version() )
     ##################################################################################
     ip      = 'localhost';                                                       print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "ip SET..." ) )
     port    =  5555;                                                             print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "port SET..." ) )
     addr    = "tcp://{0}:{1}".format( ip, port );                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "addr SET..." ) )
     message = { 'value': 10 };                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "message SET..." ) )
     ##################################################################################
     context = zmq.Context();                                                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context INSTANTIATED..." ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
     pass;              aReqSock = context.socket( zmq.REQ );                     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket INSTANTIATED..." ),                        "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################################################################################################################################################################
     pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     pass;              aReqSock.setsockopt(       zmq.LINGER,    0 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.LINGER    ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not let LINGER block on closing sockets with waiting msgs
     pass;         rc = aReqSock.getsockopt(       zmq.LINGER       );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.LINGER    ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     ##################################################################################################################################################################################################################################
     pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     if SetImmediate:
                        aReqSock.setsockopt(       zmq.IMMEDIATE, 1 );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.setsockopt( zmq.IMMEDIATE ) SET..." ),     "|", zmq.strerror( zmq.zmq_errno() ) ) # do not enqueue msgs for incoplete connections
     pass;         rc = aReqSock.getsockopt(       zmq.IMMEDIATE    );            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.getsockopt( zmq.IMMEDIATE ) GOT..." ), rc, "|", zmq.strerror( zmq.zmq_errno() ) ) #
     ##################################################################################################################################################################################################################################
     pass;              aReqSock.connect( addr );                                 print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.connect() DONE..." ),                      "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################
     pass;        rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLOUT ) SET..." ),   "|", zmq.strerror( zmq.zmq_errno() ) )
     if      0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLOUT ) != 0:# .poll() BLOCKS ~ 1s +NEVER gets a .POLLOUT for an empty TxQueue, does it?
         pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... ==  " ), rc )
         pass;          aReqSock.send_json( message,   flags = zmq.NOBLOCK )      # .send()-s dispatches message the REP-side may .recv() at some later time
         pass;                                                                    print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".send_json( zmq.NOBLOCK ): DONE..." ),            "|", zmq.strerror( zmq.zmq_errno() ) )
         pass;    rc  = aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  );     print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc = .poll( 1000 [ms], zmq.POLLIN ) SET..." ),    "|", zmq.strerror( zmq.zmq_errno() ) )
         if  0 != rc: # aReqSock.poll( timeout = 1000, flags = zmq.POLLIN  ) != 0:# .poll() BLOCKS < 1s = depends on REP-side response latency ( turn-around-time )
             pass;                                                                print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "rc was NON-ZERO... == " ), rc )
             response = aReqSock.recv_json()                                      # .recv() BLOCKS until ... if ever ...
             print( response );                                                   print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), ".recv_json() COMPLETED" ),                       "|", zmq.strerror( zmq.zmq_errno() ) )
     pass;                                                                        print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "if-ed code-block COMPLETED" ) )
     ##################################################################################
     rc = aReqSock.disconnect( addr );                                            print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.disconnect() RETURNED CODE ~ " ), rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     rc = aReqSock.close(      linger = 0 );                                      print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Socket.close() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     rc = context.term();                                                         print( "{0:_>21.10f}_ACK: {1:}".format( time.monotonic(), "Context.term() RETURNED CODE ~ " ),      rc,     "|", zmq.strerror( zmq.zmq_errno() ) )
     ##################################################################################

Running this produces output like the following:

>>> Test( SetImmediate = False )
____947107.0356056700_ACK: import-s: DONE... VER:  4.2.5
____947107.0356727780_ACK: ip SET...
____947107.0356969039_ACK: port SET...
____947107.0357236000_ACK: addr SET...
____947107.0357460320_ACK: message SET...
____947107.0358552620_ACK: Context INSTANTIATED... | Success
____947107.0362445670_ACK: Socket INSTANTIATED... | Success
____947107.0363074190_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
____947107.0363573120_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
____947107.0364004780_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
____947107.0364456220_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0364890840_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947107.0365797410_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947107.0366972820_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947107.0367464600_ACK: rc was NON-ZERO... ==   2
____947107.0368948240_ACK: .send_json( zmq.NOBLOCK ): DONE... | Resource temporarily unavailable
____947108.0381633660_ACK: rc = .poll( 1000 [ms], zmq.POLLIN ) SET... | Resource temporarily unavailable
____947108.0382736750_ACK: if-ed code-block COMPLETED
____947108.0383544239_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
____947108.0384234400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
____947108.0386644470_ACK: Context.term() RETURNED CODE ~  None | Success

and

>>> Test( SetImmediate = True )
____947119.1267617550_ACK: import-s: DONE... VER:  4.2.5
____947119.1268189061_ACK: ip SET...
____947119.1268382660_ACK: port SET...
____947119.1268587380_ACK: addr SET...
____947119.1268772170_ACK: message SET...
____947119.1269678050_ACK: Context INSTANTIATED... | Success
____947119.1271884360_ACK: Socket INSTANTIATED... | Success
____947119.1272257260_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... -1 | Success
____947119.1272587100_ACK: Socket.setsockopt( zmq.LINGER    ) SET... | Invalid argument
____947119.1272875509_ACK: Socket.getsockopt( zmq.LINGER    ) GOT... 0 | Success
____947119.1273175071_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 0 | Success
____947119.1273461781_ACK: Socket.setsockopt( zmq.IMMEDIATE ) SET... | Invalid argument
____947119.1273732870_ACK: Socket.getsockopt( zmq.IMMEDIATE ) GOT... 1 | Success
____947119.1274376540_ACK: Socket.connect() DONE... | Resource temporarily unavailable
____947120.1287043930_ACK: rc = .poll( 1000 [ms], zmq.POLLOUT ) SET... | Resource temporarily unavailable
____947120.1287937190_ACK: if-ed code-block COMPLETED
____947120.1288697980_ACK: Socket.disconnect() RETURNED CODE ~  None | Resource temporarily unavailable
____947120.1289412400_ACK: Socket.close() RETURNED CODE ~  None | Invalid argument
____947120.1291404651_ACK: Context.term() RETURNED CODE ~  None | Success

Which proves the hypothesis to be incorrect: there is no problem with the context.term(), but rather with the way a .connect( aTransportClass_Target ) against a target that is not present is handled internally.

To my surprise, in the version under test ( v4.2.5 ) the .poll( zmq.POLLOUT ) reports the .POLLOUT-direction as ready ( returning 2 ) right after the .connect(), without a single explicit .send() having been made.

This seems to me to be an inconsistency with previous versions ( as if it were reporting some .connect()-associated "protocol/identity" telemetry instead of reporting just the user-app-level messages ).

While I might be wrong in trying to find a rationale for why a principally empty queue would ever report something as already present in its .POLLOUT-direction, I hope to have sufficiently shown that the problem has nothing to do with the .LINGER == 0 / .term()-ination of the Context()-instance.

Q.E.D.
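As a practical footnote, a client that has to exit cleanly whether or not the REP side is alive can combine LINGER, IMMEDIATE and send/receive timeouts. A minimal sketch, assuming the same endpoint as in the question:

import zmq

addr    = "tcp://localhost:5555"
message = { 'value': 10 }

ctx  = zmq.Context()
sock = ctx.socket(zmq.REQ)
sock.setsockopt(zmq.LINGER, 0)       #  never let unsent messages block close()/term()
sock.setsockopt(zmq.IMMEDIATE, 1)    #  queue only on completed connections
sock.setsockopt(zmq.SNDTIMEO, 1000)  #  give up on the send after 1 s
sock.setsockopt(zmq.RCVTIMEO, 1000)  #  give up on the reply after 1 s
sock.connect(addr)

try:
    sock.send_json(message)
    print(sock.recv_json())
except zmq.Again:                    #  raised on either timeout
    print("server not reachable within the timeout")
finally:
    sock.close()
    ctx.term()                       #  returns promptly thanks to LINGER == 0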

Question:

Assume I have a node (process, thread, whatever) with a ZeroMQ interface, let's say a REP socket. This means I have an infinite main loop, which sleeps in the zmq_recv or zmq_poll function.

Now that node should also receive data from another asynchronous event. Imagine for example a keyboard button-press event or toggling a GPIO pin or a timer expiration. The execution must also sleep while waiting for these events.

How should the main loop be written, such that it wakes up on either type of event, namely the reception of a message and the occurrence of the asynchronous event?

I can think of two solutions:

First, a polling loop, where both events are checked in a non-blocking way, and I run the loop every couple of milliseconds. This seems non-ideal in terms of processing load.

Second, I move the blocking code for both events into separate threads. In the main loop instead, I sleep on a semaphore (or condition variable), which is posted by the occurrence of either event. That's the way I would go in a traditional application, but the ZeroMQ Guide written by Pieter Hintjens is very explicit on not using semaphores:

Stay away from the classic concurrency mechanisms like as mutexes, critical sections, semaphores, etc. These are an anti-pattern in ZeroMQ applications.

So what to do? What's the best practice here?


Answer:

Depending on your OS, most events in the system are generated through some file descriptor becoming ready to read. This is generally how it works on Linux, Unixes, etc. For instance, keyboard input comes in through STDIN. And of course, file descriptors of any description can be included in a ZMQ poll.

Where an event isn't raised through a file descriptor that can be used in ZMQ poll (e.g. a serial port on Windows becoming ready to read), I generally use a thread to translate the event into a message sent through a ZMQ socket. Works well enough. Becomes non-portable, but that's unavoidable.

GPIO can be harder. If it's not backed by some driver that's got an ISR integrated into the OS's driver stack, then you're going to have to poll it.
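To illustrate the file-descriptor route, here is a small pyzmq sketch that sleeps on both a REP socket and STDIN inside one zmq.Poller ( POSIX only; on Windows STDIN is not pollable this way, which is exactly where the translate-with-a-thread approach comes in; the endpoint is illustrative ):

import sys
import zmq

ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
rep.bind("tcp://*:5555")

stdin_fd = sys.stdin.fileno()            #  any raw file descriptor can be registered
poller = zmq.Poller()
poller.register(rep, zmq.POLLIN)
poller.register(stdin_fd, zmq.POLLIN)

while True:
    events = dict(poller.poll())         #  sleeps until either source is ready
    if rep in events:
        msg = rep.recv()
        rep.send(b"ack: " + msg)
    if stdin_fd in events:
        line = sys.stdin.readline()      #  the keyboard / asynchronous event
        print("event:", line.strip())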

Question:

Fairly new to ZeroMQ. I have a simple REQ/REP queue like the one below. I am using PHP, but that doesn't matter, as any language binding would be fine for me. This is the client that requests a task:

$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
$req->send("Export Data as Zip");
echo $i . ":" . $req->recv().PHP_EOL;

And this is a worker to actually perform the task.

$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");
echo "Server is started at port $port" . PHP_EOL;
while(true)
{
    $msg = $srvr->recv();
    echo "Message = " . $msg . PHP_EOL;
    // Do the work here, takes 10 min, knows the count of lines added and remaining
    $srvr->send($msg . " is exported as zip file" . date('H:i:s'));
}

As the task of exporting data takes about 10 min, I want to connect to the server from a different client and get the progress/ percentage of the task done. I am wondering if that's even a valid approach.

I tried this approach, where the REQ/REP part works, but I get nothing in the PUB/SUB part.

Server part

$ctx = new ZMQContext();
$srvr = new ZMQSocket($ctx, ZMQ::SOCKET_REP);
$srvr->bind("tcp://*:5454");

// add PUB socket to publish progress
$c = new ZMQContext();
$p = new ZMQSocket($c, ZMQ::SOCKET_PUB);
$p->bind("tcp://*:5460");

echo "Server is started at port 5454" . PHP_EOL;
$prog = 0;
while(true)
{
    $p->send($prog++ . '%'); // this part doesn't get to the progress client
    $msg = $srvr->recv();
    echo "Message = " . $msg . PHP_EOL;
    sleep(2);// some long task
    $srvr->send($msg . " Done zipping " . date('H:i:s'));
}

Progress client

$ctx = new ZMQContext();
$stat = new ZMQSocket($ctx, ZMQ::SOCKET_SUB);
$stat->connect('tcp://localhost:5460');
while (true){
    echo $stat->recv() . PHP_EOL; //nothing shows here
}

Request client

$ctx = new ZMQContext();
$req = new ZMQSocket($ctx, ZMQ::SOCKET_REQ);
$req->connect('tcp://localhost:5454');
for($i=0;$i<100;$i++){
    $req->send("$i : Zip the file please");
    echo $i . ":" . $req->recv().PHP_EOL; //works and get the output
}

Answer:

The concept is feasible, some tuning needed:

All counterparties of the PUB socket ( i.e. the SUB sockets ) have to set up a subscription, at the very least an empty one via .setsockopt( ZMQ_SUBSCRIBE, "" ), meaning "receive all topics" ( nothing filtered out ). Without any subscription, a SUB socket delivers nothing, which is why the progress client above sees no messages.

Next, both the PUB side and the SUB side ought to have .setsockopt( ZMQ_CONFLATE, 1 ) configured, as there is no value in feeding all the intermediate values through the enqueue/dequeue pipeline when the only value of interest is the "last", most recent progress message.

Always prefer the non-blocking mode of the ZeroMQ calls ( .recv( ..., flags = ZMQ_NOBLOCK ) et al ), or use Poller.poll() pre-tests to check for the (non-)presence of a message before spending further effort on reading it. Simply put, there are not many cases where blocking-mode calls serve well in a production-grade system.
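Put together, the progress client's receive side could look like this pyzmq sketch ( the port and timeout are illustrative; the same socket options exist at the libzmq level, though whether your PHP binding exposes ZMQ_CONFLATE depends on its version ):

import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.CONFLATE, 1)              #  keep only the most recent progress message
sub.connect("tcp://localhost:5460")
sub.setsockopt_string(zmq.SUBSCRIBE, "")     #  without a subscription, nothing ever arrives

poller = zmq.Poller()
poller.register(sub, zmq.POLLIN)

while True:
    if dict(poller.poll(timeout=500)):       #  non-blocking style: give up after 500 ms
        print(sub.recv_string())
    # else: no fresh progress yet -- do other useful work here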

Some further tweaking may also help on the PUB side, in case a more massive "attack" comes from an unrestricted pool of SUB-side entities and the PUB has to create, manage and maintain resources for each of these counterparties.

Question:

Consider the following setup. I connect() multiple DEALER sockets (clients) to another DEALER, bound to an address (server). Exactly one of the clients calls recv() in a loop, while the other clients may occasionaly send something to the server.

Will the server socket try to distribute sent messages to those clients that don't receive anything? I couldn't find exactly how DEALER sockets handle work distribution, either in the ZeroMQ Guide or in the zmq_socket manpage.

I use ZMQ 4.1.


Answer:

Not all versions of ZeroMQ work the same way

The safe way is to check the respective version API man-page for details. Not all projects may use the latest API, due to interoperability reasons when any of the counterparties simply does not have any newer version API binding available. Then the oldest one rulez the crowd.

The ZeroMQ 2.1.11 API does not permit DEALER/DEALER; the ZeroMQ 4.2.0 API does.

In any case, the API man-pages are always a worthwhile source of details:

ZMQ_DEALER ( ZeroMQ 4.2.0 API ):

A socket of type ZMQ_DEALER is an advanced pattern used for extending request/reply sockets. Each message sent is round-robined among all connected peers, and each message received is fair-queued from all connected peers.

When a ZMQ_DEALER socket enters the mute state due to having reached the high water mark for all peers, or if there are no peers at all, then any zmq_send(3) operations on the socket shall block until the mute state ends or at least one peer becomes available for sending; messages are not discarded.

When a ZMQ_DEALER socket is connected to a ZMQ_REP socket each message sent must consist of an empty message part, the delimiter, followed by one or more body parts.

Summary of ZMQ_DEALER characteristics:

Compatible peer sockets:      ZMQ_ROUTER, ZMQ_REP, ZMQ_DEALER
Direction:                    Bidirectional
Send/receive pattern:         Unrestricted
Outgoing routing strategy:    Round-robin
Incoming routing strategy:    Fair-queued
Action in mute state:         Block
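The round-robin outgoing strategy is easy to make visible. In the pyzmq sketch below ( endpoint and counts are illustrative ), the bound DEALER keeps distributing to both connected peers even though only one of them ever calls recv(); the other half of the messages simply sit queued for the silent peer:

import time
import zmq

ctx = zmq.Context()

server = ctx.socket(zmq.DEALER)
server.bind("tcp://127.0.0.1:5600")

reader = ctx.socket(zmq.DEALER)              #  this client reads in a loop
silent = ctx.socket(zmq.DEALER)              #  this client never calls recv()
reader.connect("tcp://127.0.0.1:5600")
silent.connect("tcp://127.0.0.1:5600")
time.sleep(0.2)                              #  let both connections establish first

for i in range(10):
    server.send_string("msg %d" % i)         #  round-robined across the two peers

while reader.poll(200):                      #  the reader sees only every other message
    print("reader got:", reader.recv_string())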

Question:

Is it possible to have a publisher and subscriber in one thread? Whenever I do

socket_send = context.socket(zmq.PUB)
socket_send.bind("tcp://127.0.0.1:5559")

socket_recv = context.socket(zmq.SUB)
socket_recv.connect("tcp://127.0.0.1:5559")
socket_recv.setsockopt(zmq.SUBSCRIBE, "id1")

The subscription does not work (i.e. messages don't arrive). When I use socket_recv.bind() instead, the sending does not work (and using bind() on both socket_send and socket_recv causes an address-already-in-use error).

Any idea how I can resolve this? I have multiple clients writing messages to the pub-sub message bus; I then use the ventilator example to distribute the messages to workers, and these workers write back to the ventilator, which sends the results back to the clients (the worker-ventilator traffic is a separate communication). Perhaps there's a better communication pattern to handle this...


Answer:

You almost always want to run a ready-to-go example of the pattern you want to use first just to confirm everything seems to be in working order. Unfortunately I don't see any ready made examples in pyzmq (which is, I assume, the binding you're using) with pub/sub both in the same thread, but I have seen and run such examples in other languages so it's not a limitation of ZMQ and should be possible in your situation.

There are a couple of things you'll want to look at. Your code sample is very sparse; there's no way for anyone to diagnose what's going on from that alone, but here are some suggestions:

  • Before trying to subscribe to a specific topic (in your case, "id1"), try subscribing to everything: socket_recv.setsockopt(zmq.SUBSCRIBE, "") - this will remove the possibility that you're not setting up the subscription properly.
  • Along the same lines, when you do subscribe to "id1", make sure your message is either a single frame message that begins with the string "id1", or it's a multi-frame message with "id1" as the first frame.
  • I assume all of this is being run in a synchronous context, which means your subscriber should finish connecting before you move on to the next line, but just make sure that's true... if you should start publishing your message before your subscriber is finished connecting, that message will be lost.

As you note, you can't bind() twice on the same address, something useful to keep in mind. Think of one side of the socket pair as the "server" (which really means the constant element) and the other side as the "client" (which really means the unreliable element)... if both are equally constant and equally reliable, pick the one that "owns" or "originates" the data (in pub/sub, this is always the publisher) and make that one the "server": bind() on your server, and connect() on your client.
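For completeness, a minimal pyzmq sketch of a PUB and a SUB living in the same thread, following the suggestions above ( subscribe to everything first, and allow for the slow-joiner delay before publishing; the port is illustrative ):

import time
import zmq

ctx = zmq.Context()

pub = ctx.socket(zmq.PUB)
pub.bind("tcp://127.0.0.1:5559")

sub = ctx.socket(zmq.SUB)
sub.connect("tcp://127.0.0.1:5559")
sub.setsockopt(zmq.SUBSCRIBE, b"")   #  subscribe to everything first, narrow down later

time.sleep(0.5)                      #  slow-joiner: give the SUB time to finish connecting

pub.send(b"id1 hello")               #  single-frame message starting with the topic
print(sub.recv())                    #  b'id1 hello'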

All that said... as sberry noted, your proposed use case is bi-directional communication, which doesn't seem to fit pub/sub. There are many examples of doing what you want to do in the guide, specifically look at reliable request/reply patterns. Your use case is similar enough that you'll probably want to use one of those as a base, and there is python code linked throughout the descriptions of those patterns that will help you understand which code is doing what.