Hot questions for Using ZeroMQ in network programming

Top 10 C/C++ Open Source / ZeroMQ / network programming

Question:

I'm trying to implement a ZeroMQ pattern on the TCP layer between a C# application and distributed Python servers. I've gotten a version working with the request-reply REQ/REP pattern and it seems relatively stable when testing on localhost. However, in testing, I've debugged a few situations where I accidentally sent multiple requests before receiving a reply, which apparently is not acceptable.

In practice the network will likely have lots of dropped packets and I suspect that I'll be dropping lots of replies and/or unable to send requests.

1) Is there a way to reset the connection between REQ/REP request-reply sockets? Would a ROUTER/DEALER pattern make more sense instead? As this is my first application with ZeroMQ, I was hoping to keep it simple.

2) Is there a good ZeroMQ mechanism for handling connectivity events? I've been reading "the guide" and there are a few mentions of monitoring connections, but no examples. I found ZMonitor, but can't get the events to trigger in C#.


Answer:

Ad 1) No, there is no socket link-management interface exposed to the user for testing/resetting the state of the FSA-to-FSA link in the ZeroMQ framework.

Yes, XREQ/XREP may help you overcome the deadlocks that may (and do) happen in the REQ/REP Scalable Formal Communication Pattern:

Ref.: REQ/REP Deadlocks >>> https://stackoverflow.com/a/38163015/3666197

Fig.1: Why it is wrong to use a naive REQ/REP: in all cases where [1] in_WaitToRecvSTATE_W2R + [2] in_WaitToRecvSTATE_W2R coincide, the REQ-FSA/REP-FSA Finite-State-Automata are in a principally unsalvageable mutual deadlock and will never reach their "next" in_WaitToSendSTATE_W2S internal state.

               XTRN_RISK_OF_FSA_DEADLOCKED ~ {  NETWORK_LoS
                                         :   || NETWORK_LoM
                                         :   || SIG_KILL( App2 )
                                         :   || ...
                                         :      }
                                         :
[App1]      ![ZeroMQ]                    :    [ZeroMQ]              ![App2] 
code-control! code-control               :    [code-control         ! code-control
+===========!=======================+    :    +=====================!===========+
|           ! ZMQ                   |    :    |              ZMQ    !           |
|           ! REQ-FSA               |    :    |              REP-FSA!           |
|           !+------+BUF> .connect()|    v    |.bind()  +BUF>------+!           |
|           !|W2S   |___|>tcp:>---------[*]-----(tcp:)--|___|W2R   |!           |
|     .send()>-o--->|___|           |         |         |___|-o---->.recv()     |
| ___/      !| ^  | |___|           |         |         |___| ^  | |!      \___ |
| REQ       !| |  v |___|           |         |         |___| |  v |!       REP |
| \___.recv()<----o-|___|           |         |         |___|<---o-<.send()___/ |
|           !|   W2R|___|           |         |         |___|   W2S|!           |
|           !+------<BUF+           |         |         <BUF+------+!           |
|           !                       |         |                     !           |
|           ! ZMQ                   |         |   ZMQ               !           |
|           ! REQ-FSA               |         |   REP-FSA           !           |
~~~~~~~~~~~~~ DEADLOCKED in W2R ~~~~~~~~ * ~~~~~~ DEADLOCKED in W2R ~~~~~~~~~~~~~
|           ! /\/\/\/\/\/\/\/\/\/\/\|         |/\/\/\/\/\/\/\/\/\/\/!           |
|           ! \/\/\/\/\/\/\/\/\/\/\/|         |\/\/\/\/\/\/\/\/\/\/\!           |
+===========!=======================+         +=====================!===========+
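As a side note, newer libzmq versions (4.x) expose the ZMQ_REQ_RELAXED and ZMQ_REQ_CORRELATE socket options, which relax the strict REQ state machine and mitigate (though do not fully cure) the deadlock pictured above. A minimal sketch in Python with pyzmq (assumed available; port and payloads are illustrative):

```python
import zmq

ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
port = rep.bind_to_random_port("tcp://127.0.0.1")

req = ctx.socket(zmq.REQ)
req.setsockopt(zmq.REQ_RELAXED, 1)    # permit a new .send() without a prior .recv()
req.setsockopt(zmq.REQ_CORRELATE, 1)  # drop replies belonging to abandoned requests
req.connect("tcp://127.0.0.1:%d" % port)

req.send(b"first")   # pretend this request's reply will arrive too late
req.send(b"retry")   # a strict REQ would raise EFSM here; RELAXED permits it

# The REP side still answers both requests, in order...
for text in (b"stale-reply", b"fresh-reply"):
    rep.recv()
    rep.send(text)

# ...but CORRELATE makes the REQ side silently discard the stale one
answer = req.recv() if req.poll(2000) else None
print(answer)
```

This does not make REQ/REP robust against a peer that never answers; it only lets the REQ side move on, which is why the free-stepping layer in Fig.2 is still the safer design.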

Fig.2: One may implement a free-stepping transmission layer using several pure ZeroMQ builtins and add some SIG-layer tools for getting a full control of all possible distributed system states.

App1.PULL.recv( ZMQ.NOBLOCK ) and App1.PULL.poll( 0 ) are the obvious non-blocking tools here

[App1]      ![ZeroMQ]
code-control! code-control           
+===========!=======================+
|           !                       |
|           !+----------+           |         
|     .poll()|   W2R ___|.bind()    |         
| ____.recv()<----o-|___|-(tcp:)--------O     
| PULL      !|      |___|           |   :   
|           !|      |___|           |   :   
|           !|      |___|           |   :   
|           !+------<BUF+           |   :     
|           !                       |   :                           ![App2]
|           !                       |   :     [ZeroMQ]              ! code-control
|           !                       |   :     [code-control         ! once gets started ...
|           !                       |   :     +=====================!===========+
|           !                       |   :     |                     !           |
|           !                       |   :     |         +----------+!           |
|           !                       |   :     |         |___       |!           |
|           !                       |   :     |         |___| <--o-<.send()____ |
|           !                       |   :<<-------<tcp:<|___|   W2S|!      PUSH |
|           !                       |   :    .connect() <BUF+------+!           |
|           !                       |   :     |                     !           |
|           !                       |   :     |                     !           |
+===========!=======================+   :     +=====================!===========+
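The App1 receiving side of Fig.2 can be sketched in a few lines. In Python with pyzmq (assumed available; the inproc endpoint stands in for the real tcp: one), the PULL side never blocks, it polls and then receives with NOBLOCK:

```python
import zmq

ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)
pull.bind("inproc://app1-rx")       # App1 side: .bind() + non-blocking .recv()
push = ctx.socket(zmq.PUSH)
push.connect("inproc://app1-rx")    # App2 side: fire-and-forget .send()

push.send(b"status-update")

# App1 never blocks: poll with a short timeout, then recv with NOBLOCK
msg = None
if pull.poll(1000):                 # timeout in ms; poll(0) is a pure peek
    msg = pull.recv(zmq.NOBLOCK)
print(msg)
```

Because neither side ever enters a blocking wait on the other, a lost peer costs you a message, never a deadlocked FSA.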

Ad 2) No, but one may create one's own "ZeroMQ-consumables" to test the distributed system's ability to set up a new transport/signalling socket, and be ready to dispose of it if the RTO-test fails to prove that both (or all) sides are ready to set up and communicate over the ZeroMQ infrastructure. Notice that the problems are not only in the ZeroMQ layer: the App-side also need not be ready / in such a state as to handle the expected communication interactions, and that may cause soft-locks / dead-locks.
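That said, libzmq itself does expose low-level connectivity events via zmq_socket_monitor(); whether a given C# binding surfaces them reliably is a separate question. As a sketch of the mechanism in Python with pyzmq (assumed available), a monitor socket delivers events such as EVENT_CONNECTED:

```python
import zmq
from zmq.utils.monitor import recv_monitor_message

ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
port = rep.bind_to_random_port("tcp://127.0.0.1")

req = ctx.socket(zmq.REQ)
monitor = req.get_monitor_socket()  # a PAIR socket delivering event frames
req.connect("tcp://127.0.0.1:%d" % port)

# Drain events until the connection is confirmed (or we give up)
connected = False
for _ in range(10):
    if not monitor.poll(1000):
        break
    evt = recv_monitor_message(monitor)  # dict with 'event', 'value', 'endpoint'
    if evt["event"] == zmq.EVENT_CONNECTED:
        connected = True
        break
print("connected:", connected)
```

The same stream also reports disconnects and retry attempts, which is the raw material for the self-test "consumables" described above.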


The best next step?

What I can do for your further questions right now is to direct you to see a bigger picture on this subject >>> with more arguments, a simple signalling-plane / messaging-plane illustration and a direct link to a must-read book from Pieter HINTJENS.

Question:

I'm having issues understanding which socket types are negatively impacted in the event that TCP must retry transmitting messages.

We have a distributed system that uses a combination of in-process and TCP connections for internal processes and external devices and applications. My concern is that in the event there is significant traffic causing latency and dropped packets, a TCP retransmit will cause delays in the system.

What I'd like to avoid is an application having messages pile up in a queue waiting to be sent (via a single ZeroMQ TCP socket) because TCP is forcing the socket to repeatedly retransmit messages that were never acknowledged.

Is this an issue that can happen using ZeroMQ? Currently I am using PUSH/PULL on a Linux OS.

Or is this not a concern, and if not, why?

It is crucial that messages from the external devices/applications do not feed stale data.


Answer:

First, the only transport where retransmits are possible is TCP over an actual physical network. Even then they are unlikely on a LAN, as Ethernet packets rarely go missing there.

TCP internal to a computer, and especially IPC, INPROC, etc., will all have guaranteed delivery of data first time, every time. There is no retransmit mechanism.

If one of the transports being used by a socket does experience delays due to transmission errors, that will slow things down. ZMQ cannot consider a message "sent" until it has been propagated via all the transports used by the socket. The external visibility of "sent" is that the outbound message queue has moved away from the high water mark by 1.
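That high-water-mark back-pressure is easy to observe directly. A minimal sketch in Python with pyzmq (assumed available; the port number is arbitrary and assumed to have no listener): a PUSH socket connected to an unreachable endpoint can only queue messages, so with a tiny ZMQ_SNDHWM a non-blocking send soon fails with EAGAIN (zmq.Again):

```python
import zmq

ctx = zmq.Context()
push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 2)        # tiny outbound queue, set before connect
push.connect("tcp://127.0.0.1:5587")  # hypothetical port with no listener

sent = 0
try:
    for _ in range(100):
        push.send(b"x", zmq.NOBLOCK)  # raises zmq.Again once the queue is full
        sent += 1
except zmq.Again:
    pass
print("messages queued before hitting the HWM:", sent)
```

With a blocking send the application would simply stall at the same point instead of erroring, which is exactly the "messages piling up" scenario the question worries about.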

It's possible that any one single message will arrive sooner over IPC than TCP, and possible that message 2 will arrive over IPC before message 1 has arrived via TCP. But if you're relying on message timing / relative order, you shouldn't be using ZMQ in the first place; it's Actor model, not CSP.

EDIT For Frank

The difference between Actor and CSP is that the former is asynchronous, the latter is synchronous. Thus for the Actor model, the sender has zero information as to when the receiver actually gets a message. For CSP, the sending / receiving is an execution rendezvous - the send completes only when the receive is complete.

This can be remarkably useful. If in your system it makes no sense for A to instruct C to do something before (in time, not just in A's code flow) instructing B, then you can do that with CSP (but not Actor model). That's because when A sends to B, B receives the message before A's send completes, freeing A to then send to C.
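The A-before-C ordering described above can be emulated on top of ZMQ's Actor-model sockets by adding an explicit acknowledgement, i.e. a hand-rolled rendezvous. A sketch in Python with pyzmq (assumed available; endpoint names and payloads are illustrative):

```python
import zmq

ctx = zmq.Context()
a_to_b = ctx.socket(zmq.PAIR); a_to_b.bind("inproc://to-b")
b_side = ctx.socket(zmq.PAIR); b_side.connect("inproc://to-b")
a_to_c = ctx.socket(zmq.PAIR); a_to_c.bind("inproc://to-c")
c_side = ctx.socket(zmq.PAIR); c_side.connect("inproc://to-c")

# Actor-model send is fire-and-forget, so A fakes a CSP rendezvous by
# blocking on an explicit acknowledgement before instructing C.
a_to_b.send(b"do-step-1")
b_side.recv()                  # B has the message...
b_side.send(b"ack")            # ...and says so
a_to_b.recv()                  # A unblocks only once B really has it
a_to_c.send(b"do-step-2")      # C is therefore instructed strictly after B

received_by_c = c_side.recv()
print(received_by_c)
```

The cost is a full round-trip per ordered send, which is precisely the latency that the pure Actor model avoids.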

Unsurprisingly it's real time systems that benefit from CSP.

So consider ZMQ's Actor model with a mix of TCP, IPC and INPROC transports in ZMQ. There's a good chance that messages sent via TCP will arrive a good deal later than messages sent through INPROC, even if they were sent first.

Question:

I'm trying to run a simple ZMQ application ( ROUTER/DEALER ).

I just send a request from the DEALER to the ROUTER, which sends it back. But the DEALER cannot receive it.

I run it in one process ( ROUTER has its own thread ).

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <thread>

void router()
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_ROUTER);
    socket.bind("tcp://*:5561");

    while(1)
    {
        //  Wait for next request from client
        zmq::message_t reply;
        socket.recv (&reply);

        std::cout << "Router: Received request" << std::endl;

        //  Send reply back to client
        std::string string= "example";
        zmq::message_t message(string.size());
        memcpy (message.data(), string.data(), string.size());

        std::cout << "Router: Sending" << std::endl;
        socket.send (message);
    }
}

int main ()
{
    std::thread t{&router};

    //  Prepare our context and socket
    zmq::context_t context (2);
    zmq::socket_t socket (context, ZMQ_DEALER);

    std::cout << "Dealer: Connecting to hello world server…" << std::endl;
    socket.connect ("tcp://127.0.0.1:5561");

    for (int i = 0; i != 10; i++)
    {
        zmq::message_t request (5);
        memcpy (request.data (), "Hello", 5);
        std::cout << "Dealer: Sending Hello " << i << "…" << std::endl;
        socket.send (request);

        zmq::message_t reply;
        socket.recv (&reply);
        std::cout << "Dealer: Received " << i << std::endl;
    }
    return 0;
}

I have an output:

Dealer: Connecting to hello world server…
Dealer: Sending Hello 0…
Router: Received request
Router: Sending
Router: Received request
Router: Sending

Answer:

From ZMQ's documentation on sockets:

When receiving messages a ZMQ_ROUTER socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers. When sending messages a ZMQ_ROUTER socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to.

So modify your code to something like this:

#include <zmq.hpp>
#include <string>
#include <iostream>
#include <thread>

void router()
{
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_ROUTER);
    socket.bind("tcp://*:5561");

    for (int i = 0; i != 10; i++) // bounded loop, so the thread can join
    {
        //  A ROUTER prepends the peer's identity as the first message part
        zmq::message_t id;
        socket.recv (&id);

        //  ... the actual payload follows as the next part
        zmq::message_t reply;
        socket.recv (&reply);
        std::cout << std::string(static_cast<char*>(reply.data()), reply.size()) << std::endl;
        std::cout << "Router: Received request" << std::endl;

        //  Send reply back to client: identity frame first, then the payload
        std::string string = "example";
        zmq::message_t message(string.size());
        memcpy (message.data(), string.data(), string.size());
        std::cout << "Router: Sending" << std::endl;
        socket.send(id, ZMQ_SNDMORE);
        socket.send(message);
    }
    socket.setsockopt(ZMQ_LINGER, 0);
    socket.close();
    context.close();
}

int main ()
{
    std::thread t{&router};

    //  Prepare our context and socket
    zmq::context_t context (2);
    zmq::socket_t socket (context, ZMQ_DEALER);

    std::cout << "Dealer: Connecting to hello world server…" << std::endl;
    socket.connect ("tcp://127.0.0.1:5561");

    for (int i = 0; i != 10; i++)
    {
        zmq::message_t request (5);
        memcpy (request.data (), "Hello", 5);
        std::cout << "Dealer: Sending Hello " << i << "…" << std::endl;
        socket.send(request);

        zmq::message_t reply;
        socket.recv(&reply);
        std::cout << "Dealer: Received " << i << std::endl;
    }
    socket.setsockopt(ZMQ_LINGER, 0);
    socket.close();
    context.close();
    t.join();
    return 0;
}

Question:

I'm trying to send a file over a wireless network using ZeroMQ and the NORM protocol. I'm currently using the PUB/SUB pattern as that's the only pattern supported by NORM with ZeroMQ as far as I can tell.

I've got it set up so that small messages are passed along just fine, but occasionally the receiver won't pick up a message. From that point, messages are just dropped. This can occasionally be remedied by restarting either the Publisher or the Subscriber, but not every time. I've tried adjusting the amount of data sent and the time between each call to send, to no avail. It looks like I can receive about 20-60 multicast messages before the connection becomes unstable. If I use the same code but set it up with TCP, the connection is much more reliable, on the order of thousands of messages before an error occurs.

I've tried implementing a wrapper class to restart the subscribers after a certain period of inactivity - that didn't work. Neither does setting socket.recv(zmq.NOBLOCK) inside a while loop.

I'm aware of the Pub-Sub Synchronization pattern as described here, http://zguide.zeromq.org/page:all#Node-Coordination, but NORM, as implemented in the ZeroMQ's norm_engine.cpp (https://github.com/zeromq/libzmq/blob/master/src/norm_engine.cpp) doesn't look like it's set up to allow this pattern.

Is there a way to re-send lost packets, or ensure a healthy multicast connection?

Code is Python.

Publisher:

import zmq
import time
import os
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("norm://224.0.0.1:3000")
i = 1

imgfile_path = "/home/adam/programs/zmq/tux.svg.png"
imgsize = os.stat(imgfile_path).st_size
print "attempting to send", imgsize, "bytes"

sleep_time = 1
topic = ""
packet_size = 500
left = packet_size
f = open(imgfile_path, 'rb')
fi = f.read(packet_size)
while (imgsize - left) > packet_size:
    print "sent packet number:", i
    print "size: ", len(topic + str(i)[-1] + fi)
    i += 1
    socket.send(topic + str(i)[-1] + fi)

    fi = f.read(packet_size)
    left += packet_size
    time.sleep(sleep_time)
print imgsize, left
time.sleep(sleep_time)
fi = f.read(imgsize - left)
print fi
socket.send(topic + " " + fi)
f.close()

Subscriber:

import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind("norm://224.0.0.2:3000")
socket.setsockopt(zmq.SUBSCRIBE, "")

imgdir = "/home/adam/programs/zmq/img/"
filename = "tux.svg.png"
destfile = imgdir + filename
packet_size = 501
print "attempting to receive"
f = open(destfile, 'wb')
while True:
    msg = None
    while msg is None:
        try:
            msg = socket.recv(zmq.NOBLOCK)
        except:
            pass
    if msg: 
        print "msg = ", msg[0]
        print "we got something", len(msg)
        f.write(msg[1:])
        if len(msg) < packet_size:
            break
f.close()
print "exiting..."

Furthermore, once I can ensure that I can send a file, I'd like to adjust the Forward Error Correction and NACK rate, which is why NORM is so useful to me. Is there a way to do this without re-writing norm_engine.cpp?

Thanks!


Answer:

Some of the things you're assuming are confirmed on the doc page here. Namely that, for the time being, it's only PUB/SUB, though you do have the ability to do the syncing you linked by using NORM for PUB/SUB and TCP for REQ/REP.

Otherwise, I believe you're suffering from this still being fairly early in the NORM implementation phase, as (also from my link) it talks about all the things that aren't yet done. That was from a year ago, but I don't see much out there talking about it since then.

Depending on the specifics of your infrastructure, multicast may not even be the best choice; as discussed here, none of the multicast protocols are really designed for today's high-performance network speeds. It could be that your implementation is falling victim to poor recovery from issues that come up as a natural consequence of these transport protocols.

EDIT:

Per the GitHub link in your post, the code hasn't been updated since 3/19/14, the same date the page I linked above was published, so anything it says about the NORM transport in ZMQ should be completely up-to-date.

At the bottom of the page, in the points about what the ZMQ NORM implementation can and can't do:

  1. The current "norm_engine" assumes a fairly specific form of NORM transport options. It may be desirable to expose additional NORM features and transport modalities via the ZeroMQ API. Some examples include:
    • NORM's ability to alternatively provide UDP-like "best effort" and "better than best effort" (using packet erasure coding) delivery services for applications. This would include control of the NORM FEC encoding parameters.

So, it looks like it doesn't (in its current state) give you any control over the Forward Error Correction, just using it in its default reactive (high-performance) state. This should provide reliability for your connection by default, but if ZMQ's implementation hasn't been thoroughly put through its paces it may be more fragile than you'd like. With the lack of content and patches out there on it, I'd assume that it's just not ready for you yet.
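If you need recovery behaviour today, one workable approach is an application-level sequencing layer: number each chunk on the publisher, detect gaps on the subscriber, and request re-sends of the missing chunks over a side TCP REQ/REP channel. The helper names below are hypothetical; this is just a sketch of the framing and gap-detection parts, kept transport-agnostic so it can sit on top of any PUB/SUB socket:

```python
import struct

def frame(seq, payload):
    # 4-byte big-endian sequence number prefixed to every chunk
    return struct.pack(">I", seq) + payload

def unframe(msg):
    (seq,) = struct.unpack(">I", msg[:4])
    return seq, msg[4:]

class GapDetector:
    """Tracks the next expected sequence number on the subscriber side."""
    def __init__(self):
        self.expected = 0

    def feed(self, seq):
        # Returns the sequence numbers to NACK back over the REQ/REP channel
        missing = list(range(self.expected, seq))
        self.expected = seq + 1
        return missing

det = GapDetector()
seq, data = unframe(frame(0, b"chunk-0"))
print(det.feed(seq))    # nothing missing yet
print(det.feed(3))      # chunks 1 and 2 were lost -> request a re-send
```

This re-implements, crudely, the NACK machinery NORM already has internally, but it works regardless of how much of NORM the ZMQ engine exposes.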

Question:

I'm looking into a small network system prototype that, at its lowest level, has one software "parent" process that communicates back and forth with 5 software "children" processes.

I am using ZeroMQ to communicate between processes.

My question is one of multi-threaded vs. single-threaded handling.

In this kind of system, would a single thread in the parent that handles sending, receiving, and processing messages to and from the children be more efficient than 5 threads (1 thread per child process)?

For single-threaded, I'm concerned that while the parent works to process one message, the messages will start piling up.

For multi-threaded, I'm concerned about context switching and performance hits if this system architecture is expanded. Think 50 parents at 5 threads apiece, so 250 threads minimum.

The threads are written to ZeroMQ standards without locks, critical sections, shared memory, etc.

I use Linux and C++.


Answer:

You can run a message queue on the parent, which should also keep it from getting overwhelmed by the children while it processes events in the order they arrive. Furthermore, you can expand this to a simple send-acknowledge model, where children wait for their message to be acknowledged before sending further messages, so the parent can control the rate at which it receives messages.

Regarding the number of threads to run, I agree with you: as you scale up, the complexity of the parent program will increase.

The main factor, I believe, will be whether the threads need to share any data, or communicate, with each other. If that is not the case, the problem is simplified as you can use 1 thread per 1 or more children and just grab messages from the queue.
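The single-threaded variant maps naturally onto one PULL socket that fair-queues all children into one inbox. A sketch in Python with pyzmq (assumed available; the inproc endpoint stands in for the real transport):

```python
import zmq

ctx = zmq.Context()
parent = ctx.socket(zmq.PULL)
parent.bind("inproc://children")

children = []
for i in range(5):
    child = ctx.socket(zmq.PUSH)
    child.connect("inproc://children")
    child.send(b"report-from-child-%d" % i)
    children.append(child)          # keep references so the sockets stay open

# One thread, one socket: messages from all five children are fair-queued
msgs = [parent.recv() for _ in range(5)]
print(len(msgs))
```

Because ZMQ does the queueing and fair-queuing internally, the parent needs no locks or per-child threads; the 250-thread scenario collapses to 50 sockets.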

Question:

Multiple Access Points (APs) are in a network. They need to sync client data whenever a client connects to any one of the APs.

I've studied the ZeroMQ documentation for the REQ/REP, PUB/SUB and PUSH/PULL patterns. I'm not quite sure which of PUSH/PULL and PUB/SUB really works for the above requirement.

In the documentation, it is mentioned that,

One socket may have many outgoing and many incoming connections.

How can this be achieved using one of the above patterns so that the data among APs is always in sync?

Points to be considered

  • APs join network arbitrarily.
  • If a client connects to an AP, its data needs to be stored in all APs.
  • The ZeroMQ bindings used are for the C language.

Answer:

You could look at the Harmony peer-to-peer pattern; it's discussed in the ZMQ guide. Access points could then send a client-state message to all their peer access points every time a client connects. No single access point would be a "master" - they would be true peers.

Alternatively, you could have one of the access points set itself up as a "master" (a bound PULL socket), to which slave access points connect and send client data through a PUSH socket. The "master" would then send that data back out to the other access points on a PUB socket, with slave access points receiving it on a SUB socket. It's not a very elegant solution - the master has to handle the entire network's client-data traffic - but it may be easier to use.
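The "master" variant can be sketched in a few lines. In Python with pyzmq (assumed available; inproc endpoints stand in for the real TCP ones, and the sleep crudely papers over PUB/SUB's slow-joiner problem, which real code would solve with proper synchronization):

```python
import time
import zmq

ctx = zmq.Context()
# "master" AP: collects client data on PULL, fans it out on PUB
master_in = ctx.socket(zmq.PULL); master_in.bind("inproc://ap-up")
master_out = ctx.socket(zmq.PUB); master_out.bind("inproc://ap-down")

# a "slave" AP pushes client state up and subscribes to everyone's updates
slave_up = ctx.socket(zmq.PUSH); slave_up.connect("inproc://ap-up")
slave_down = ctx.socket(zmq.SUB); slave_down.connect("inproc://ap-down")
slave_down.setsockopt(zmq.SUBSCRIBE, b"")
time.sleep(0.2)   # let the subscription propagate (slow joiner)

slave_up.send(b"client-42-connected")
master_out.send(master_in.recv())   # the master's whole job: relay upward data

update = slave_down.recv() if slave_down.poll(2000) else None
print(update)
```

In the C bindings this becomes a small zmq_poll() loop on the master relaying zmq_recv() from the PULL socket into zmq_send() on the PUB socket.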