Hot questions for Using ZeroMQ in msgpack

Question:

I'm trying to use spyne (http://spyne.io) in my server with ZeroMQ and MsgPack. I've followed the examples to program the server side, but I can't find any example that shows how to program the client side.

I've found the class spyne.client.zeromq.ZeroMQClient, but I don't know what the 'app' parameter of its constructor is supposed to be.

Thank you in advance!

Edit:

The (simplified) server-side code:

from spyne.application import Application
from spyne.protocol.msgpack import MessagePackRpc
from spyne.server.zeromq import ZeroMQServer
from spyne.service import ServiceBase
from spyne.decorator import srpc
from spyne.model.primitive import Unicode

class RadianteRPC(ServiceBase):    
    @srpc(_returns=Unicode)
    def whoiam():
        return "Hello I am Seldon!"

radiante_rpc = Application(
    [RadianteRPC],
    tns="radiante.rpc",
    in_protocol=MessagePackRpc(validator="soft"),
    out_protocol=MessagePackRpc()
)

s = ZeroMQServer(radiante_rpc, "tcp://127.0.0.1:5001")
s.serve_forever()

Answer:

Spyne author here.

There are many issues with Spyne's client transports.

The first and most important is that they require the server code to work. That's because Spyne's WSDL parser is only halfway done, so there's no way to communicate the interface the server exposes to a client.

Once the WSDL parser is done, Spyne's client transports will be revived as well. They work just fine and the tests pass, but they are (slightly) obsolete and, as you noticed, don't have proper docs.

Now back to your question: The app parameter to the client constructor is the same application instance that goes to the server constructor. So if you do this:

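from spyne.client.zeromq import ZeroMQClient

# radiante_rpc is the same Application instance passed to ZeroMQServer
c = ZeroMQClient("tcp://127.0.0.1:5001", radiante_rpc)
print(c.service.whoiam())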

It will print "Hello I am Seldon!"

Here's the full code I just committed: https://github.com/arskom/spyne/tree/master/examples/zeromq

BUT:

All this said, you should not use ZeroMQ for RPC.

I looked at ZeroMQ for RPC purposes back when its hype was up at crazy levels (I even got my name in the ZeroMQ contributors list :)). I did not like what I saw, and I moved on.

Pasting my relevant news.yc comment from https://news.ycombinator.com/item?id=6089252 here:

In my experience, ZeroMQ is very fragile in RPC-like applications, especially because it tries to abstract away the "connection". This mindset is very appropriate when you're doing multicast (and ZeroMQ rocks when doing multicast), but for unicast, I actually want to detect a disconnection or a connection failure and handle it appropriately before my outgoing buffers are choked to death. So, I'd evaluate other alternatives before settling on ZeroMQ as a transport for internal RPC-type messaging.

If you are fine with having the whole message in memory before parsing (or sending) it (HTTP is not that bad when it comes to transferring huge documents over the network), writing a raw MessagePack document to a regular TCP stream (or tucking it inside a UDP datagram) will do the trick just fine. The MessagePack library does support parsing streams -- see e.g. the Python example on its homepage (http://msgpack.org).
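
As a rough illustration of the stream-parsing point, here is a minimal sketch that writes raw MessagePack documents to a stream socket and reads them back with msgpack.Unpacker. It assumes msgpack 0.5.2 or later (for raw=False); socket.socketpair() stands in for a real TCP connection, and the helper names send_messages and receive_messages are hypothetical, not part of Spyne or msgpack:

```python
import socket

import msgpack


def send_messages(sock, messages):
    # Hypothetical helper: write each message as a raw MessagePack document.
    for message in messages:
        sock.sendall(msgpack.packb(message, use_bin_type=True))


def receive_messages(sock, chunk_size=16):
    # Hypothetical helper: feed arbitrary chunks to a streaming Unpacker.
    # The small chunk size forces documents to span several reads.
    unpacker = msgpack.Unpacker(raw=False)
    received = []
    while True:
        chunk = sock.recv(chunk_size)
        if not chunk:  # the peer closed the connection
            break
        unpacker.feed(chunk)
        # Iteration yields only complete documents and stops at partial data.
        for obj in unpacker:
            received.append(obj)
    return received


# socketpair() is used here only to keep the sketch self-contained.
left, right = socket.socketpair()
outgoing = [{"id": 1, "method": "whoiam"}, {"id": 2, "method": "whoiam"}]
send_messages(left, outgoing)
left.close()
incoming = receive_messages(right)
right.close()
```

Because the Unpacker buffers partial input, the receiver does not need length prefixes or delimiters between documents. Unlike the ZeroMQ case described above, the empty read also makes the disconnection visible to the application.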

Disclosure: I'm just a happy MessagePack (and sometimes ZeroMQ) user. I work on Spyne (http://spyne.io) so I just have experience with some of the most popular protocols out there.

I seem to have written that comment more than a year ago. Fast forward to today, I got the MessagePack transport implemented and released in Spyne 2.11. So if you're looking for a lightweight transport for internally passing small messages, my recommendation would be to use it instead of ZeroMQ.

However, once you're outside the Http-land, you're back to dealing with sockets at the system-level, which may or may not be what you want, depending especially on the amount of resources you have to spare for this bit of your project.

Sadly, there is no documentation about it besides the examples I just put together here: https://github.com/arskom/spyne/tree/master/examples/msgpack_transport

The server code is fairly standard Spyne/Twisted code, but the client uses system-level sockets to illustrate how it's supposed to work. I'd happily accept a pull request wrapping it in a proper Spyne client transport.

I hope this helps. Patches are welcome.

Best regards,

Question:

I need a fast way to send 300 short messages a second over zeromq between the python multiprocessing processes. Each message needs to contain an ID and time.time()

msgpack seems like the best way to serialize the dict before sending it via zeromq, and conveniently, msgpack has an example of exactly what I need, except it has a datetime.datetime.now().

import datetime

import msgpack

useful_dict = {
    "id": 1,
    "created": datetime.datetime.now(),
}

def decode_datetime(obj):
    if b'__datetime__' in obj:
        obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
    return obj

def encode_datetime(obj):
    if isinstance(obj, datetime.datetime):
        return {'__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")}
    return obj


packed_dict = msgpack.packb(useful_dict, default=encode_datetime)
this_dict_again = msgpack.unpackb(packed_dict, object_hook=decode_datetime)

The problem is that their example doesn't work; I get this error:

    obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
KeyError: 'as_str'

Maybe it's because I'm on Python 3.4, but I don't know what the issue with strptime is. I would appreciate your help.


Answer:

Given that messagepack ( import msgpack ) is good at serializing integers, I created a solution which only uses integers:

import datetime
import gc

import dateutil.tz
import msgpack

_datetime_ExtType = 42

def _unpacker_hook(code, data):
    if code == _datetime_ExtType:
        values = unpack(data)

        if len(values) == 8:  # we have timezone
            return datetime.datetime(*values[:-1], dateutil.tz.tzoffset(None, values[-1]))
        else:
            return datetime.datetime(*values)

    return msgpack.ExtType(code, data)


# This will only get called for unknown types
def _packer_unknown_handler(obj):
    if isinstance(obj, datetime.datetime):
        if obj.tzinfo:
            components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond, int(obj.utcoffset().total_seconds()))
        else:
            components = (obj.year, obj.month, obj.day, obj.hour, obj.minute, obj.second, obj.microsecond)

        # we effectively double pack the values to "compress" them
        data = msgpack.ExtType(_datetime_ExtType, pack(components))
        return data

    raise TypeError("Unknown type: {}".format(obj))

def pack(obj, **kwargs):
    # we don't use a global packer because it wouldn't be re-entrant safe
    return msgpack.packb(obj, use_bin_type=True, default=_packer_unknown_handler, **kwargs)


def unpack(payload):
    try:
        # we temporarily disable gc during unpack to bump up perf: https://pypi.python.org/pypi/msgpack-python
        gc.disable()
        # These options must match the packer parameters above.  NOTE: use_list=False (tuples) is faster
        # raw=False decodes strings as UTF-8; it replaces encoding='utf-8', which was removed in msgpack 1.0
        return msgpack.unpackb(payload, use_list=False, raw=False, ext_hook=_unpacker_hook)
    finally:
        gc.enable()
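
For the narrower case in the question (an ID plus time.time()), a custom hook should not be needed at all, since time.time() returns a plain float and MessagePack encodes floats natively. A minimal sketch, assuming msgpack 0.5.2 or later (for raw=False); the names message, payload and decoded are only illustrative:

```python
import time

import msgpack

# An ID plus a Unix timestamp; both are types MessagePack handles natively.
message = {"id": 1, "created": time.time()}

# use_bin_type=True keeps str and bytes distinct on the wire.
payload = msgpack.packb(message, use_bin_type=True)

# raw=False decodes strings (including dict keys) back to str, not bytes.
decoded = msgpack.unpackb(payload, raw=False)
```

If a datetime object is wanted on the receiving side, datetime.datetime.fromtimestamp(decoded["created"]) converts the float back.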

Question:

I made a PUB/SUB connection using zmqpp and now I want to send data from the publisher to the subscribers using the header-only, C++11 version of msgpack-c.

The publisher has to send 2 int64_t numbers -- header_1 and header_2 -- followed by a std::vector<T> -- data --, where T is determined by the (header_1, header_2) combination.

Since there aren't that many examples on how to combine msgpack and zmqpp, the idea I came up with is to send a 3-part message by using zmqpp::message::add/add_raw. Each part would be packed/unpacked using msgpack.

The publisher packs a single data part as follows:

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());

And the receiver unpacks it like this:

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
                static_cast<const char*>(msg.raw_data(0)),
                msg.size(0));
unpackedData.get().convert(&header_1);

When I run the code, I get the following error on the subscriber side:

terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
  what():  insufficient bytes
Aborted

Also, it seems that zmqpp has generated a 5-part message, even though I called add() only 3 times.

Q1: Am I packing/unpacking the data correctly ?

Q2: Is this the proper method for sending msgpack buffers using zmqpp ?

Here are the important parts of the code:

Publisher

zmqpp::socket publisherSock;
/* connection setup stuff ...*/

// forever send data to the subscribers
while(true)
{
    zmqpp::message msg;

    // meta info about the data
    int64_t header_1 = 1234567;
    int64_t header_2 = 89;
    // sample data
    std::vector<double> data;
    data.push_back(1.2);
    data.push_back(3.4);
    data.push_back(5.6);


    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_1);
        msg.add(buffer.data(), buffer.size());
        cout << "header_1:" << header_1 << endl;  // header_1:1234567
    }

    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_2);
        msg.add(buffer.data(), buffer.size());
        cout << "header_2:" << header_2 << endl;  // header_2:89
    }

    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, data);
        msg.add_raw(buffer.data(), buffer.size());
        std::cout << "data: " << data << std::endl;  // data:[1.2 3.4 5.6]
    }

    std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"... why ?
    publisherSock.send(msg);

    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

Subscriber

zmqpp::socket subscriberSock;
/* connection setup stuff ...*/

zmqpp::message msg;
subscriberSock.receive(msg);

int64_t header_1;
int64_t header_2;
std::vector<double> data;

std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"
{
    // header 1
    {
        msgpack::unpacked unpackedData;
        // crash !
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(0)),
                        msg.size(0));
        unpackedData.get().convert(&header_1);
        cout << "header_1:" << header_1 << endl;
    }
    // header 2
    {
        msgpack::unpacked unpackedData;
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(1)),
                        msg.size(1));
        unpackedData.get().convert(&header_2);
        cout << "header_2:" << header_2 << endl;
    }
    // data
    {
        msgpack::unpacked unpacked_data;
        msgpack::unpack(unpacked_data,
                        static_cast<const char*>(msg.raw_data(2)),
                        msg.size(2));
        unpacked_data.get().convert(&data);
        std::cout << "data:" << data << std::endl;
    }

}

EDIT: Problem solved: As pointed out by @Jens, the correct way of packing/sending data is by using zmqpp::message::add_raw()

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());

Answer:

I think the calls to msg.add(buffer.data(), buffer.size()) do not add an array of buffer.size() bytes, but instead call message::add(Type const& part, Args &&...args), which does the following:

  1. msg << buffer.data(), which probably calls message::operator<<(bool), since a pointer converts to bool
  2. add(buffer.size()), which then calls msg << buffer.size(), adding a size_t value as the next part.

Looking at the zmqpp::message class, using message::add_raw should do the trick.

PS: This is all without any guarantee because I have never used zmqpp or msgpack.
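
The framing idea from the question (one independently packed MessagePack buffer per message part) can also be sketched in Python. This uses pyzmq rather than zmqpp, so the calls differ from the C++ code above. The inproc endpoint name and the PAIR sockets are assumptions chosen only to keep the sketch self-contained:

```python
import msgpack
import zmq

ctx = zmq.Context()
sender = ctx.socket(zmq.PAIR)
receiver = ctx.socket(zmq.PAIR)

# Hypothetical in-process endpoint; bind before connect.
sender.bind("inproc://msgpack-frames")
receiver.connect("inproc://msgpack-frames")

header_1 = 1234567
header_2 = 89
data = [1.2, 3.4, 5.6]

# Pack every part on its own, so each frame is a complete MessagePack document.
frames = [msgpack.packb(part) for part in (header_1, header_2, data)]
sender.send_multipart(frames)

# The receiver gets exactly the frames that were sent, as raw bytes.
received = receiver.recv_multipart()
decoded = [msgpack.unpackb(frame) for frame in received]

sender.close()
receiver.close()
ctx.term()
```

Each frame is passed as raw bytes, which is the role add_raw plays in zmqpp. The number of received frames should then match the number of packed parts.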

Question:

I am using zeromq to read data from an application which uses msgpack for serializing. The code compiles well but throws an invalid argument error when run. Where am I going wrong?

Here is the error:

terminate called after throwing an instance of 'zmq::error_t'
  what():  Invalid argument
Abort (core dumped)

Here's the code.

#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <msgpack.hpp>
#include <string>

int main(int argc, char *argv[]){
zmq::context_t context (1);

// Open a req port to talk to application
std::string addr = "tcp://127.0.0.1";
std::string req_port = "55555";
zmq::socket_t req (context, ZMQ_REQ);
req.connect(addr+req_port);

// Ask for the subport
zmq::message_t subPortRequest (8);
memcpy (subPortRequest.data(), "SUB_PORT", 8);
req.send(subPortRequest);

zmq::message_t reply;
req.recv(&reply);

std::string sub_port = std::string(static_cast<char*>(reply.data()), reply.size());
std::cout << sub_port << std::endl;


//  Open a sub port to listen to application
zmq::socket_t sub (context, ZMQ_SUB);
std::cout << addr+sub_port << std::endl;
sub.connect(addr+sub_port);

// subscriptions to everything
sub.setsockopt(ZMQ_SUBSCRIBE, "", strlen(""));

while(1){
    zmq::message_t reply_topic;
    sub.recv(&reply_topic);
    std::string topic = std::string(static_cast<char*>(reply_topic.data()), reply_topic.size());

    zmq::message_t reply_msg;
    sub.recv(&reply_msg);
    std::string msg = std::string(static_cast<char*>(reply_msg.data()), reply_msg.size());

    msgpack::object_handle oh = msgpack::unpack(msg.data(), msg.size());
    msgpack::object obj = oh.get();
    std::cout << obj << std::endl;

} 

}

Answer:

Most probably the endpoint string fails to meet the spec.

While the source does this:

zmq::socket_t req ( context, ZMQ_REQ );  // __________.SET [REQ] access point
// Open a req port to talk to application ____________.SET strings
std::string addr     = "tcp://127.0.0.1"; // _________.SET    "IP"-part
std::string req_port = "55555";           // _________.SET "PORT#"-part

req.connect( addr + req_port );           // _________.CONNECT( "IP"+"PORT#" )

the ZeroMQ .connect() method ought to get a string of about this shape, with a ':' between the address and the port:

.connect( "tcp://127.0.0.1:55555" );
//                        ^
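
The difference between the two strings can be checked without opening a socket. A small sketch in Python using only the standard library; make_endpoint is a hypothetical helper, not part of any ZeroMQ binding:

```python
from urllib.parse import urlsplit


def make_endpoint(addr, port):
    """Hypothetical helper: join address and port with the ':' separator."""
    return "{}:{}".format(addr, port)


addr = "tcp://127.0.0.1"
port = "55555"

broken = addr + port               # plain concatenation, as in the question
fixed = make_endpoint(addr, port)  # address and port separated by ':'

# urlsplit finds no port in the concatenated string, but does in the fixed one.
broken_port = urlsplit(broken).port
fixed_port = urlsplit(fixed).port
```

The same applies to the second connect call in the question, where the port received in the reply is appended to the address without a separator.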


Anyway, enjoy building the Smart Distributed Systems with the powers of ZeroMQ