Hot questions for using ZeroMQ with Boost

Question:

I have an application in which I'd like to send part of its mutable state over the network to another machine (there will be a cluster of those machines) to do some CPU-intensive computations on it and get back the results. Think of it as asynchronous RPC. Such calls will happen many times during the execution of the program, so I'd like to make the overhead as small as possible, e.g. minimize the number of redundant copies of the data. The size of the data varies from tens of bytes to hundreds of KBs, maybe even a few MBs. Its structure is relatively complex: it consists of a set of object trees, but the leaves contain only primitive types and the internal nodes contain minimal metadata.

I'm considering Cap'n Proto for serialization (though in this case I'd have to create a redundant model for my data), and ZeroMQ for transport. On the client/main application side I'd like to use azmq, because I need Boost.Asio's features (namely coroutine/fiber support). The language is C++.

Summarizing with a very rough sketch:

RelativelyComplexState data;
CapnProtoRequest cp_req = buildRequest(data); // traverses my data, creates C'n P object
azmq_socket.async_send(boost::asio::buffer(cp_req, cp_req.size)); //azmq always copies the buffer? Not good.
// do other stuff while request is being processed remotely
// get notification from azmq/Boost:Asio when reply has arrived
azmq::message msg; // note: "azmq::message msg();" would declare a function
azmq_socket.async_receive(some_message_handler?); // get all the data into msg
CapnProtoResponse cp_resp = parseResponse(msg.cbuffer()); // interpret bytes as C'n P object, hopefully no copy
RelativelySimpleResult result = deserialize(cp_resp);

Is this feasible, or is there a better way? Would a schemaless serialization method (e.g. Boost.Serialization) make my life easier and/or the application more efficient in this case?

Also, what is the best way to send and receive a Cap'n Proto object with ZeroMQ/azmq, avoiding unnecessary copies? By looking at the source code of azmq, it seems that for sending, azmq always copies the buffer contents. What are the more subtle issues (segmenting/framing, etc.)? I'm not familiar with the libraries and haven't found any explanation or good examples.

Thank you!


Answer:

I do not know much about ZeroMQ's interface but I can give advice on how to minimize copies from Cap'n Proto.

On the sending side, use capnp::MessageBuilder::getSegmentsForOutput() (capnp/message.h) to get direct pointers to the message's content without copying. This gives you an array of arrays of bytes (actually, words, but you can cast them to bytes). You need to somehow feed these to ZeroMQ without copying them. You'll need to make sure that the boundaries between segments are preserved -- the goal is to come up with exactly the same array of arrays on the receiving end. Maybe ZeroMQ has explicit support for multi-segment messages and can remember the segment boundaries for you; if not, you'll need to prefix your message with a table of segment sizes.
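If ZeroMQ's multipart messages preserve frame boundaries (they do, as far as I know), one frame per segment is a natural mapping. Here is a minimal sketch of the sending side, assuming the raw libzmq C API rather than azmq (sendSegments is an illustrative helper):

#include <capnp/message.h>
#include <zmq.h>

void sendSegments(void* zmq_sock, capnp::MessageBuilder& builder) {
    auto segments = builder.getSegmentsForOutput();
    for (size_t i = 0; i < segments.size(); ++i) {
        // ZMQ_SNDMORE marks every frame except the last one
        int flags = (i + 1 < segments.size()) ? ZMQ_SNDMORE : 0;
        // Caveat: zmq_send copies the bytes; true zero-copy would need
        // zmq_msg_init_data with a free function that keeps the
        // MessageBuilder alive until the send completes.
        zmq_send(zmq_sock, segments[i].begin(),
                 segments[i].size() * sizeof(capnp::word), flags);
    }
}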

On the receiving side, once you have rebuilt your array of segments, construct a capnp::SegmentArrayMessageReader (capnp/message.h) and pass the array to the constructor. This will use the underlying data without copying. (Note that you will need to make sure that the data is aligned on a 64-bit boundary. I'm not sure if ZeroMQ guarantees this.)
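A matching sketch of the receiving side, again assuming one frame per segment, and with the alignment caveat above in mind (zmq_msg_data() is not guaranteed to return 64-bit-aligned pointers, so a real implementation should check and fall back to copying):

#include <capnp/message.h>
#include <deque>
#include <vector>
#include <zmq.h>

void receiveAndRead(void* zmq_sock) {
    std::deque<zmq_msg_t> frames; // deque: elements never move as it grows
    int more = 1;
    while (more) {
        frames.emplace_back();
        zmq_msg_init(&frames.back());
        zmq_msg_recv(&frames.back(), zmq_sock, 0);
        more = zmq_msg_more(&frames.back());
    }
    // Rebuild the array of segments over the frames' storage
    std::vector<kj::ArrayPtr<const capnp::word>> segments;
    for (auto& frame : frames) {
        segments.push_back(kj::arrayPtr(
            reinterpret_cast<const capnp::word*>(zmq_msg_data(&frame)),
            zmq_msg_size(&frame) / sizeof(capnp::word)));
    }
    capnp::SegmentArrayMessageReader reader(
        kj::arrayPtr(segments.data(), segments.size()));
    // ... use reader.getRoot<...>() while the frames are still alive ...
    for (auto& frame : frames) zmq_msg_close(&frame);
}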

Note that if both your client and server are C++, you may want to consider using Cap'n Proto's own RPC protocol, which is easier to set up and already avoids all unnecessary copies. However, integrating Cap'n Proto's event loop with boost::asio is currently non-trivial. It's possible -- for example you can look at node-capnp which integrates Cap'n Proto with libuv's event loop -- but may be more work than you want to do.

(Disclosure: I'm the author of Cap'n Proto.)

Question:

I'm trying to integrate Boost.Asio with ZeroMQ. The messaging works for the first connection, but the program exits with a "Bad file descriptor" error when the initial connection ends.

I'm using the Boost.Beast example code for the async WebSocket server to make a connection with the client. I then open a ZMQ socket. The client sends a message to the server over a WebSocket connection, the message is sent over a ZMQ socket to a different server, that server does some processing and sends the message back over ZMQ, and the final message is sent back to the client over the same WebSocket connection.

I am using this code to integrate Boost with ZMQ. The important lines are:

int zfd;
size_t optlen = sizeof(zfd);
zmq_getsockopt(zmq_sock_, ZMQ_FD, &zfd, &optlen);
sock_.assign(boost::asio::ip::tcp::v4(), zfd);

This gets a file descriptor from the ZMQ socket and wraps it with the Boost socket so everything plays nice. However, when the destructor is called:

sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
sock_.close();
zmq_close (zmq_sock_);  

I get a "Socket operation on non-socket" error, because it seems the socket has already been closed. If I remove the socket shutdown and close, I get a "Bad file descriptor" error from ZMQ instead. It seems that the WebSocket session object is partially destroying the Asio-ZMQ objects. If I remove the destructor entirely, the program doesn't crash, but it no longer works properly, i.e. it won't send any more messages over ZMQ.

I've been struggling with this problem for days and I'm hoping that I can get some help. If it helps, my code takes the my_zmq_req_client class and integrates it into the Boost.Beast session class.


Answer:

I haven't looked at the linked library, but this fragment

sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
sock_.close();
zmq_close (zmq_sock_);  

looks suspicious, as sock_.close() is meddling with a socket that it didn't open. I'd suggest it makes a lot more sense to release the socket on the Asio side instead of closing it, so that ZMQ keeps responsibility for creating and destroying it (note that basic_socket::release() requires a reasonably recent Boost, 1.66 or later if I remember correctly):

sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
sock_.release();
zmq_close (zmq_sock_);  

Question:

What I want is: when the message queue receives an int N, the handler function should be called N seconds later. Below is my code.

It runs fine when the interval between two consecutive messages is larger than N, but when two messages arrive less than N seconds apart, one of the handlers prints "Operation canceled", which is not what I want.

I'd appreciate any help.

#include <boost/asio.hpp>
#include <zmq.h>
#include <boost/thread.hpp>
#include <iostream>

boost::asio::io_service io_service;

void* context = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);


void handler(const boost::system::error_code &ec) {
    std::cout << "hello, world" << "\t" << ec.message() << std::endl;
}

void run() {
    io_service.run();
}

void thread_listener() {
    int nRecv;
    boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(0));
    while (true) {
        zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
        std::cout << nRecv << std::endl;
        timer.expires_from_now(boost::posix_time::seconds(nRecv));
        timer.async_wait(handler);
    }
}

int main(int argc, char* argv[]) {
    boost::asio::io_service::work work(io_service);

    zmq_bind(sock_pull, "tcp://*:60000");
    boost::thread tThread(thread_listener);
    boost::thread tThreadRun(run);
    tThread.join();
    tThreadRun.join();
    return 0;
}

Answer:

When you call

timer.expires_from_now(boost::posix_time::seconds(nRecv));

this, as the documentation states, cancels any pending asynchronous wait on the timer.

If you want to have overlapping requests in flight at a given time, one timer is clearly not enough. Luckily, there is a well-known pattern around bound shared pointers in Asio that you can use to mimic a "session" per response.

Say you define a session to contain its own private timer:

struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N)) 
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

Now, it's trivial to replace the code that used the hardcoded timer instance:

 timer.expires_from_now(boost::posix_time::seconds(nRecv));
 timer.async_wait(handler);

with the session start:

 boost::make_shared<session>(io_service, nRecv)->start();

A fully working example (with suitably stubbed ZMQ stuff): Live On Coliru

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
#include <cstdlib>
#include <iostream>

boost::asio::io_service io_service;

/////////////////////////////////////////////////////////////////////////
// I love stubbing out stuff I don't want to install just to help others
enum { ZMQ_PULL };
static void* zmq_ctx_new()          { return nullptr; }
static void* zmq_socket(void*, int) { return nullptr; }
static void  zmq_bind(void*, char const*) {}
static void  zmq_recv(void*, int* data, size_t, int)
{
    boost::this_thread::sleep_for(boost::chrono::milliseconds(rand() % 1000));
    *data = 2;
}
// End of stubs :)
/////////////////////////////////////////////////////////////////////////

void* context  = zmq_ctx_new();
void* sock_pull = zmq_socket(context, ZMQ_PULL);

struct session : boost::enable_shared_from_this<session> {
    session(boost::asio::io_service& svc, int N) :
        timer(svc, boost::posix_time::seconds(N)) 
    {
        // Note: shared_from_this is not allowed from ctor
    }

    void start() {
        // it's critical that the completion handler is bound to a shared
        // pointer so the handler keeps the session alive:
        timer.async_wait(boost::bind(&session::handler, shared_from_this(), boost::asio::placeholders::error));
    }

    ~session() {
        std::cout << "bye (session end)\n";
    }

  private:
    void handler(const boost::system::error_code &ec) {
        std::cout << "hello, world" << "\t" << ec.message() << std::endl;
    }

    boost::asio::deadline_timer timer;
};

void run() {
    io_service.run();
}

void thread_listener() {
    int nRecv = 0;
    for(int n=0; n<4; ++n) {
        zmq_recv(sock_pull, &nRecv, sizeof(nRecv), 0);
        std::cout << nRecv << std::endl;

        boost::make_shared<session>(io_service, nRecv)->start();
    }
}

int main() {
    auto work = boost::make_shared<boost::asio::io_service::work>(io_service);

    zmq_bind(sock_pull, "tcp://*:60000");
    boost::thread tThread(thread_listener);
    boost::thread tThreadRun(run);

    tThread.join();
    work.reset();

    tThreadRun.join();
}

Question:

Shout out if there is something better we should consider:

I am looking for a very quick and simple way to get several programs (e.g. 5) - each running on separate nodes on a private OpenStack cloud to talk to each other.

  • Packets will be short C++ structs (less than 100 bytes)
  • Traffic will be light (probably less than 100/second)
  • Latency is really not an issue. (what is a few ms between friends?) - we have lots of cycles and memory
  • Messages should be done as pub/sub client/server paradigm
  • Library should be C++ friendly. But work both on Windows and Linux
  • We might need additional language bindings later on
  • We would prefer not to lose messages

Here is the first idea I have, but yell out if you have something else to offer:

  • A friendly wrapper for the UDP socket layer
  • An encoder/decoder for the C++ struct data

Answer:

For serialisation, almost anything with the right language bindings will do. Google Protocol Buffers are language-agnostic, with lots of bindings available. The only thing to avoid is serialisation that is baked into your source code (as Boost's serialisation is/was), because then you can't readily port it to another language.
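For example, here's a minimal sketch of the Protocol Buffers route; the message name and fields are purely illustrative:

// telemetry.proto (illustrative):
//   syntax = "proto3";
//   message Telemetry {
//     int32  node_id = 1;
//     double value   = 2;
//   }
// protoc generates telemetry.pb.h/.cc; any language with protobuf
// bindings can then read the same wire format.
#include "telemetry.pb.h"
#include <string>

std::string encode(int node_id, double value) {
    Telemetry t;
    t.set_node_id(node_id);
    t.set_value(value);
    return t.SerializeAsString(); // compact wire bytes, well under 100 bytes
}

bool decode(const std::string& wire, Telemetry& out) {
    return out.ParseFromString(wire);
}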

For message transport, ZeroMQ, NanoMsg are good choices. However, I think it really comes down to

  1. How badly you don't want to lose messages,
  2. Exactly what you mean by "lost message" in the first place.

The thing about ZeroMQ (and NanoMsg) is that (AFAIK) there is no real way of knowing the fate of a message when a fault occurs. For instance, in ZeroMQ, if you send a message and the recipient just happens to be working and connected, the message gets transferred over the connection. The sending end now thinks the job is done, the message has been delivered. However, unless and until the receiving end actually calls zmq_recv() and fully processes what it gets given, the message can still get lost if the receiving end's process crashes, or there is a power failure, etc. This is because, until it is consumed, the message is stored in RAM inside the ZeroMQ run thread (inside the respective Context() instance's domain of control).

You can account for this by having some sort of ack message heading back the other way (a minimal sketch follows), plus timeouts, etc. But that starts getting fiddly, and you'd be better off with something like RabbitMQ.
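A minimal sketch of such an application-level ack, assuming the libzmq C API and a separate back-channel socket for acks (both socket handles and the helper name are illustrative):

#include <zmq.h>
#include <cstddef>

// Send on push_sock, then wait up to one second for an ack on ack_sock.
// Returns false if no ack arrived, so the caller can retry or log a loss.
bool send_with_ack(void* push_sock, void* ack_sock,
                   const void* data, size_t len) {
    zmq_send(push_sock, data, len, 0);

    int timeout_ms = 1000;
    zmq_setsockopt(ack_sock, ZMQ_RCVTIMEO, &timeout_ms, sizeof(timeout_ms));

    char ack[8];
    return zmq_recv(ack_sock, ack, sizeof(ack), 0) >= 0;
}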

Question:

This is a follow-up to my earlier question here.

I am solving the first challenge from the 2016 Marathon of Parallel Programming. This question is related to the first challenge: string parsing.

If you read the problem set here, the 5th step, [Try all derivations recursively.] on page 3, is obviously parallelizable.

So I would like to parallelize the solution by implementing the scheme below with ZeroMQ and Boost (to serialize the structure res_package, which carries jobs to the worker threads and results back to the main thread):

The PUSH socket group in main is used to distribute tasks. The PULL socket group in main collects results from the worker threads. Lastly, the PUB socket group is for sending the kill signal to the workers.

There is a sync step between main and the workers; otherwise the PUSH socket in main would send lots of jobs to the first connected worker. To quote from my post:

So what the main thread does is: push #worker_num sync msgs from its PUSH endpoint to the worker threads, then read confirmation msgs from its PULL endpoint. If the main thread retrieves #worker_num confirmation msgs, the sync is done. The format of the sync msg from a worker is the worker thread's ID as a string, so thread 0 would pass a "0" string back to the main thread.

If the main thread receives a meaningful result, which is an Eval with its second field set to true (meaning the string is accepted by the grammar), the main thread publishes the kill signal. After all worker threads send back confirmation (the field bool exit_confirmed in struct res_package), the main thread joins the worker threads and prints the final result.

The problem is, I get a runtime error from Boost, and I have no clue what happened:

# ./spec < ./spec.in
main() : creating thread, 0
thread 0 receives: sync
to_string 0
thread 0 sends: 0, with size: 1
thread 0 sync done
pass 0 to if_sync_done
main thread receives sync msg from thread 0
sync done in main thread
456Dynamic exception type: boost::archive::archive_exception
std::exception::what: input stream error

This exception is caused by line 912 (from the GDB backtrace). So my guess is that the problem is a dangling pointer, or that the data received at line 909 is somehow truncated. But I don't know how to go further.

To build my project, there are some files you need: spec.cc, spec.hh, Makefile

You can download the original project from here. There are spec.in and judge.in for testing.

To install dependencies on Ubuntu, run:

apt-get install -y libzmqpp3 libzmqpp-dev libzmq5 libzmq5-dbg libboost-all-dev build-essential g++

One important data structure is:

struct res_package
{
  int i;
  Set <Stack> ls; // from worker thread to main thread
  Stack s; // from main thread to worker thread, job to process
  bool if_accepted;
  bool exit_confirmed;
  Eval res;
  bool set_nonempty;

// stripped.....

  template <typename Archive>
  void serialize(Archive& ar, const unsigned int version)
  {
    ar & i;
    ar & if_accepted;
    ar & exit_confirmed;
    ar & s;
    ar & ls;
    ar & res;
    ar & set_nonempty;
  }
};

A worker thread receives Stack s and sees if the string can be accepted by the rule s. If so, it sets if_accepted to true, then sends back the Eval res. If not, and multiple possible rules are found, it sends back a set of all possible rules ls, and sets bool set_nonempty to true to tell main to distribute those rules.

If you feel confused, please post a comment.


Answer:

You didn't include the serializing code in the post, but I found it in spec.cc:

{
    std::ostringstream obuffer;
    boost::archive::text_oarchive oarchive(obuffer);

    tmp.exit_confirmed = true;
    oarchive & tmp;
    std::string req_str(obuffer.str());
    zmq::message_t res_msg(req_str.size());
    memcpy((void *)res_msg.data(), req_str.data(), req_str.size());
    sendres_socket.send(res_msg);
}

Here you send the stream's contents before the archive is closed, and the archive is only guaranteed to have written complete output once its destructor has run. Try explicitly closing it, or limiting its lifetime:

{
    std::ostringstream obuffer;
    {
        boost::archive::text_oarchive oarchive(obuffer);

        tmp.exit_confirmed = true;
        oarchive & tmp;
    }
    std::string req_str(obuffer.str());
    zmq::message_t res_msg(req_str.size());
    memcpy((void *)res_msg.data(), req_str.data(), req_str.size());
    sendres_socket.send(res_msg);
}
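For completeness, a sketch of the matching receive side, assuming the same cppzmq-style API as in your snippet (receive_package is an illustrative helper): copy the frame into a buffer that outlives the archive, then deserialize:

#include <boost/archive/text_iarchive.hpp>
#include <sstream>
#include <string>

res_package receive_package(zmq::socket_t& socket) {
    zmq::message_t msg;
    socket.recv(&msg);
    // Copy the frame into a string so the buffer outlives the archive
    std::istringstream ibuffer(
        std::string(static_cast<const char*>(msg.data()), msg.size()));
    boost::archive::text_iarchive iarchive(ibuffer);
    res_package tmp;
    iarchive & tmp;
    return tmp;
}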

Question:

I am trying to write a network transfer application.

The data is binary and each packet is mostly 800 KB in size.

The client produces 1000 packets per second. I want to transfer the data as quickly as possible.

When I use ZeroMQ, the speed reaches 350 packets per second, but Boost.Asio reaches 400 (or more) per second.

As you can see, the performance of both methods is not good.

The pattern used for ZeroMQ is PUSH/PULL; the Boost.Asio version is simple synchronous I/O.

Q1: I want to ask, is ZeroMQ only suitable for small messages?

Q2: Is there a way to improve the ZeroMQ speed?

Q3: If ZeroMQ can't, please advise some good method or library to improve this kind of data transfer.


Answer:

Data Rate

You're attempting to move 800 KB × 1000 messages/s = 800 MByte/s, i.e. roughly 6.4 Gbit/s. What sort of connection is this? For a tcp:// transport-class it'd have to be something pretty rapid, e.g. 10 Gbit/s Ethernet, which is still fairly exotic.

So I'm presuming that it's an ipc:// transport-class connection. In that case you can get an improvement by using ZeroMQ's zero-copy functions, which avoid copying the data repeatedly.

With a normal transfer, you have to copy data into a zmq message, that has to be copied into an ipc pipe, copied out again, and copied back into a new zmq message at the receiving end. All that copying requires 4 × 800 MByte/s = 3.2 GByte/s of memory bandwidth which, by the time cache conflicts have come into play, is an appreciable percentage of the total memory bandwidth of a typical PC system. Using zero-copy should cut that in half.
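A minimal sketch of a zero-copy send with the libzmq C API (the buffer-filling step is elided): zmq_msg_init_data() wraps an existing buffer instead of copying it, and calls the supplied free function once ZeroMQ no longer needs the data:

#include <zmq.h>
#include <cstdlib>

static void free_buffer(void* data, void* hint) {
    (void)hint;
    free(data); // invoked by ZeroMQ once the send has completed
}

void send_zero_copy(void* sock, size_t len) {
    void* buffer = malloc(len);
    // ... fill buffer with the 800 KB payload ...
    zmq_msg_t msg;
    zmq_msg_init_data(&msg, buffer, len, free_buffer, nullptr);
    zmq_msg_send(&msg, sock, 0); // ownership passes to ZeroMQ; no copy
}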

Alternative to Zero Copy - Zero Transfer

If you are using ipc://, then consider not sending the data through the sockets at all, but sending references to the data instead.

I have previously blended use of zmq and a semaphore-locked std::queue, using zmq simply for its pattern (PUSH/PULL in my case), the std::queue to carry shared pointers to the data, and leaving the data itself still. The sender locks the queue, puts a shared pointer into it, and then sends a simple cue message (e.g. "1") through a zmq socket. The recipient reads the "1" and uses that as a cue to lock the queue and pull a shared pointer off it. Thus a shared pointer to the data has been transferred from one thread to another in a ZMQ pattern via a std::queue, but the data itself has stayed still. All I've done is pass ownership of the data between threads. It works so long as the sender's shared pointer goes out of scope immediately after sending and is not used by the sender to modify or access the data. A sketch follows.
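Here is a minimal sketch of that blend, with a mutex-guarded queue standing in for the semaphore-locked queue (DataBlock and the socket handles are illustrative):

#include <zmq.h>
#include <memory>
#include <mutex>
#include <queue>
#include <vector>

struct DataBlock { std::vector<char> bytes; };

std::mutex queue_mutex;
std::queue<std::shared_ptr<DataBlock>> pointer_queue;

void send_side(void* push_sock, std::shared_ptr<DataBlock> block) {
    {
        std::lock_guard<std::mutex> lock(queue_mutex);
        pointer_queue.push(std::move(block)); // the data itself never moves
    }
    zmq_send(push_sock, "1", 1, 0); // tiny cue message through ZeroMQ
}

std::shared_ptr<DataBlock> recv_side(void* pull_sock) {
    char cue;
    zmq_recv(pull_sock, &cue, 1, 0); // block until the cue arrives
    std::lock_guard<std::mutex> lock(queue_mutex);
    auto block = pointer_queue.front();
    pointer_queue.pop();
    return block; // ownership changed threads; the bytes stayed put
}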

PUSH/PULL is not too bad to deal with: each message goes to only one recipient. It would take more effort to make such a blend with PUB/SUB, and received messages would have to be treated as read-only, because each recipient would hold a shared pointer to the same block of data as everyone else.

Message Size

I have no idea how big a chunk ZMTP (ZeroMQ's wire protocol) transfers at a time, but I'd guess that it's relatively efficient in terms of protocol:data ratio.