Hot questions for Using ZeroMQ in protocol buffers

Question:

I am trying to define a common basic message which defines the type of the message (for easier parsing) and is then extended with the actual message. The messages will be used in an RPC way.

My .proto file

syntax = "proto2";
package userapi;

// wrapper for commands
message Command
{
    // for "subclassing"
    extensions 100 to max;

    enum Type
    {
        Request = 1;
        Response = 2;
    }

    required Type type = 1;
}   

message Request
{
    // register this message type as extension to command
    extend Command
    {       
        optional Request command = 100;
    }

    optional string guid = 1;
}

message Response
{
    // register this message type as extension to command
    extend Command
    {       
        optional Response command = 101;
    }

    optional string guid = 1;

    //! error handling for command
    optional ErrorMsg error = 2;

    message ErrorMsg
    {
        enum ErrorCode
        {
            //! no error occurred
            NONE = 0;
            //! the requested GUID already existed
            GUID_NOT_UNIQUE = 1;
        }

        optional string ErrorString = 1;
    }
}

This is somewhat similar to this example, but I can't seem to set the extension value via

Command commandRequest;
commandRequest.set_type(Command_Type_Request);

auto extension = commandRequest.GetExtension(Request::command);
extension.set_guid("myGuid");

commandRequest.SetExtension(Request::command, extension);

The SetExtension() call fails with the following error message

error C2039: 'Set' : is not a member of 'google::protobuf::internal::MessageTypeTraits'

Unfortunately, this similar question also does not include a C++ example of the construction.

Did I misunderstand the concept of extensions? What is a cleaner way to set this up? (And no, I don't want to serialize the command into a string.)

I was following the examples under "nested extensions" in the documentation, which only set basic types. I also tried to understand how rpcz solves this problem, but I failed. Maybe some hints will help with this question?


Answer:

Extensions are a lot like regular fields. For primitive fields, you get accessors to get and set the field. For sub-messages, though, you don't get a "set" accessor -- you get "get" and "mutable", just like you would for a regular submessage field. So, you want:

Request* request =
    commandRequest.MutableExtension(Request::command);
request->set_guid("myGuid");

Question:

I have a simple setup of one client and one server. The client wants to execute a method in the server using ZeroMQ for communications. I am going to use the REQ and REP sockets because they are suitable for this use case. Nevertheless I have a doubt about the protobuf definitions. I think these two options can be used for achieving the goal:

message ControlService{
    string control = 1;
    int32 serverId = 2;
    bool block = 3;
    double temperature = 4;
}

Where "control" contains the name of the method to be executed remotely. The other alternative can be:

message InputParameters{
    int32 serverId = 1;
    bool block = 2;
    double temperature = 3;
}
message Empty{

}
service ControlService{
    rpc control (InputParameters) returns (Empty);
}

What would be the best approach? Or at least what are the trade-offs of using one approach instead of the other?


Answer:

Don't do it that way. Have a message:

message InputParameters{
    oneof req
    {
        InputParametersA a = 1;
        InputParametersB b = 2;
    }
}
message InputParametersA
{
    bool block = 1;
    float temperature = 2;
}
message InputParametersB
{
    <more fields>
}

That way you send only the InputParameters message. The method to call is dictated by whether InputParameters.req contains an InputParametersA (implies that method A should be called) or an InputParametersB (for method B).

This avoids parsing a string to determine a method name (highly error prone), and instead gives you an enumeration to switch on (the possible content of the req field). Depending on the implementation of GPB you're using (C++, etc.) you may even get a compile-time warning if your switch statement doesn't cover all values of that enumeration.

It also means that there is no problem in determining which of the fields should be passed to a method; you're passing either InputParameters.req.a into method A or .b into method B. There's no need to break them out into separate parameters for a method; simply pass the whole thing in as a single parameter.

You can define different return types in the same way, passing them all back through a single oneof.
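To make the dispatch idea concrete, here is a minimal sketch that uses std::variant as a stand-in for the oneof, so that it compiles without protobuf. The struct fields, handler names and return strings are assumptions for illustration only. With protobuf-generated C++ code you would typically switch on the generated req_case() accessor instead of calling std::get_if.

```cpp
#include <string>
#include <variant>

// Hypothetical stand-ins for the generated InputParametersA / InputParametersB.
struct InputParametersA {
    bool block = false;
    float temperature = 0.0f;
};
struct InputParametersB {
    int serverId = 0;  // placeholder field, not taken from the original schema
};

// Stand-in for the oneof: at most one alternative is set at a time.
// std::monostate plays the role of "no field set".
using Req = std::variant<std::monostate, InputParametersA, InputParametersB>;

// Hypothetical handlers for "method A" and "method B".
std::string methodA(const InputParametersA& a) {
    return a.block ? "A: blocking" : "A: non-blocking";
}
std::string methodB(const InputParametersB& b) {
    return "B: server " + std::to_string(b.serverId);
}

// Pick the method from whichever alternative is present; no string parsing.
std::string dispatch(const Req& req) {
    if (const auto* a = std::get_if<InputParametersA>(&req)) return methodA(*a);
    if (const auto* b = std::get_if<InputParametersB>(&req)) return methodB(*b);
    return "error: no request set";
}
```

Each handler receives the whole sub-message as one parameter, which mirrors the point above about not breaking the fields out into separate arguments.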

Alternatives

Now, if you were using ASN.1 (which is conceptually the same kind of thing as GPB), you could set constraints on the values and / or sizes of message fields (see here, and Chapter 13 in this PDF). That way, parameter validation would be performed automatically, defined solely in the ASN.1 schema. The lack of value / size constraints in GPB is a glaring omission.

Take a look here (overview), here (free schema compiler for C/C++ that looks OK), and here (PDF, reference manual).

ASN.1 has stronger typing in its wire format (if you use BER encoding). It's possible to interrogate a wire bit stream to find out what type of message it contains. Thus there is no need to resort to wrapping all your possible messages up into a single oneof like you do with GPB.

Question:

Suppose I want to serialize and transmit protobuf binaries with ZMQ using a protocol defined in cake.proto:

syntax = "proto3";

message Cake {
    int32 radius = 1;
}

I can find plenty of examples for the PUB/SUB pattern where a subscriber filters a topic with a string: socket.setsockopt_string(zmq.SUBSCRIBE, "abc")

But how does subscribing to topics work when it comes to protobuf binaries? Do I use the bytes themselves, or does ZMQ provide a wrapper for a message with a header I can use for cases like this?


Answer:

There is no wrapper for this; the subject is just the first frame of the ZeroMQ message, and a subscription matches on the leading bytes of that frame.

If you are confident your protobuf messages will always start with the specific sequence of bytes (that make your subject) then yes you can just subscribe to that byte prefix pattern.

The other option is to copy the subject pattern into an initial frame then add the protobuf frame(s) via ZMQ_SNDMORE. If you can pack many protobuf frames into that same zmq message then the efficiency is good. If each protobuf message has its own "subject" then you will have the overhead of an extra subject frame per protobuf.
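As a rough illustration of the first option, the sketch below encodes a single varint field by hand (the way protobuf lays out the int32 radius = 1 field of Cake on the wire) and checks a byte-prefix match against it. The function names are made up for this example, and the encoder only handles field numbers below 16; real code would call the generated serializer.

```cpp
#include <cstdint>
#include <string>

// Encode one varint field (wire type 0) by hand, as protobuf does on the wire.
// Assumption: fieldNumber < 16, so the tag fits in a single byte.
std::string encodeVarintField(std::uint32_t fieldNumber, std::uint64_t value) {
    std::string out;
    out.push_back(static_cast<char>((fieldNumber << 3) | 0));  // tag byte
    while (value >= 0x80) {
        // Lower 7 bits, with the continuation bit set.
        out.push_back(static_cast<char>((value & 0x7F) | 0x80));
        value >>= 7;
    }
    out.push_back(static_cast<char>(value));
    return out;
}

// Prefix test in the style of a subscription filter:
// does the frame start with the subscribed bytes?
bool matchesSubscription(const std::string& frame, const std::string& prefix) {
    return frame.compare(0, prefix.size(), prefix) == 0;
}
```

A Cake with radius 300 serializes to the three bytes 0x08 0xAC 0x02, so a subscription to the single byte 0x08 would match every Cake that has a non-default radius, which is rarely a useful topic. That is why the separate subject frame is usually the more practical choice.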

Question:

According to the Google Protocol Buffers documentation under 'Defining Services' they say,

it's also possible to use protocol buffers with your own RPC implementation.

To my understanding, Protocol Buffers does not implement RPC natively. Instead, it provides a series of abstract interfaces that must be implemented by the user (that's me!). So I want to implement these abstract interfaces using ZeroMQ for network communication.

I'm trying to create an RPC implementation using ZeroMQ because the project I'm working on already uses ZeroMQ for basic messaging (hence why I'm not using gRPC, as the documentation recommends).

After reading through the proto documentation thoroughly, I found that I have to implement the abstract interfaces RpcChannel and RpcController for my own implementation.

I've constructed a minimal example of where I'm currently at with my RPC implementation:

.proto file: Omitted SearchRequest and SearchResponse schema for brevity

service SearchService {
    rpc Search (SearchRequest) returns (SearchResponse);
}

SearchServiceImpl.h:

class SearchServiceImpl : public SearchService {
 public:
  void Search(google::protobuf::RpcController *controller,
              const SearchRequest *request,
              SearchResponse *response,
              google::protobuf::Closure *done) override {
    // Static function that processes the request; store the result in the
    // response object supplied by the caller
    *response = GetSearchResult(request);

    // Call the callback function
    if (done != nullptr) {
      done->Run();
    }
  }
};

MyRPCController.h:

class MyRPCController : public google::protobuf::RpcController {
 public:
    MyRPCController();

    void Reset() override;

    bool Failed() const override;

    std::string ErrorText() const override;

    void StartCancel() override;

    void SetFailed(const std::string &reason) override;

    bool IsCanceled() const override;

    void NotifyOnCancel(google::protobuf::Closure *callback) override;
 private:
  bool failed_;
  std::string message_;
};

MyRPCController.cpp - Based off of this

void MyRPCController::Reset() {  failed_ = false; }

bool MyRPCController::Failed() const { return failed_; }

std::string MyRPCController::ErrorText() const { return message_; }

void MyRPCController::StartCancel() { }

void MyRPCController::SetFailed(const std::string &reason) {
  failed_ = true;
  message_ = reason;
}

bool MyRPCController::IsCanceled() const { return false; }

void MyRPCController::NotifyOnCancel(google::protobuf::Closure *callback) { }

MyRPCController::MyRPCController() : RpcController() { Reset(); }

MyRpcChannel.h:

class MyRPCChannel: public google::protobuf::RpcChannel {
 public:
    void CallMethod(const google::protobuf::MethodDescriptor *method, google::protobuf::RpcController *controller,
                    const google::protobuf::Message *request, google::protobuf::Message *response,
                    google::protobuf::Closure *done) override;
};

Questions I have with my example thus far:

  • Where do I fit ZeroMQ into this?
    • It seems like it should go into RPCChannel, because in the examples I see (see the 3rd code block here), they pass a string that has the ports to bind to (i.e. MyRpcChannel channel("rpc:hostname:1234/myservice");)
  • I'm concerned about my RPCController implementation; it seems too simple. Should more be going here?
  • How do I implement RPCChannel? It seems very similar to the SearchServiceImpl. The one virtual function in these classes has a very similar method signature, except it's generic.

Here are some other Stack Overflow questions I came across that had some helpful information on the topic:

  1. Protobuf-Net: implementing server, rpc controller and rpc channel - This is where I found the example for the RPCController implementation.
  2. Using Protocol Buffers for implementing RPC in ZeroMQ - This is interesting because the top answer seems to recommend against using Protobuf's built-in RPC formatting for the .proto file.
    • I also noticed this same notion in this file, in a repository called libpbrpc which seemed like a good source for example code
  3. Can I/Should I be using an existing implementation such as RPCZ?

Thank you for your help. I hope I gave enough information and was clear in what I'm looking for. Please let me know if something is unclear or lacking in information. I'd be happy to edit the question accordingly.


Answer:

  • ZeroMQ provides a low-level API for message-based network communication; messages can contain any data.
  • ProtoBuffers is a library that encodes structured data into a compact binary format and decodes it again.
  • gRPC is an RPC framework that generates networking code for RPC services whose functions exchange ProtoBuffers messages.

Both ZeroMQ and gRPC provide network communication, but in different ways. You have to choose either ZeroMQ or gRPC for the network layer. If you choose ZeroMQ, messages can be encoded with ProtoBuffers to exchange structured binary data.

The main point is that the ProtoBuffers library can encode and decode variant records (similar to C/C++ unions), and these can fully emulate the functionality of RPC services whose functions exchange ProtoBuffers messages.

So the options are:

  1. Use ZeroMQ with send and receive primitives and ProtoBuffers encoded variant messages that can contain various sub-messages, like
union Request
{
  byte msgType;
  MessageType1 msg1;
  MessageType2 msg2;
  MessageType3 msg3;
}

union Response
{
  byte msgType;
  MessageType3 msg1;
  MessageType4 msg2;
  MessageType5 msg3;
}

send(Request request);
receive(Response response);
  2. Use gRPC, generating a service with functions, like
service MyService 
{
  rpc function1(MessageType1) returns (Response);
  rpc function2(MessageType2) returns (Response);
  rpc function3(MessageType3) returns (Response);

  rpc functionN(MessageType3) returns (MessageType5);
}

(here it's possible to use many many combinations)

  3. Use just a single-function gRPC service, like
service MyService 
{
    rpc function(Request) returns (Response);
}

The option could depend on

  • preferred target for client: ZeroMQ or gRPC based client
  • performance reasons comparing ZeroMQ vs gRPC based service
  • specific features like how subscription is used/handled in ZeroMQ vs gRPC based service and client (see How to design publish-subscribe pattern properly in grpc?)

For the 1st option, you have to do a lot more work compared to the 2nd option: you have to match the type of each message sent with the types of the messages you expect to receive.

The 2nd option makes the functionality of the service easier and faster to understand if somebody else develops the client.

For developing an RPC service on top of ZeroMQ, I would define a .proto file specifying the functions, parameters (all possible input and output parameters) and errors like this:

enum Function 
{
    F1 = 0;
    F2 = 1;
    F3 = 2;
}

enum Error 
{
    E1 = 0;
    E2 = 1;
    E3 = 2;
}

message Request
{ 
    required Function function = 1;
    repeated Input data = 2;
}

message Response
{ 
    required Function function = 1;
    required Error error = 2;
    repeated Output data = 3;
}

message Input
{ 
    optional Input1 data1 = 1;
    optional Input2 data2 = 2;
    ...
    optional InputN dataN = n;
}

message Output
{ 
    optional Output1 data1 = 1;
    optional Output2 data2 = 2;
    ...
    optional OutputN dataN = n;
}

message Message
{
   repeated Request requests = 1;
   repeated Response responses = 2;
}

and, depending on the function id, the number and types of the parameters have to be checked at run time.
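To connect this back to the question of where ZeroMQ fits, the sketch below shows the shape of such a design with the network replaced by a plain callback so that it compiles on its own. Dispatcher, Channel, Transport and the method name are hypothetical names for this example, not protobuf or ZeroMQ APIs. In a real setup the Transport would send two frames (method name, serialized request) over a REQ socket and return the reply frame, and a RpcChannel::CallMethod() override would do what Channel::call() does here.

```cpp
#include <functional>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>

// A handler takes serialized request bytes and returns serialized response bytes.
using Handler = std::function<std::string(const std::string&)>;

// Server side: maps a method name to its handler (the REP socket loop would call handle()).
class Dispatcher {
public:
    void add(const std::string& method, Handler h) { handlers_[method] = std::move(h); }

    std::string handle(const std::string& method, const std::string& payload) const {
        auto it = handlers_.find(method);
        if (it == handlers_.end()) throw std::runtime_error("unknown method: " + method);
        return it->second(payload);
    }

private:
    std::map<std::string, Handler> handlers_;
};

// Stand-in for "send request over the socket, wait for the reply".
using Transport = std::function<std::string(const std::string& method,
                                            const std::string& payload)>;

// Client side: forwards the method name and the serialized request to the transport.
class Channel {
public:
    explicit Channel(Transport t) : transport_(std::move(t)) {}

    std::string call(const std::string& method, const std::string& request) {
        return transport_(method, request);
    }

private:
    Transport transport_;
};
```

Keeping the transport behind a callback also makes the dispatch logic testable without opening any sockets.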

Question:

I am currently trying to send some protobufs using zmq, and so far this has worked great. Now I started a different (and more complicated) protobuf and it stopped working.

I already found the error: de-serialization of the protobuf object creates a string containing the null byte \0.

EDIT: I think I've found the cause. In zmq_send there is a line where memcpy is being called. Since everything after \0 is supposedly not accessible, I get an error. Still no idea what to do.

So far I haven't come up with a solution to this problem. Do you have an idea?


Answer:

Please show your code. It is not clear what you are doing and what you expect to get. Neither protobuf nor zmq is bound to ASCIIZ (NUL-terminated strings). The following sample sequence works fine for any kind of data within the protobuf structure pb:

std::string reqBuf;
// The serialized bytes may contain \0; size() still reports the full length
pb->SerializeToString(&reqBuf);
// Pass the explicit length, never rely on a terminating \0
zmq_send(zc, reqBuf.data(), reqBuf.size(), 0);

Question:

I'm using protocol buffers over zeroMQ to send graph data from C++ to a Javascript frontend:

message RawData
{
   Type type = 1;
   bytes data = 2;
}

When I call RawData.getData() I get something like this (usually much longer):

Data: 0, 0, 0, 0, 0, 0, 0, 0, 64, 1, 118, 62, 112, 8, 12, 63

These are two 8-byte numbers, where each index in the array is 1 byte.

How do I convert this into double representation in Javascript?

EDIT: I ended up changing the protocol buffer message to repeated double data = 2;

this removed the need for conversion


Answer:

Just split the result into chunks of 8 elements and use ArrayBuffer and DataView to assemble each number. Example:

function splitData(data) {
    // Group the flat byte array into chunks of 8 bytes.
    return data.reduce(function (chunks, byte, index) {
        if (index % 8 === 0 && index !== 0) {
            chunks.push([]);
        }
        chunks[chunks.length - 1].push(byte);
        return chunks;
    }, [[]]);
}

function computeValues(data, littleEndian = false) {
    var res = [];
    splitData(data).forEach(numberElement => {
        var buffer = new ArrayBuffer(8);
        var dv = new DataView(buffer);
        // For little-endian input, reverse the byte order, then read as big-endian.
        numberElement.forEach((val, ix) => dv.setUint8(littleEndian ? 7 - ix : ix, val));
        res.push(dv.getFloat64(0));
    });
    return res;
}


var data = [0, 0, 0, 0, 0, 0, 0, 0, 64, 1, 118, 62, 112, 8, 12, 63];
console.log(computeValues(data));
console.log(computeValues(data, true));
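For comparison, the same decoding can be sketched on the C++ side, which may help when checking what the sender actually put into the bytes field. This assumes the values are IEEE 754 doubles; decodeDoubles is a name made up for this example.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Decode consecutive 8-byte groups as IEEE 754 doubles.
// bigEndian = true means the most significant byte comes first in each group.
// Trailing bytes that do not fill a whole group are ignored.
std::vector<double> decodeDoubles(const std::vector<std::uint8_t>& bytes,
                                  bool bigEndian = true) {
    std::vector<double> out;
    for (std::size_t i = 0; i + 8 <= bytes.size(); i += 8) {
        std::uint64_t bits = 0;
        for (std::size_t k = 0; k < 8; ++k) {
            // Always consume the most significant byte first.
            std::size_t idx = bigEndian ? i + k : i + 7 - k;
            bits = (bits << 8) | bytes[idx];
        }
        double value;
        static_assert(sizeof(value) == sizeof(bits), "double must be 64 bits");
        std::memcpy(&value, &bits, sizeof(value));  // reinterpret the bit pattern
        out.push_back(value);
    }
    return out;
}
```

For instance, the big-endian bytes 64, 4, 0, 0, 0, 0, 0, 0 decode to 2.5, and the same bytes in reverse order decode to 2.5 when bigEndian is false.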

Question:

I am testing code that uses zmq for sockets and networking, and protobuf for serialization.

The code receives a zmq_message and parses it to a protobuf class, in return I change the value of one of the class members and send the same class back to the requestor.

Somehow during this process a zmq assertion check() fails. I don't really know why it is happening as everything looks okay to me.

The code looks like this in the main file:

zmq::socket_t external(context, ZMQ_REP);
external.bind("tcp://*:29067");

zmq::message_t request;
external.recv(&request);
msg.deserialize(request);

msg.set_probed_value(12.0);
zmq::message_t response = msg.serialize();
external.send(response);

deserialize method looks like this.

_msg.ParseFromString(reinterpret_cast<const char*>(msg.data()));

and serialize method as below:

zmq::message_t request(_msg.ByteSize());
std::string value = _msg.SerializeAsString();
memcpy(request.data(), reinterpret_cast<const void*>(value.c_str()), value.size());
return request;

and set_probed_value() looks like this:

void set_probed_value(const double& val)
{
    _msg.clear_probed();
    _msg.set_probed(val);
}

I know for a fact that the problem occurs when I set the value of probed to a different number than it had at parse time. If I remove the line msg.set_probed_value(12.0), no exception happens and everything is okay.

Assertion failed: check () (/apps/zmq/libzmq/src/msg.cpp:347)


Answer:

The Suspect? Violating the ZeroMQ API published principles

All message manipulations are known to be rather fragile.

What about making an explicit copy of the content first, instead of directly using the reinterpret_cast<...>( msg.data() ) pointer, which references memory owned by ZeroMQ?

Avoid modifying message content after a message has been copied with zmq_msg_copy(), doing so can result in undefined behaviour. If what you need is an actual hard copy, allocate a new message using zmq_msg_init_size() and copy the message content using memcpy().

An explicit close() of the request message object is also advised right after the safe content copy has been made, as the published ZeroMQ API documentation strongly recommends.

Never access zmq_msg_t members directly, instead always use the zmq_msg family of functions.

The ZeroMQ API explicitly warns against manipulating message content in any way other than through the published API functions / methods. Better to avoid such tricks, even at the cost of slightly longer code and a few more SLOC.
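One related detail worth checking in the code from the question, whether or not it explains the assertion: passing a bare const char* where a std::string is expected builds the string up to the first 0 byte only, and serialized doubles such as 12.0 contain several 0 bytes. The sketch below shows the difference between the two std::string constructors; the helper names are invented for this example. With protobuf, the length-aware route would be ParseFromArray(msg.data(), msg.size()).

```cpp
#include <cstddef>
#include <string>

// Risky: treats the buffer as a NUL-terminated C string, so the copy stops at
// the first 0 byte (and reads past the end if the buffer contains none).
std::string fromCString(const void* data) {
    return std::string(static_cast<const char*>(data));
}

// Length-aware: copies exactly `size` bytes, including any embedded 0 bytes.
std::string fromBuffer(const void* data, std::size_t size) {
    return std::string(static_cast<const char*>(data), size);
}
```

For a 5-byte buffer whose second byte is 0, the first helper yields a 1-byte string while the second yields all 5 bytes.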

Question:

I'm sending Protobuf-encoded data over zwssock (czmq) -- which is a WebSocket extension for czmq -- to JSMQ.js, after which the protobuf data is decoded.

  1. OS: Windows
  2. Browser: Chrome 40.0.2
  3. ØMQ: 4.0.4
  4. czmq: 2.2.0.

With the following problem:

The data received does not contain all the bytes that were sent. In fact, after inspection it turns out that all bytes were received up to the first 0 byte.

Real example:

data sent:

10,4,78,77,69,65,-110,3,19,8,101,-86,6,14,8,1,16,-40,-126,-27,-14,-12,-87,-61,2,-16,1,1,-110,3,93,8,100,80,0,-94,6,86,34,73,36,71,80,82,77,67,44,49,48,51,50,51,49,46,54,48,49,44,86,44,53,50,48,53,46,54,52,50,54,55,48,44,78,44,48,48,54,49,57,46,54,49,51,57,52,55,44,69,44,52,51,46,50,44,51,54,46,52,44,50,51,48,49,49,53,44,44,44,83,42,54,68,32,74,9,69,80,83,71,58,52,51,50,54,-30,3,97,10,4,78,77,69,65,18,73,36,71,80,82,77,67,44,49,48,51,50,51,49,46,54,48,49,44,86,44,53,50,48,53,46,54,52,50,54,55,48,44,78,44,48,48,54,49,57,46,54,49,51,57,52,55,44,69,44,52,51,46,50,44,51,54,46,52,44,50,51,48,49,49,53,44,44,44,83,42,54,68,32,82,14,10,5,8,2,-80,1,2,-94,6,4,8,8,16,6

data received:

0, 10, 4, 78, 77, 69, 65, 146, 3, 19, 8, 101, 170, 6, 14, 8, 1, 16, 137, 255, 156, 213, 244, 169, 195, 2, 240, 1, 1, 146, 3, 93, 8, 100, 80

As you can see, the bytes after 80 are missing, and the sequence now starts with a 0 byte. I've tested with manually created data -- char* -- and every time the same problem occurs.

The two functions below are taken directly from JSMQ.js and are called by the websocket when data is received.

 function onmessage(ev) {
        if (ev.data instanceof Blob) {
            var arrayBuffer;
            var fileReader = new FileReader();
            fileReader.onload = function () {
                processFrame(this.result);
            };
            fileReader.readAsArrayBuffer(ev.data);
        } else if (ev.data instanceof ArrayBuffer) {
            processFrame(ev.data);
        }
        // Other message type are not supported and will just be dropped
    };

    function processFrame(frame) {
        var view = new Uint8Array(frame);
        var more = view[0];

        if (incomingMessage == null) {
            incomingMessage = new JSMQ.Message();
        }

        incomingMessage.addBuffer(frame);

        // last message
        if (more == 0) {
            if (that.onMessage != null) {
                that.onMessage(that, incomingMessage);
            }

            incomingMessage = null;
        }
    }

In onmessage/processFrame the received data already does not contain the full byte sequence. As you can see, the received byte sequence starts with a 0, matching the [more == 0] guard.

I was not able to get Wireshark to sniff the packets sent, to check whether the bytes were sent correctly.

One solution would be to use byte stuffing, thus removing all 0 bytes. But surely I must have made a mistake somewhere?

As requested:

Internally we use the C++ library of ZeroMQ; however, since WebSockets are currently an extension on the C version, we need to convert to a C-style message. As mentioned, the data has been stuffed.

void CZeroMQConnection::Send(zmq::message_t& message)
{

    zmsg_t* msg = zmsg_new();

    std::vector<unsigned char> rpl;
    std::copy_n(reinterpret_cast<char*>(message.data()), message.size(),std::back_inserter(rpl));

    // Temporary stuffing on Websockets
    char stuffChar = 1;
    char invalidChar = 0;

    std::vector<unsigned char> stuffed;
    for (auto it = rpl.begin(); it != rpl.end(); ++it)
    {
        if (*it == invalidChar || *it == stuffChar)
        {
            stuffed.push_back(stuffChar);
            stuffed.push_back((*it) + 1);
        }
        else
        {
            stuffed.push_back(*it);
        }
    }

    // As mentioned, add an extra 0 byte to prevent superfluous data
    stuffed.push_back(0);
    zmsg_push(msg, _id);
    zmsg_addstr(msg, reinterpret_cast<char*> (&stuffed[0]));
    zwssock_send(_socket, &msg);
}

There is not yet a bi-directional data flow; this will come in the near future.


Answer:

Thank you somdoron,

your post uses:

zmsg_addmem(msg, buf, 226);

while I was using:

zmsg_addstr(msg, reinterpret_cast<char*> (&stuffed[0]));

which treats its input as a NUL-terminated C string and therefore stops at the first 0 byte.

This solved the problem, thanks a lot!
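The effect can be reproduced without czmq. The sketch below compares how many bytes a C-string style call would pick up with how many a length-aware call is told about; both helper names are invented for this example.

```cpp
#include <cstddef>
#include <cstring>
#include <vector>

// Bytes a C-string API would pick up: everything before the first 0 byte.
// The buffer must contain at least one 0 byte, otherwise strlen reads past the end.
std::size_t lengthAsCString(const std::vector<unsigned char>& buf) {
    return std::strlen(reinterpret_cast<const char*>(buf.data()));
}

// Bytes a length-aware API is told to send: the full buffer.
std::size_t lengthAsMemory(const std::vector<unsigned char>& buf) {
    return buf.size();
}
```

For the byte run 8, 100, 80, 0, 162, 6, 86 (the part of the sent data around the first 0 byte, written as unsigned values), the first helper reports 3 and the second reports 7, which matches the received data stopping right after 80. With a length-aware call the byte stuffing workaround should no longer be needed.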

Question:

I need to compile my source code for ARM. I should do that with cross compiling, or something like that, but I can't figure out how. I use ZMQ and protobuf libs in my project.

For Ubuntu it is compiled with this command:

c++ program.cpp protofile.pb.cc -o programexe -lzmq -std=c++11 `pkg-config --cflags --libs protobuf`

and everything works fine. So I need to execute this program on my ARM board, but first I need to compile it for ARM, and this is the step where I am stuck. Also, I successfully compiled 'Hello World' with gnueabi, which may help. My OS: Ubuntu 16.04


Answer:

Generally speaking, you can use a cross-compiler to compile the source code on your Ubuntu machine and then copy the executable to your board.

Try installing the compiler with sudo apt-get install g++-aarch64-linux-gnu (or g++-arm-linux-gnueabihf for a 32-bit ARM board, depending on your board's architecture) and then build your source code with aarch64-linux-gnu-g++ instead of c++. Note that the ZeroMQ and protobuf libraries you link against must also be built for the ARM target.