Hot questions on using ZeroMQ for low latency

Question:

My company is looking into using ZeroMQ as the transport mechanism. First I benchmarked the performance, just to get a sense of what I'm working with.

So I created an application comparing a zmq dealer-to-dealer setup against winsock. I measured the round-trip time of sending synchronous messages from a client to a server and then calculated the average.

Here is the server running winsock:

DWORD RunServerWINSOCKTest(DWORD dwPort)
{
    WSADATA wsaData;
    int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iRet != NO_ERROR)
    {
        printf("WSAStartup failed with error: %d\n", iRet);
        return iRet;
    }

    struct addrinfo hints;
    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    struct addrinfo *result = NULL;
    iRet = getaddrinfo(NULL, std::to_string(dwPort).c_str(), &hints, &result);
    if (iRet != 0)
    {
        WSACleanup();
        return iRet;
    }

    SOCKET ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
    if (ListenSocket == INVALID_SOCKET)
    {
        freeaddrinfo(result);
        WSACleanup();
        return WSAGetLastError();
    }

    iRet = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
    if (iRet == SOCKET_ERROR)
    {
        freeaddrinfo(result);
        closesocket(ListenSocket);
        WSACleanup();
        return WSAGetLastError();
    }

    freeaddrinfo(result);
    iRet = listen(ListenSocket, SOMAXCONN);
    if (iRet == SOCKET_ERROR)
    {
        closesocket(ListenSocket);
        WSACleanup();
        return WSAGetLastError();
    }

    while (true)
    {
        SOCKET ClientSocket = accept(ListenSocket, NULL, NULL);
        if (ClientSocket == INVALID_SOCKET)
        {
            closesocket(ListenSocket);
            WSACleanup();
            return WSAGetLastError();
        }
        char value = 0;
        setsockopt(ClientSocket, IPPROTO_TCP, TCP_NODELAY, &value, sizeof(value));

        char recvbuf[DEFAULT_BUFLEN];
        int recvbuflen = DEFAULT_BUFLEN;
        do {

            iRet = recv(ClientSocket, recvbuf, recvbuflen, 0);
            if (iRet > 0) {
            // Echo the buffer back to the sender
                int iSendResult = send(ClientSocket, recvbuf, iRet, 0);
                if (iSendResult == SOCKET_ERROR)
                {
                    closesocket(ClientSocket);
                    WSACleanup();
                    return WSAGetLastError();
                }
            }
            else if (iRet == 0)
                printf("Connection closing...\n");
            else  {
                closesocket(ClientSocket);
                WSACleanup();
                return 1;
            }

        } while (iRet > 0);

        iRet = shutdown(ClientSocket, SD_SEND);
        if (iRet == SOCKET_ERROR)
        {
            closesocket(ClientSocket);
            WSACleanup();
            return WSAGetLastError();
        }
        closesocket(ClientSocket);
    }
    closesocket(ListenSocket);

    return WSACleanup();
}

Here is the client running winsock:

DWORD RunClientWINSOCKTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
    WSADATA wsaData;
    int iRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iRet != NO_ERROR)
    {
        return iRet;
    }

    SOCKET ConnectSocket = INVALID_SOCKET;
    struct addrinfo *result = NULL,  *ptr = NULL, hints;


    ZeroMemory(&hints, sizeof(hints));
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    int iResult = getaddrinfo(strAddress.c_str(), std::to_string(dwPort).c_str(), &hints, &result);
    if (iResult != 0) {
        WSACleanup();
        return 1;
    }

    for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {
        ConnectSocket = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol);
        if (ConnectSocket == INVALID_SOCKET) {
            WSACleanup();
            return 1;
        }

        iResult = connect(ConnectSocket, ptr->ai_addr, (int)ptr->ai_addrlen);
        if (iResult == SOCKET_ERROR) {
            closesocket(ConnectSocket);
            ConnectSocket = INVALID_SOCKET;
            continue;
        }
        break;
    }

    freeaddrinfo(result);

    if (ConnectSocket == INVALID_SOCKET) {
        WSACleanup();
        return 1;
    }


    // Statistics
    UINT64 uint64BytesTransmitted = 0;
    UINT64 uint64StartTime = s_TimeStampGenerator.GetHighResolutionTimeStamp();
    UINT64 uint64WaitForResponse = 0;

    DWORD dwMessageCount = 1000000;

    CHAR cRecvMsg[DEFAULT_BUFLEN];
    SecureZeroMemory(&cRecvMsg, DEFAULT_BUFLEN);

    std::string strSendMsg(dwMessageSize, 'X');

    for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
    {
        int iRet = send(ConnectSocket, strSendMsg.data(), strSendMsg.size(), 0);
        if (iRet == SOCKET_ERROR) {
            closesocket(ConnectSocket);
            WSACleanup();
            return 1;
        }
        uint64BytesTransmitted += strSendMsg.size();

        UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
        iRet = recv(ConnectSocket, cRecvMsg, DEFAULT_BUFLEN, 0);
        if (iRet < 1)
        {
            closesocket(ConnectSocket);
            WSACleanup();
            return 1;
        }
        std::string strMessage(cRecvMsg, iRet); // construct from the actual number of bytes received

        if (strMessage.compare(strSendMsg) == 0)
        {
            uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
        }
        else
        {
            return NO_ERROR;
        }
}

    UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
    PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);

    iResult = shutdown(ConnectSocket, SD_SEND);
    if (iResult == SOCKET_ERROR) {
        closesocket(ConnectSocket);
        WSACleanup();
        return 1;
    }
    closesocket(ConnectSocket);
    return WSACleanup();
}

Here is the server running ZMQ (dealer):

DWORD RunServerZMQTest(DWORD dwPort)
{
    try
    {
        zmq::context_t context(1);
        zmq::socket_t server(context, ZMQ_DEALER);

        // Set options here
        std::string strIdentity = s_set_id(server);
        printf("Created server connection with ID: %s\n", strIdentity.c_str());

        std::string strConnect = "tcp://*:" + std::to_string(dwPort);
        server.bind(strConnect.c_str());

        bool bRunning = true;
        while (bRunning)
        {
            std::string strMessage = s_recv(server);

            if (!s_send(server, strMessage))
            {
                return NO_ERROR;
            }
        }
    }
    catch (zmq::error_t& e)
    {
        return (DWORD)e.num();
    }

    return NO_ERROR;
}

Here is the client running ZMQ (dealer):

DWORD RunClientZMQTest(std::string strAddress, DWORD dwPort, DWORD dwMessageSize)
{
    try
    {
        zmq::context_t ctx(1);
        zmq::socket_t client(ctx, ZMQ_DEALER); // ZMQ_REQ

        // Set options here
        std::string strIdentity = s_set_id(client);

        std::string strConnect = "tcp://" + strAddress + ":" + std::to_string(dwPort);
        client.connect(strConnect.c_str());

        if(s_send(client, "INIT"))
        {
            std::string strMessage = s_recv(client);
            if (strMessage.compare("INIT") == 0)
            {
                printf("Client[%s] connected to: %s\n", strIdentity.c_str(), strConnect.c_str());
            }
            else
            {
                return NO_ERROR;
            }
        }
        else
        {
            return NO_ERROR;
        }


        // Statistics
        UINT64 uint64BytesTransmitted   = 0;
        UINT64 uint64StartTime          = s_TimeStampGenerator.GetHighResolutionTimeStamp();
        UINT64 uint64WaitForResponse    = 0;

        DWORD dwMessageCount = 1000000;


        std::string strSendMsg(dwMessageSize, 'X');
        for (DWORD dwI = 0; dwI < dwMessageCount; dwI++)
        {
            if (s_send(client, strSendMsg))
            {
                uint64BytesTransmitted += strSendMsg.size();

                UINT64 uint64BeforeRespone = s_TimeStampGenerator.GetHighResolutionTimeStamp();
                std::string strRecvMsg = s_recv(client);
                if (strRecvMsg.compare(strSendMsg) == 0)
                {
                    uint64WaitForResponse += (s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64BeforeRespone);
                }
                else
                {
                    return NO_ERROR;
                }
            }
            else
            {
                return NO_ERROR;
            }
        }
        UINT64 uint64ElapsedTime = s_TimeStampGenerator.GetHighResolutionTimeStamp() - uint64StartTime;
        PrintResult(uint64ElapsedTime, uint64WaitForResponse, dwMessageCount, uint64BytesTransmitted, dwMessageSize);
    }
    catch (zmq::error_t& e)
    {
        return (DWORD)e.num();
    }

    return NO_ERROR;
}

I'm running the benchmark locally with a message size of 5 bytes, and I get the following results:

WINSOCK

Messages sent:                 1 000 000
Time elapsed (us):            48 019 415
Time elapsed (s):                     48.019 415
Message size (bytes):                  5
Msg/s:                            20 825
Bytes/s:                         104 125
Mb/s:                                  0.099
Total   response time (us):   24 537 376
Average response time (us):           24.0

and

ZeroMQ

Messages sent:                 1 000 000
Time elapsed (us):           158 290 708
Time elapsed (s):                    158.290 708    
Message size (bytes):                  5
Msg/s:                             6 317
Bytes/s:                          31 587
Mb/s:                                  0.030
Total   response time (us):  125 524 178    
Average response time (us):          125.0

Can anyone explain why the average response time is so much higher when using ZMQ?

The goal is to find a setup where I can send and receive messages asynchronously without the need to reply. If this can be achieved with a different setup than dealer-dealer, please let me know!


Answer:

This is only sort of an answer to a little part of your question, but here goes -

Why do you need dealer/dealer? I assume it's because communication can initiate from either point? You're not tied to dealer/dealer; among other things, it limits you to only two endpoints: if you ever add another endpoint on either side of the communication, say a second client, then each client will only receive half the messages, because dealer is strictly round-robin.

What you need for asynchronous communication is some combination of dealer and/or router sockets. Neither requires a response; the main differences are in how they choose which connected peer to send a message to:

  • Dealer, as said, is strictly round robin, it will send to each connected peer in series
  • Router is strictly an addressed message, you have to know the "name" of the peer you want to send to to get the message there.

These two socket types work together because dealer sockets (and request sockets; dealer is a "request-type" socket) send their "name" as part of the message, which the router socket can use to send data back. This is a request/reply paradigm, and you'll see that sort of paradigm enforced in all of the examples in the guide, but you can bend it to what you're looking for; in particular, neither dealer nor router requires a reply.
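
To make that concrete, here is a minimal sketch (using cppzmq's zmq.hpp like your own code; the port number and identity are made up for illustration) of a router and a dealer talking in both directions without either side ever being forced to reply:

#include <zmq.hpp>

int main()
{
    zmq::context_t ctx(1);

    zmq::socket_t router(ctx, ZMQ_ROUTER);
    router.bind("tcp://*:5555");                      // illustrative port

    zmq::socket_t dealer(ctx, ZMQ_DEALER);
    dealer.setsockopt(ZMQ_IDENTITY, "peer-A", 6);     // a fixed "name" the router can address
    dealer.connect("tcp://localhost:5555");

    // DEALER -> ROUTER : the identity frame is prepended automatically
    zmq::message_t out("hello", 5);
    dealer.send(out);

    // ROUTER receives [identity][payload]
    zmq::message_t id, body;
    router.recv(&id);
    router.recv(&body);

    // ROUTER -> DEALER : address the peer by re-using its identity frame;
    // no reply is expected or required from the dealer
    zmq::message_t note("ping", 4);
    router.send(id, ZMQ_SNDMORE);
    router.send(note);

    zmq::message_t in;
    dealer.recv(&in);                                 // arrives although the dealer never "replied"
    return 0;
}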

Without knowing your full requirements I can't tell you what sort of ZMQ architecture I would choose, but in general I prefer the expandability of router sockets: it's easier to handle appropriate addressing than it is to shoehorn everything into a single peer. You'll see warnings against doing router/router, and I agree with them to the extent that you should understand what you're doing before you try it, but once you do understand what you're doing, the implementation isn't that hard.


You also have the option, if it fits your requirements, to set up each end with a PUB socket and a SUB socket, if there are literally no replies ever. If it's strictly a data feed from source to target, and neither peer needs any feedback on what it sends, then this is probably the best choice, even though it means you're dealing with two sockets per end rather than one.
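
As a rough sketch of that layout (again cppzmq, with placeholder URLs; keep in mind that a PUB socket silently drops messages while no subscriber is connected yet), each peer owns one PUB and one SUB socket and does something like:

#include <zmq.hpp>
#include <string>

// one peer's half of the pattern; the mirror-image peer swaps the two URLs
void run_peer(zmq::context_t& ctx,
              const std::string& my_pub_url,      // e.g. "tcp://*:6000"
              const std::string& their_pub_url)   // e.g. "tcp://otherhost:6001"
{
    zmq::socket_t pub(ctx, ZMQ_PUB);
    pub.bind(my_pub_url.c_str());

    zmq::socket_t sub(ctx, ZMQ_SUB);
    sub.connect(their_pub_url.c_str());
    sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);          // empty filter: receive everything

    // send and receive independently of each other; neither side ever replies
    zmq::message_t out("tick", 4);
    pub.send(out);

    zmq::message_t in;
    if (sub.recv(&in, ZMQ_DONTWAIT))               // non-blocking: it is a pure data feed
    {
        // process in.data() / in.size() here
    }
}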


None of this addresses performance directly, but the important thing to understand is that zmq sockets are optimized for particular use cases, and as pointed out in John Jefferies' answer, you're breaking that use case for your dealer socket by making the messaging in your test strictly synchronous. The first place to start is to finalize your ZMQ architecture and then simulate an actual message flow; in particular, don't add arbitrary waits and synchronicity, which will necessarily change the way throughput looks as you're testing it, pretty much by definition.

Question:

I use a private field (PushSocket, the zmq_push socket type in NetMQ):

private PushSocket _pushSocket;

And two methods that use this socket from different threads:

public void Method1()
{
    //.....//
    _pushSocket.SendFrame(....);
    //.....//
}

public void Method2()
{
    //.....//
    _pushSocket.SendFrame(....);
    //.....//
}

Should I use a lock or other synchronization primitives?


Answer:

No,

on the basis of the ZeroMQ professional-level recommendation No. 1: one shall not design code that shares sockets among threads.

By design, ZeroMQ Scalable Formal Communication Patterns ( a.k.a., a bit misleadingly, nicknamed as socket(s) ) are not thread-safe ( and never tried to be ).

It is not a matter of belief in one's capabilities to somehow mediate inter-thread signalling; it is a principal belief that good scalable parallel code shall never share, nor block.

Thus says ZeroMQ evangelism.

Confused? np. Angry? np. Zero-sharing, Zero-locking -- try to consider it as some form of collision avoidance, rather than having to sieve ashes from the burnt thrashes of an uncontrolled concurrent havoc.
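
If you nevertheless want to see what the zero-sharing style looks like in practice, here is a minimal sketch ( in C++/cppzmq rather than NetMQ, as the rule is binding-agnostic; the tcp://localhost:5557 endpoint is illustrative only ): each thread creates, uses and closes its own PUSH socket, and only the Context instance -- which is thread-safe by design -- is shared:

#include <zmq.hpp>
#include <cstring>
#include <thread>

static void send_from_own_socket(zmq::context_t& ctx, const char* text)
{
    zmq::socket_t push(ctx, ZMQ_PUSH);      // created, used and closed by this thread only
    push.setsockopt(ZMQ_LINGER, 0);         // let the sketch exit even if no peer is listening
    push.connect("tcp://localhost:5557");

    zmq::message_t msg(text, strlen(text));
    push.send(msg);
}                                           // socket closed here; no other thread ever touched it

int main()
{
    zmq::context_t ctx(1);                  // shared: contexts are thread-safe, sockets are not
    std::thread t1(send_from_own_socket, std::ref(ctx), "from Method1");
    std::thread t2(send_from_own_socket, std::ref(ctx), "from Method2");
    t1.join();
    t2.join();
    return 0;
}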


If in doubt,

the best option is to read Pieter HINTJENS' book "Code Connected. Volume 1" and spend some time with Pieter's views on scalable code design principles.

You will soon fall in love with the new style of thinking the ZeroMQ way.

Question:

I get this strange deadlock when I try to synchronize two python3 scripts using 0mq (ZeroMQ). The scripts run fine for several thousand iterations, but sooner or later they both stop and wait for each other. I am running both scripts from different CMD windows on Windows 7.

I cannot figure out why such a deadlock is even possible. What can go wrong here?

Script A:

while (1):
   context = zmq.Context()
   socket = context.socket(zmq.REP)
   socket.bind('tcp://127.0.0.1:10001')
   msg = socket.recv()                        # Waiting for script B to send done
   # ............................................................................
   # ... do something useful (takes only a few millisecs)
   # ............................................................................     
   context = zmq.Context()
   socket = context.socket(zmq.REQ)
   socket.connect('tcp://127.0.0.1:10002')
   socket.send_string("done")                 # Tell script B we are done

Script B

while (1):
   # ............................................................................
   # ... do something useful (takes only a few millisecs)
   # ............................................................................
   context = zmq.Context()
   socket = context.socket(zmq.REQ)
   socket.connect('tcp://127.0.0.1:10001')
   socket.send_string("done")               # Tell script A we are done

   context = zmq.Context()
   socket = context.socket(zmq.REP)
   socket.bind('tcp://127.0.0.1:10002')
   msg = socket.recv()                      # Waiting for script A to send done

Answer:

This is not a DeadLock case

The code, sure, still needs some care.

Disambiguation: your scenario does not hit a state of mutual resource locking, a.k.a. a DeadLock. Yes, sure, your code crashes, but most probably not due to a REQ/REP DeadLock ( which might and does appear on a lossy-network tcp: transport-class ). The posted code is crashing due to unmanaged resource handling, not due to reaching a mutual-blocking state of a DeadLock / LiveLock.


How to fix it?

First, let's assume your ultra-low-latency-motivated system does not allow anything to be instantiated repetitively. There are exceptions to this, but let's keep it professional.

  1. move your .Context() resource setup ( or its inheritance from an outer call ) out of the loop ( a short sketch follows this list )

  2. review whether you need, and whether your latency constraints allow, a .socket() resource to be set up / torn down twice in each loop-run

  3. decide whether you can live with a real REQ/REP deadlock once a first message gets lost in the transport-path

  4. enforce graceful termination of resources ( .socket()-s, O/S port#s, .Context()-s ). Do not leave them hanging unterminated forever while creating an infinite amount of others instead; that devastates any "fault-resilient" system. Resources are never infinite.

  5. design both signalling and transmission behaviours in a non-blocking manner. This allows you to detect and handle remote-process timeouts and introduces a chance for local remedy / responsive actions.

  6. redesign the code to the level of code security you need ( the example below has been working for a few years in a soft-realtime controlled endless loop, 24/7/365, in a distributed processing framework with a remote keyboard and some other local and remote diagnostic tools ).
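
A minimal sketch of points 1, 2, 4 and 5 ( expressed here in C++/cppzmq rather than pyzmq -- the structure maps one-to-one; the endpoint and timeout values are illustrative only ) may look like this:

#include <zmq.hpp>

int main()
{
    zmq::context_t ctx(1);                       // 1. instantiated once, outside the loop
    zmq::socket_t  rep(ctx, ZMQ_REP);
    rep.bind("tcp://127.0.0.1:10001");           // 2. one setup, reused by every loop-run
    rep.setsockopt(ZMQ_RCVTIMEO, 1000);          // 5. never block forever: 1000 ms, then decide locally
    rep.setsockopt(ZMQ_LINGER,   0);             // 4. do not hang on teardown

    bool running = true;                         // flip from a control message / signal handler to leave
    while (running)
    {
        zmq::message_t request;
        if (!rep.recv(&request))                 // false == timeout (EAGAIN): peer late or gone
        {
            // remedy / responsive action goes here ( log, retry, give up, ... )
            continue;
        }
        // ... do something useful ( takes only a few millisecs ) ...
        zmq::message_t reply("done", 4);
        rep.send(reply);
    }

    rep.close();                                 // 4. graceful resource termination
    ctx.close();
    return 0;
}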


What is missing for production-grade code?

Your code has to "envisage" what might have gone wrong in any part of your distributed system. Yes, it is hard, but necessary. Your remote node -- a communicating counterparty -- stopped responding, lost a message, rebooted, stalled due to an O/S crash, whatever is imaginable ( plus a few rather nasty surprises you will find only on-the-fly ... ). This is another Pandora's Box to cover in this small post, which does not mean it is not necessary. It is your life-saving vest.

Design in a non-blocking manner wherever you can, this way you remain in control of events ...

Anyway, always release system resources and .term() all ZeroMQ .Context() instances in a graceful manner -- "tidy up" is a fair practice -- both in real life and the more so in code-empires.

# /\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\
#NONSTOP RESPONDER RAW EXAMPLE:
def aMiniRESPONDER( aTarget2Bind2_URL             = "tcp://A.B.C.D:8889",
                    anExternalPREDICTOR           = None,
                    anExternallyManagedZmqCONTEXT = None,
                    aSpreadMinSafetyMUL           = 3.0,
                    aSilentMODE                   = True
                    ):
   try: # RESOURCES LAYER
        # ... SETUP
        # ------------------------------------------------- .Context()
        # can setup a locally-managed context or re-use
        # anExternallyManagedZmqCONTEXT obtained upon a func Call
        aZmqCONTEXT   = anExternallyManagedZmqCONTEXT or zmq.Context( 1 )   

        # localhost:8887 [REP] ... remote [REQ] peer  .connect() + .send()
        aCtrlPORT_URL = "tcp://*:8887"                                      

        # localhost:8890 [PUB] ... remote [SUB] peers .connect() +
        # .subscribe + .recv( zmq.NOBLOCK ) ( MQL4 cannot .poll() so far ...)
        aSIGsPORT_URL = "tcp://*:8890"                                      
        aXmitPORT_URL = aTarget2Bind2_URL

        aListOfSOCKETs = []

        pass # -------------------------------------------------------------# ZMQ
        try: # -------------------------------------------------------------#
            # try: XmitPORT
            aXmitSOCKET = aZmqCONTEXT.socket( zmq.PAIR )

            # XmitPORT
            aXmitSOCKET.bind(      aXmitPORT_URL )                          
            aListOfSOCKETs.append( aXmitSOCKET )
        except:                                                             
            #    EXC: XmitPORT on Failure: GRACEFUL CLEARING XmitPORT

            msg =  "\nEXC. ZmqError({0:s}) on aXmitSOCKET setup / .bind( {1:s} )"
            print( msg.format( repr( zmq.ZMQError() ), aTarget2Bind2_URL ) )
            raise ValueError( "ZMQ_EXC_EXIT @ XmitPORT SETUP" )
        pass # -------------------------------------------------------------# ZMQ
        try: # -------------------------------------------------------------#
            # try: CtrlPORT    
            # CtrlSOCKET [REP] .recv()s<--[REQ] + .send()s--> [REQ]
            aCtrlSOCKET = aZmqCONTEXT.socket( zmq.REP )                     

            # CtrlPORT <-REQ/REP means a remote peer [REQ] has to
            # .send()+.recv() before sending another CtrlCMD
            aCtrlSOCKET.bind(      aCtrlPORT_URL )                          
            aListOfSOCKETs.append( aCtrlSOCKET )
        except:                                                             
            # EXC: CtrlPORT on Failure: GRACEFUL CLEARING both CtrlPORT
            # and XmitPORT
            msg =  "\nEXC. ZmqError({0:s}) on aCtrlSOCKET setup / .bind( {1:s} )"
            print( msg.format( repr( zmq.ZMQError() ), aCtrlPORT_URL ) )
            raise ValueError( "ZMQ_EXC_EXIT @ CtrlPORT SETUP" )
        pass # -------------------------------------------------------------# ZMQ
        try: # -------------------------------------------------------------#
            # try: SIGsPORT

            # SIGsPORT [PUB] .send()s--> [SUB]s
            aSIGsSOCKET= aZmqCONTEXT.socket( zmq.PUB  )                     

            # SIGsPORT -->  PUB/SUB means a remote peer(s) [SUB] .subscribe() + .recv()
            aSIGsSOCKET.bind(      aSIGsPORT_URL )                          
            aListOfSOCKETs.append( aSIGsSOCKET )
        except:                                                             
            # EXC: SIGsPORT on Failure: GRACEFUL CLEARING both CtrlPORT
            # and XmitPORT and SIGsPORT
            msg =  "\nEXC. ZmqError({0:s}) on aSIGsSOCKET setup / .bind( {1:s} )"
            print( msg.format( repr( zmq.ZMQError() ), aSIGsPORT_URL ) )
            raise ValueError( "ZMQ_EXC_EXIT @ SIGsPORT SETUP" )
        pass # -------------------------------------------------------------# ZMQ

        # vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
        # ... SETUP YOUR APPLICATION CODE

        try:     # APP LAYER ___________________________________________
           pass  #           what you want to do
                 #           here you go ...

        except:  # APP LAYER ___________________________________________
           pass  #           handle EXCs

        finally: # APP LAYER ___________________________________________
           pass  #           your own application post-mortem / pre-exit code

        # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

   except:  # RESOURCES LAYER .............................................
        pass # ... code shall handle its own exceptions + externally caused events

   finally: # RESOURCES LAYER .............................................
        # ... always, ALWAYS gracefully exit ( avoid leakages and dirty things )

        [ allSOCKETs.setsockopt( zmq.LINGER, 0 ) for allSOCKETs in aListOfSOCKETs ]
        [ allSOCKETs.close( )                    for allSOCKETs in aListOfSOCKETs ]

        # --------------------------------------------------------------#
        # RESOURCES dismantled, may .term()

        # .TERM(), NOP otherwise
        if not ( aZmqCONTEXT is anExternallyManagedZmqCONTEXT ):        #
                 aZmqCONTEXT.term()                                     #
        return

Question:

Is there any way to receive multiple messages with a single recv call in ZeroMQ, as in RabbitMQ? I can't find any documentation that mentions it.


Answer:

Q : Is there any way to receive multiple messages with a single recv call in ZeroMQ, as in RabbitMQ?

No, there is not.

Unless you re-factor the ZeroMQ architecture into a principally new shape and fashion, so as to become more like RabbitMQ.

Question:

I am implementing a simple REQ-REP pattern with ZeroMQ in C using multi-part messaging. Most of my messages have strictly 4 parts each (to and fro) with a few exceptions. To enforce the rule, I need to determine the total number of parts of a multi-part message received. Knowing if it is <= 4 is easy. Here is my receiver function:

#define BUFMAX  64 // Maximum size of text buffers
#define BUFRCV  63 // Maximum reception size of text buffers (reserve 1 space to add a terminal '\0')

char mpartstr[4][BUFMAX];


int recv_multi(void *socket,int *aremore)

// Receive upto the first 4 parts of a multipart message into mpartstr[][].
// Returns the number of parts read (upto 4) or <0 if there is an error.
// Returns -1 if there is an error with a zmq function.
// It sets aremore=1 if there are still more parts to read after the fourth
// part (or aremore=0 if not).
{
 int len,rc,rcvmore,pdx,wrongpard=0;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx=0;
 len=zmq_recv(socket, mpartstr[pdx], BUFRCV, 0);
 if(len==-1) return len;

 mpartstr[pdx][len]='\0';
 rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1;

 pdx++;
 if(rcvmore==0){*aremore=0; return pdx;}

 while(rcvmore){
   len=zmq_recv (socket, mpartstr[pdx], BUFRCV, 0); if(len==-1) return len; mpartstr[pdx][len]='\0';
   rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1; 
   pdx++;
   if(pdx==4) break;
 }

 *aremore=rcvmore;
 return pdx;
}

All fine. But now in my main() function I check whether there are more parts by looking at the value of aremore. In cases where I am not expecting more, I will send an error message back to the sender, but I have found that ZeroMQ doesn't like it if I don't read ALL the parts of a multi-part message (it reads the remaining parts of the old multi-part message the next time I call the zmq_recv() function, even after I send a message and expect a new, clean multi-part response).

So what I really need is a kind of "flush" function to clear the remaining parts of a message that contains more than 4 parts, which I want to discard. So far the only way I have to do this is an ugly, arbitrary brute-force exhaustion function like so (aremore will have a value of 1 to begin with; it was set by the previous function):

int recv_exhaust(void *socket,int *aremore)

// Receive the remainder of a multipart message and discard the contents.
// Use this to clean out a multi-part 'inbox' from a wrongly sent message.
// Returns 0 on success
// Returns -1 on zmq function failure
// Returns -2 on failure to exhaust even after 1000 parts.
{                                                          
 int len,rc,rcvmore,pdx;
 size_t rcvmore_size = sizeof(rcvmore);

 pdx=1;
 rcvmore=*aremore;

 while(rcvmore){
   len=zmq_recv(socket, mpartstr[0], BUFRCV, 0); if(len==-1) return len;
   rc=zmq_getsockopt(socket,ZMQ_RCVMORE,&rcvmore,&rcvmore_size); if(rc) return -1; 
   pdx++;
   if(pdx>1000) return -2;
 }

 return 0;
}

If there is no dedicated 'flusher' API, then I could at least get rid of my arbitrary 1000-message limit if I had some way of knowing in advance how many parts (in total) a given multi-part message has. Surely ZeroMQ knows this, because multi-part messages are sent as a whole block. Can anyone point me to the way to find that info? Or is there a proper 'flusher' function/method out there? (For standard C please, not C++/C#, etc.) Thanks in advance.


Answer:

Q : Can anyone point me to the way to find that info?

Yes.

Q : is there a proper 'flusher' function/method out there?

Yes and No :

From ZeroMQ v2.x up until v4.3.1, there has been no explicit API call for a "flusher".

The beauty and the power of the low-latency smart messaging that the ZeroMQ design delivers is built on a wisely crafted Zen-of-Zero: always preferring performance to comfort, as the Zero-copy, Zero-warranty and other paradigms suggest.

A naive "flusher" ( and I bear a lot of pain in trivialising this down to resorting to a primitive blocking recv() ... ) has to go all the way until ZMQ_RCVMORE no longer flags any more parts "beyond" the last frame of the multi-frame message ( or until zmq_msg_more() == 0 confirms the same ). Still, all these operations do just pointer handling; no data gets "moved/copied/read" from RAM, just the pointer(s) get assigned, so it is indeed both fast and I/O-efficient:

int    more;
size_t more_size = sizeof ( more );
do {
      zmq_msg_t part;                       /* Create an empty ØMQ message to hold the message part */
      int rc = zmq_msg_init (&part);           assert (rc == 0 && "MSG_INIT failed" );
      rc = zmq_msg_recv (&part, socket, 0); /* Block until a message is available to be received from socket */
                                               assert (rc != -1 && "MSG_RECV failed" );
                                            /* Determine if more message parts are to follow */
      rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
                                               assert (rc == 0 && "GETSOCKOPT failed" );
      zmq_msg_close (&part);
      } while (more);

Given the RFC-23/ZMTP documented properties, there are but a few (wire-level telemetry encoded) warranties:

1) all messages get sent/delivered:

  • atomically ( either error-free binary-identical all frames, or none at all )
  • at most once ( per relevant peer )
  • in order

2) multi-part messages get additionally an internal (in-band)-telemetry "advice" of state:

  • a bit-flagged state { 1: more-frames-follow| 0: no-more-frames }
  • a bit-flagged size-type { 0: 8b-direct-octet | 1: 64b-"network-endian"-coded }
  • a size-advice { 0~255: direct-size | 0~2^63-1: 64b-"network-endian"-coded-size }

The documented zmq_recv() API is similarly rather explicit about this:

Multi-part messages A ØMQ message is composed of 1 or more message parts. Each message part is an independent zmq_msg_t in its own right. ØMQ ensures atomic delivery of messages: peers shall receive either all message parts of a message or none at all. The total number of message parts is unlimited except by available memory. An application that processes multi-part messages must use the ZMQ_RCVMORE zmq_getsockopt(3) option after calling zmq_msg_recv() to determine if there are further parts to receive.


Whatever "ugly" this may look on a first read, the worst-case that would fit in memory is a huuuuuge amount of SMALL-sized messages inside a multi-part message-frame.

The resulting time to "get-rid-of-'em" is not zero, sure, yet the benefits of compact and efficient internal ZMTP-telemetry and low-latency stream-processing is way more important goal ( and was achieved ).

If in doubts, benchmark the worst-case first :

a) "produce" about 1E9 multipart-message frames, transporting Zero-sized payloads ( no data, but all the message-framing )

b) "setup" simplest possible "topology" PUSH/PULL

c) "select" transport-class of your choice { inproc:// | ipc:// | tipc:// | ... | vmci:// } - best stack-less inproc:// ( I would start a stress-test with this )

d) stopwatch such a blind-mechanical-Zero-shortcuts "flusher" between a ReferencePoint-S, when zmq_poll( ZMQ_POLLIN ) has POSACK-ed the presence of any read-able content, and a ReferencePoint-E, when the last part of the many-part multipart-message has been looped over by the blind-"flusher" circus.
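
A hedged sketch of such a worst-case stopwatch ( C++ over the plain libzmq C API; the inproc:// endpoint name is arbitrary and the frame-count is scaled down from the suggested ~1E9 so that the sketch finishes in seconds ) may look like this:

#include <zmq.h>
#include <chrono>
#include <cstdio>
#include <thread>

static const size_t N_PARTS = 1000000;        // scaled-down stand-in for the ~1E9 suggested above

static void producer(void *ctx)               // a) produce one multipart message of zero-sized frames
{
    void *push = zmq_socket(ctx, ZMQ_PUSH);
    int unlimited = 0;
    zmq_setsockopt(push, ZMQ_SNDHWM, &unlimited, sizeof unlimited);
    zmq_connect(push, "inproc://flush-bench");
    for (size_t i = 0; i < N_PARTS; ++i)
        zmq_send(push, "", 0, (i + 1 < N_PARTS) ? ZMQ_SNDMORE : 0);
    zmq_close(push);
}

int main()
{
    void *ctx  = zmq_ctx_new();               // b) simplest topology: PUSH/PULL, c) inproc://
    void *pull = zmq_socket(ctx, ZMQ_PULL);
    int unlimited = 0;
    zmq_setsockopt(pull, ZMQ_RCVHWM, &unlimited, sizeof unlimited);
    zmq_bind(pull, "inproc://flush-bench");
    std::thread feeder(producer, ctx);

    zmq_pollitem_t items[] = { { pull, 0, ZMQ_POLLIN, 0 } };
    zmq_poll(items, 1, -1);                   // ReferencePoint-S: something is read-able
    auto start = std::chrono::steady_clock::now();

    int    more = 0;                          // d) the blind "flusher" loop being measured
    size_t more_size = sizeof more;
    size_t parts = 0;
    do {
        zmq_msg_t part;
        zmq_msg_init(&part);
        zmq_msg_recv(&part, pull, 0);
        ++parts;
        zmq_getsockopt(pull, ZMQ_RCVMORE, &more, &more_size);
        zmq_msg_close(&part);
    } while (more);

    auto stop = std::chrono::steady_clock::now();   // ReferencePoint-E: last frame consumed
    std::printf("flushed %zu zero-sized frames in %lld us\n", parts,
                (long long)std::chrono::duration_cast<std::chrono::microseconds>(stop - start).count());

    feeder.join();
    zmq_close(pull);
    zmq_ctx_term(ctx);
    return 0;
}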


RESULT INTERPRETATION :

Those nanoseconds, spent between [S] and [E], count as evidence of the worst-case amount of time that gets "scapegoated" into a knowingly blind "flusher"-looping circus. In real-world use cases, there will be additional reasons for potentially spending even more time on doing the same.

Yet it is fair not to forget that the responsibility for sending such { knowingly-such-sized | ill-formatted }-multi-frame-BEAST(s) is the root cause of any operational risks in dealing with this in an otherwise ultra-low-latency, high-(almost-linear)-scalability focused messaging/signalling framework.

It is the art of the Zen-of-Zero that has enabled this to happen. All thanks to Pieter HINTJENS and his team, led by Martin SÚSTRIK; we all owe them a lot for being able to build further on their legacy.

Question:

I'm trying to get my head around the behaviour of zmq with PUB/SUB.

Q1: I can't find a real reason why, with the PUSH/PULL socket combo, I can create a queue that buffers messages in memory when they can't be delivered (the consumer is not available), while PUB/SUB does not.

Q2: Is there any technical whitepaper or document that describes in detail the internals of the sockets?

EDIT: This example of a PUSH/PULL streamer works as expected (the worker joins late or restarts and gets the messages queued in the feeder). The PUB/SUB forwarder does not behave in the same way.


Answer:

While Q1 is hard to answer / fully address without a single line of code (SLOC) ...

there is still a chance your code ( though yet unpublished, which StackOverflow so much encourages users to include in a form a.k.a. MCVE, and you may already have felt, or may soon feel, some flames for not doing so ) simply forgot to set a subscription topic-filter:

aSubSOCKET.setsockopt( zmq.SUBSCRIBE, "" )          # ->recv "EVERYTHING" / NO-TOPIC-FILTER
aSubSOCKET.setsockopt( zmq.SUBSCRIBE, "GOOD-NEWS" ) # ->recv only messages with the "GOOD-NEWS" topic-prefix
A2: yes, there are exhaustive descriptions of all ZeroMQ API calls +

besides the API man-page collection for ØMQ/2.1.1 and other versions, there is a great online-published pdf book "Code Connected, Vol.1" from Pieter HINTJENS himself.

Worth reading. A lot of insights into the general distributed-processing area and the ZeroMQ way.

Question:

I'm working on a program in C++ that needs to be able to send / receive JSON-payloads from an arbitrary number of other clients.

At first, I tried to use the PubNub service, but I found I can't receive and publish messages at the same time (even using two different contexts on distinct threads). I need to be able to do that. I also found that PubNub has too much latency for my liking.

I came across the ZeroMQ library, which has a PUB/SUB model that would suit my needs. But all the examples I came across explain how to implement it in a way where one process is a Publisher OR a Subscriber, not both at the same time.

Ideally, I would like to program a server that would relay all messages coming from anyone, to anyone subscribed to a specific channel specified in the message. Anyone should be able to receive and publish messages to anyone else on the network, provided they are subscribed to the correct channel.


UPDATE 1:

Note: I do not need assurance of receipt, because payload N+1 will take precedence over payload N. I want a send-and-forget means of communication (UDP-like).

As requested: the PubNub limit of 32 kB per JSON payload was perfect for me; I do not need more. In fact, my payloads are around 4 kB on average. All client instances will run on the same local network, so latency should ideally be less than 5 ms. As for the number of clients, there won't be more than 4 clients subscribed to the same channel/topic at a time.


UPDATE 2 :

I cannot predict how many channels/topics will exist ahead of time, but it will be in the order of dozens (most of the time), hundreds (at peak). Not thousands.


Questions:

Q1: Can I implement such a behavior using ZeroMQ?
Q2: Is there any working sample demonstrating that (preferably in C++)?
Q3: If not, any suggestions for a library in C++?



Answer:

ZeroMQ : is capable of serving this task well within the scales given above
nanomsg : is capable of serving this task too, with a need to cross-check ports/bindings for clients

Design review:
  • client instances are not persistent; they may freely appear on their own, and may freely disappear on their own or on error
  • a client instance decides on its own what it is about to PUB-lish as a message payload
  • a client instance decides on its own what it is about to SUB-scribe to as a TOPIC-filter on the actual incoming stream of messages
  • a client instance exchanges ( sends ), on its own, plain, non-multipart, JSON-formatted messages it has prepared / produced
  • a client instance collects ( receives ) messages, which it assumes to be in the same non-multipart, JSON-formatted shape, and an attempt to process them locally takes place after a receive is complete
  • the maximum # of client instances does not exceed a low number of hundreds
  • the maximum size of any JSON-formatted payload is less than 32 kB, about 4 kB on average
  • the maximum latency acceptable for an E2E process-to-process delivery across a common LAN collision domain is under 5,000 [usec]
  • the server instance is a central-role and persistent entity
  • the server instance provides a known transport-class URL-target for all late-joiners' .connect()-s

Proposal: the server may deploy multiple behaviours to meet the given goals, using both the PUB and SUB archetypes, and provide a code-driven, fast, SUB-side-attached, non-blocking event-loop .poll() with an aligned re-transmission of any of its SUB-side .recv()-ed payloads to its PUB-side, currently .connect()-ed, audience ( the live client instances ):

set s_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and s_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

For performance reasons, which are not so tough here, one may also segregate the workload-streams' processing by mapping each one onto disjunct sub-sets of the multiple created I/O-threads:

map s_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
and s_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );

set s_PUB_send.bind( "tcp://localhost:8899" );
+
set s_SUB_recv.setsockopt( ZMQ_SUBSCRIBE,  "" );                         // forever *every*-TOPIC
set s_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 );                      // protective ceiling
set s_SUB_recv.setsockopt( ZMQ_CONFLATE,   True );                       // retain just the last msg
set s_SUB_recv.setsockopt( ZMQ_LINGER,     0 );                          // avoid blocking
set s_SUB_recv.setsockopt( ZMQ_TOS,        anAppToS_NETWORK_PRIO_CODE );
and s_SUB_recv.bind( "tcp://localhost:8888" );                           // [PUB]s .connect()

Similarly, a client instance may deploy a reverse-facing tandem of both a PUB-endpoint and a SUB-endpoint, ready to .connect() to a known transport-target-URL.

The client-specific subscription locally decides what is to get filtered from the incoming stream of messages ( prior to the ZeroMQ v3.1 API, the plentitude of all messages got delivered to each client instance over the transport class; since API v3.1+, however, the topic-filter is operated on the PUB-side, which in the desired modus operandi eliminates the wasted volumes of data over the network, but at the same time increases the PUB-side processing overhead ( ref.: the remarks above on increased multi-I/O-thread mapping / performance boost ) ):

set c_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
and c_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

Unless the payload-assembly/processing overhead grows close to the permitted End-to-End latency threshold, there shall be no need to separate / segregate the ZeroMQ low-level I/O-threads here:

map c_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
and c_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );

set c_PUB_send.connect( "tcp://server:8888" );                           // reverse .bind on [SUB]
+
set c_SUB_recv.setsockopt( ZMQ_SUBSCRIBE,  "" );                         // modified on-the-fly
set c_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 );                      // protective ceiling
set c_SUB_recv.setsockopt( ZMQ_CONFLATE,   True );                       // take just the last
set c_SUB_recv.setsockopt( ZMQ_LINGER,     0 );                          // avoid blocking
set c_SUB_recv.setsockopt( ZMQ_TOS,        anAppToS_NETWORK_PRIO_CODE );
and c_SUB_recv.connect( "tcp://server:8899" );
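
As a sketch only ( cppzmq, single-frame payloads, the two ports from the proposal above, a plain receive-timeout standing in for a fully fledged .poll() event-loop, and the CONFLATE / TOS / AFFINITY tuning left out for brevity ), the server-side relay may look like this:

#include <zmq.hpp>

int main()
{
    zmq::context_t ctx(2);                         // two I/O threads, as suggested above

    zmq::socket_t sub_in(ctx, ZMQ_SUB);
    sub_in.setsockopt(ZMQ_SUBSCRIBE, "", 0);       // no topic filter: accept every channel
    sub_in.setsockopt(ZMQ_RCVTIMEO, 10);           // simple stand-in for a non-blocking .poll()
    sub_in.bind("tcp://*:8888");                   // clients .connect() their PUB side here

    zmq::socket_t pub_out(ctx, ZMQ_PUB);
    pub_out.bind("tcp://*:8899");                  // clients .connect() their SUB side here

    while (true)
    {
        zmq::message_t payload;
        if (!sub_in.recv(&payload))                // false == nothing arrived within the timeout
            continue;                              // housekeeping / shutdown checks could go here
        pub_out.send(payload);                     // fan the JSON payload out to all subscribers
    }
    return 0;
}

Each client then mirrors this with a PUB socket .connect()-ed to port 8888 and a SUB socket .connect()-ed to port 8899, subscribed just to its own channel-prefix.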

Discussion:

For hobby projects there is not much more needed on the messaging infrastructure; for more serious domains, however, there are additional services that both the server and the client instances ought to have added as further formal-communication-pattern behaviours:

  • r/KBD for a remote keyboard, with CLI-alike ad-hoc inspection utilities
  • KEEP_ALIVE transponders for allowing system-wide state- / perf-monitoring
  • SIG_EXIT handlers for allowing system-wide / instance-specific SIG_EXITs
  • a distributed syslog service to allow a non-blocking replica of log-records to be safely collected / stored ( be it during a debug phase, a performance-tuning phase, or for production-grade records-of-evidence collecting )
  • Identity Management tools for audit-trails et al
  • WhiteList/BlackList for adding robustness to the infrastructure, making it better immune to DoS-attacks / poisoning erroneous NIC traffic-bursts et al
  • Adaptive Node re-Discovery for smarter / ad-hoc infrastructure design and status monitoring, or when multi-role / ( N + M )-shaded active hot-standby role-handover/takeover scenarios et al come onto the stage

Summary

A1: Yes, fully within ZeroMQ capabilities
A2: Yes, C++ code-samples are available in the ZeroMQ book / Guides
A3: Ref.: A1, plus you may like the in-depth remarks in Martin SUSTRIK's post on "Differences between nanomsg and ZeroMQ"

Hope you will enjoy the powers of distributed processing, be it supported by ZeroMQ or nanomsg or both.

Only one's own imagination is the limit.

If you are interested in further details, you might love the book referred to in the "The Best Next Step" section of this post.