Hot questions for Using ZeroMQ in serialization

Top 10 C/C++ Open Source / ZeroMQ / serialization

Question:

I am trying to use ZeroMQ for multiprocessing. I want to stream files from a tar file so I used the streamer. Below is an instance of what want to do.

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process

def server(frontend_port, number_of_workers):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://127.0.0.1:%d" % frontend_port)

    for i in range(0,10):
        socket.send_json('#%s' % i)
    for i in range(number_of_workers):
        socket.send_json('STOP')   
    return True

def worker(work_num, backend_port):
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:%d" % backend_port)

    while True:
        message = socket.recv_json()
        if message == 'STOP':
            break
        print("Worker #%s got message! %s" % (work_num, message))
        time.sleep(1)

def main():
    frontend_port = 7559
    backend_port = 7560
    number_of_workers = 2

    streamerdevice  = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port )
    streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
    streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
    streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
    streamerdevice.start()
    processes = []
    for work_num in range(number_of_workers):
        w = Process(target=worker, args=(work_num,backend_port))
        processes.append(w)
        w.start()
    time.sleep(1)
    s = Process(target=server, args=(frontend_port,number_of_workers))
    s.start()
#     server(frontend_port)
    s.join()
    for w in processes:
        w.join()

if __name__ == '__main__':
    main()

This code works properly. But I want to use send_multipart() to send a tuple or a list that includes items with different types like [string, numpy_array, integer] but json can't handle numpy arrays. I am avoiding using pickle because I need it to be as fast as possible. I tried to convert the array to bytes too but it didn't work. (maybe I was doing it wrong I am not sure). I appreciate if you can provide a working snippet of code. Ideally, I want to do something like this:

socket.send_multipart([string, numpy_array, integer])

So I want to know what is the most efficient way of doing it.

I am using Python 3.6


Answer:

msgpack and msgpack_numpy are the best option I could find. Try this:

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
import numpy as np
import msgpack
import msgpack_numpy as m

def server(frontend_port, number_of_workers):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://127.0.0.1:%d" % frontend_port)

    for i in range(0,10):
        arr = np.array([[[i,i],[i,i]],[[i,i],[i,i]]])
        file_name = 'image file name or any other srting'
        number = 10 # just an instance of an integer
        msg = msgpack.packb((arr, number, file_name), default=m.encode, use_bin_type=True)  
        socket.send(msg, copy=False)
        time.sleep(1)

    for i in range(number_of_workers):
        msg = msgpack.packb((b'STOP', b'STOP'), default=m.encode, use_bin_type=True)
        socket.send(msg, copy=False)   
    return True

def worker(work_num, backend_port):
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:%d" % backend_port)

    while True:
        task = socket.recv()
        task = msgpack.unpackb(task, object_hook= m.decode, use_list=False,  max_bin_len=50000000, raw=False)
        if task[1] == b'STOP':
            break
        (arr, number, file_name) = task
        print("Worker ",work_num,  'got message!', file_name)
    return True

def main():
    m.patch()
    frontend_port = 3559
    backend_port = 3560
    number_of_workers = 2

    streamerdevice  = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    streamerdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port )
    streamerdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
    streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
    streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
    streamerdevice.start()
    processes = []
    for work_num in range(number_of_workers):
        w = Process(target=worker, args=(work_num,backend_port))
        processes.append(w)
        w.start()
    time.sleep(1)
    s = Process(target=server, args=(frontend_port,number_of_workers))
    s.start()
    s.join()
    for w in processes:
        w.join()

if __name__ == '__main__':
    main()

Question:

I am trying to set up a basic communication system between client and server using ZMQ. I am using protobuf for the message format. My problem is when I send the message from client the message size is 34 but the message size received on the server is 0.

Following is my code;

Client.cpp

tutorial::Person person;
person.set_id(1234);
person.set_name("john");
person.set_email("john@mxyz.com");
person.set_phonenumber("12345678");


zmq::context_t context (1); //  Prepare our context and socket
zmq::socket_t socket (context, ZMQ_PAIR);

std::cout << "Connecting to server…" << std::endl;
int linger = 0;// ms
socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
socket.connect ("tcp://127.0.0.1:20000");

std::string msg_str;
person.SerializeToString(&msg_str);
std::cout << "Size of message string is "<< msg_str.size()<<std::endl;
zmq::message_t request (msg_str.size());
memcpy ((void *) request.data (), msg_str.c_str(), msg_str.size());
std::cout << "Sending Person data ..." << std::endl;
socket.send (request);

socket.close();

google::protobuf::ShutdownProtobufLibrary();
return 0;

Server.cpp :

zmq::context_t context(1); //  Prepare our context and socket
zmq::socket_t socket(context, ZMQ_PAIR);
int linger = 0; // ms
socket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
socket.bind("tcp://127.0.0.1:20000");
while (true)
 {
    zmq::message_t request;
    int recieved = socket.recv(&request);
    std::string msg(static_cast<char*>(request.data()),request.size());
    std::cout<<"Size of message recieved is "<<  msg.size()<<std::endl;
    tutorial::Person person;
    person.ParseFromString(msg);
    std::string text_str1;
    google::protobuf::TextFormat::PrintToString(person, &text_str1);
}
socket.close();

Output of Client is :

Size of message string is 34

Output of server is :

Size of message received is 0

I have tried tried switching to ParseToArray also but it did not help. Any kind of help is appreciated.


Answer:

I have solved it with another way. Actually earlier I installed libzmq lib , but later when I switched to libzmq3-dev it worked for me.

Question:

To start off, it is worth mentioning that within a single F# solution, serializing and deserializing of Bond messages works fine. However, I am having trouble properly handling the sending and/or receiving of the message over ZeroMQ.

There is a runtime error on the subscriber side of the following program. A .bond file is defined and compiled with the bond compiler. Then a dll is created from C# to be called from F# . I then have two F# programs. One that publishes serialized data over a tcp socket and another that is a subscriber. When the message is received on the sub, the line which tries to Unmarshal the raw data is the one that causes the runtime error. Can anyone see the reason for this?

[EDIT] Per Fyodor's comment, I made a change on the publisher side which changes the error on the subscriber side. So the error likely has something to do with how i am packing and unpacking the information.

This is the .bond file

namespace Examples

struct Record
{
    0: map<string, double> payload;
}

Here is the publisher:

// publisher

open System
open Bond
open Bond.Protocols
open Bond.IO.Safe
open ZeroMQ

let ctx = new ZContext()
let publisher = new ZSocket(ctx, ZSocketType.PUB)
publisher.Bind("tcp://*:5556")

let src = new Examples.Record()
src.payload.Add("a", 1.)
src.payload.Add("b", 2.)

let output = new OutputBuffer()
let writer = new CompactBinaryWriter<OutputBuffer>(output)

while true do
    Marshal.To(writer, src)
    //let input = new InputBuffer(output.Data)
    //let byteArr = input.ReadBytes(int(input.Length - 1L))
    let updateFrame = new ZFrame(System.Text.Encoding.ASCII.GetString output.Data.Array)
    publisher.Send(updateFrame)

Here is the subscriber:

// subscriber

open Bond
open Bond.Protocols
open Bond.IO.Safe
open System
open System.Text
open ZeroMQ

let ctx = new ZContext()
let subscriber = new ZSocket(ctx, ZSocketType.SUB)
subscriber.Connect("tcp://127.0.0.1:5556")
subscriber.SubscribeAll()

let output = new OutputBuffer()    
while true do    
    let received = subscriber.ReceiveFrame()
    let byteArr = Encoding.ASCII.GetBytes (received.ReadString())
    let arrSeg = ArraySegment<byte>(byteArr)
    let input = new InputBuffer(arrSeg)
    let dst = Unmarshal<Examples.Record>.From(input)
    for KeyValue(k, v) in dst.payload do
        printfn "%A %A" k v

Answer:

On the receiving side, when you attempt to decode the marshaled Bond Compact Binary as an ASCII string, you're losing some of the payload. When marshaling a struct like Record to Compact Binary, the first four bytes of the payload are 0x43 0x42 0x10 0x00. When reading a string from a ZFrame, the first embedded NUL (0x00) that is encountered signals the end of the string, regardless of the size of the frame. So, the reading side sees only 0x43 0x42 0x10 instead of the whole payload (29 bytes when I tested).

Since Compact Binary is a binary protocol, you'll want to use the ZFrame constructor that takes a buffer on the publisher side:

let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)

On the subscriber side, you'll want to just read the buffer:

let byteArr = received.Read()

Also, on the publisher side, you're constantly accumulating data into the same OutputBuffer. You'll want to reset output.Position to 0 before you marshall your next record to re-use the buffer instead of growing it:

while true do  
    Marshal.To(writer, src)
    let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)output.Data.Array)
    publisher.Send(updateFrame)
    output.Position <- 0

Another thing to note: the default buffer allocated for an OutputBuffer is 65KiB. Consider making this smaller, once you know about how large your payloads are going to be.

NB: I debugged this in a C# application that had similar semantics. Here's what I used:

namespace so_q_zmq
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Bond;
    using Bond.IO.Safe;
    using Bond.Protocols;
    using ZeroMQ;

    [Schema]
    class Record
    {
        [Id(0)]
        public Dictionary<string, double> payload = new Dictionary<string, double>();
    }

    class Program
    {
        static void Main(string[] args)
        {
            var pTask = Task.Run(() =>
            {
                try
                {
                    Publisher();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Publisher failed: {0}", ex);
                }
            });

            var sTask = Task.Run(() =>
            {
                try
                {
                    Subscriber();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Subscriber failed: {0}", ex);
                }
            });

            Task.WaitAll(pTask, sTask);
            Console.WriteLine("Done");
            Console.ReadLine();
        }

        static void Publisher()
        {
            var ctx = new ZContext();
            var publisher = new ZSocket(ctx, ZSocketType.PUB);
            publisher.Bind("tcp://127.0.0.1:12345");

            var src = new Record();
            src.payload.Add("a", 1.0);
            src.payload.Add("b", 2.0);

            var output = new OutputBuffer();
            var writer = new CompactBinaryWriter<OutputBuffer>(output);

            for (;;)
            {
                Marshal.To(writer, src);
                // INCORRECT:
                // var str = Encoding.ASCII.GetString(output.Data.Array);
                // var updateFrame = new ZFrame(str);
                var updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count);
                publisher.Send(updateFrame);
                output.Position = 0;
            }
        }

        static void Subscriber()
        {
            var ctx = new ZContext();
            var subscriber = new ZSocket(ctx, ZSocketType.SUB);
            subscriber.Connect("tcp://127.0.0.1:12345");
            subscriber.SubscribeAll();

            for (;;)
            {
                var received = subscriber.ReceiveFrame();
                // INCORRECT
                // var str = received.ReadString();
                // var byteArr = Encoding.ASCII.GetBytes(str);
                var byteArr = received.Read();
                var arrSeg = new ArraySegment<byte>(byteArr); // There's an InputBuffer ctor that takes a byte[] directly
                var input = new InputBuffer(arrSeg);
                var dst = Unmarshal<Record>.From(input);
                foreach (var kvp in dst.payload)
                {
                    Console.WriteLine("{0} {1}", kvp.Key, kvp.Value);
                }
            }
        }
    }
}

Question:

UPDATING MY QUESTION

How to can I represent the arrived message in my python zmq server to show their content ?

According to this behavior, can I assume that the btnState data is sent to python server in anyway?

Context:

I am sending some data members structures using a C++ zeromq client process: ZMQComponent.h file

#include <zmq.hpp>
#include <sofa/defaulttype/VecTypes.h>

// To Quat datatype
#include <sofa/defaulttype/Quat.h>
using sofa::defaulttype::Quat;

using std::string;

namespace sofa
{

namespace component
{

namespace controller
{

/* data structure which I want send data to python zmq server */
struct instrumentData
{
  typedef sofa::defaulttype::Vec3d Vec3d;
  Vec3d pos;
  Quat quat;
  int btnState;
  float openInst;
  bool blnDataReady;
};

class ZMQComponent : public sofa::core::behavior::BaseController
{
  public:
    SOFA_CLASS(ZMQComponent, sofa::core::behavior::BaseController);

    ZMQComponent();
    virtual ~ZMQComponent();

    /* Conect to ZMQ external python Server  */
    void setupConnection();

    /* Send some data memeber instrumentData structure to ZMQ external Server  */
    void instrumentDataSend(instrumentData a);

    /* initialize function */
    void init();

};

} // namespace sofa

} // namespace component

} // namespace controller

The ZMQComponent.cpp is:

#include <sofa/core/ObjectFactory.h>
#include <zmq.hpp>
#include <iostream>
#include <string>
#include "ZMQComponent.h"


using namespace std;

namespace sofa
{

namespace component
{

namespace controller
{

/*  ZMQ Internal Client context and socket */
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REQ);

ZMQComponent::ZMQComponent(){}

void ZMQComponent::setupConnection()
{
    cout << "Connecting to python zeroMQ server ..." << endl;
    socket.connect("tcp://localhost:5555");
}

void ZMQComponent::instrumentDataSend(instrumentData a)
{
    /*  Initialize the data members structure instrumentData */
    a.pos = sofa::defaulttype::Vec3d(1.0f, 1.0f, 1.0f);
    a.quat = defaulttype::Quat(1.0f, 1.0f, 4.0f, 1.0f);
    a.btnState = 5671;
    a.openInst = 1.0f;
    a.blnDataReady = false;

    string s, test, result, d;
    s = to_string(a.btnState);
    test = " is a number";
    result = s + test;

    /*  We send  the btnState data  */
    zmq::message_t request(30);



/*  We ask for the memory address to ge the btnState content and send it. */
    memcpy(request.data(), &result, 30);
    socket.send(request);
}


/*  In the init function we create the objects to setup connection and send data  */
void ZMQComponent::init()
{
    std::cout << "ZeroMQCommunication::init()" << std::endl;
    ZMQComponent z;
    z.setupConnection();

    instrumentData itemp;
    z.instrumentDataSend(itemp);

}

/*  Other code related ....  */
ZMQComponent::~ZMQComponent(){}

// int ZeroMqComponentClass = sofa::core::RegisterObject("This component does nothing.").add<ZeroMqComponent>();
SOFA_DECL_CLASS(ZMQServerComponent)

int ZMQServerComponentClass = sofa::core::RegisterObject("This component create a Socket.").add< ZMQServerComponent >();
} // namespace controller

} // namespace component

} // namespace sofa

Then , my python zmq server which receive the btnState int variable is:

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
print('ZMQ Server listening ... ')

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received message from Sofa: {}".format(message))

    #  Do some 'work'
    time.sleep(1)

The output or the message which arrive to python zmq server is the content of result variable (btnState turn to string in s content variable + string test concatenated) and some symbols characters of the :

(cnvss_test) ➜  Python git:(ZMQCommunication) ✗ python server.py
ZMQ Server listening ...
Received message from Sofa: b'\xb0\x1d\x19\xf4\xfd\x7f\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x0045 is a number'

The previous output on my ZMQ python server script show that the string result from sofa is arrived to server, and their content is visualized, but too, this strings or characters symbols, which are product or consequence of the size of the zmq::message_t request(30) defined in my C++ client in the beginning.

If I assign a value less than 30 in the request, by example zmq::message_t request(10) the output in my server is:

Received message from Sofa: b'\x90\x94\xa1\x00\xfc\x7f\x00\x00\x0e\x00'

If I assign a value greater than 10 in the request, by example zmq::message_t request(20) the output in my server is:

Received message from Sofa: b'\x80$(\xc7\xfc\x7f\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x0045 i

Then, the string or object which I receive in the server side, it has as long as the length or size assigned to zmq::message_t request variable

Based in the above mentioned, is ZMQ whom add this strings in my message received?

According to the previous process, my message is arrived to my server, then is correct attempt what the serialization process with some entity like protocol buffer is necessary? I understand that use something like google protocol buffer allow have some correlation more controlled in relation to objects sent and the objects received in relation of their real content ...

In any case, how to can I remove the strings or characters symbols that are added in the message arrived to the server?

Any support or orientation will be highly appreciated


Answer:

Your system is heterogeneous, meaning you need some sort of serialisation that is platform / language agnostic.

For your purposes the most convenient one to use is likely to be Google Protocol Buffers. This supports both C++ and Python very nicely. With this you'll be defining your messages / data structures in a schema file (file extension .proto), and using protoc to compile that to both C++ source code and also to Python source code. These give you classes that can serialise/deserialise to/from the same wireformat. The serialisation / deserialisation can nicely integrated with ZMQ message buffers.

There are others;

  • Apache Avro is a possibility.
  • I would avoid XSD schemas; in principle they're fine, but finding code generators that actually do a proper and complete job is difficult / expensive. For example, xsd.exe (from Microsoft) can compile an XSD schema to C++ classes (I think), but ignores the constraint fields in the schema (e.d. MinInclusive).
  • ASN1 is really good, but I've yet to find a decent implementation for Python. There is a code-first implementation of ASN.1 for python (pyasn), but that rather misses the whole point of having an ASN.1 schema...

Question:

I'm developing zmq/protobuf application and I have a problem with deserialization of messages sent from C++ to python. I easily handle messages from python to C++ however in the other direction I have a problem.

Protobuf library in python client application complains that it detected: File "C:\Python27\lib\site-packages\google\protobuf\internal\python_message.py", line 844, in MergeFromString raise message_mod.DecodeError('Unexpected end-group tag.')

I presume there is a problem between C++ serizalization and python deserialization. I'm wondering if there is some problem with null terminator in C/C++.

I'm using RaspberryPi running Raspian for C++ code and x64 CPU running Windows 7 for python code.

This is my C++ serialization code..

// Test Code.
// Try to send some 'demo' response back.
RPiProtocol::Message response;
std::string response_string;
response.set_type(RPiProtocol::Message::RESPONSE);
response.set_command(RPiProtocol::Message::GET_SYS_INFO);
response.set_version(0);

// Serialize ZMQ message to string.
if (response.SerializeToString(&response_string))
{
    //  Send response message back to the client.
    zmq::message_t reply(response_string.length());
    memcpy((void *)reply.data(), (void *)&response_string, response_string.length());
    socket.send(reply);
}

This is my python deserialization code..

#  Get the reply.
message = socket.recv()
response = rpi_protocol_pb2.Message()

# This line fails.
response.ParseFromString(str(message))

I debugged that deserialization fails in this function \google\protobuf\internal\python_message.py

  def InternalParse(self, buffer, pos, end):
    self._Modified()
    field_dict = self._fields
    unknown_field_list = self._unknown_fields
    while pos != end:
      (tag_bytes, new_pos) = local_ReadTag(buffer, pos)
      field_decoder, field_desc = decoders_by_tag.get(tag_bytes, (None, None))
      if field_decoder is None:
        value_start_pos = new_pos
        new_pos = local_SkipField(buffer, new_pos, end, tag_bytes)
        if new_pos == -1: # HERE I HAVE -1 !!!
          return pos
        if not unknown_field_list:
          unknown_field_list = self._unknown_fields = []
        unknown_field_list.append((tag_bytes, buffer[value_start_pos:new_pos]))
        pos = new_pos
      else:
        pos = field_decoder(buffer, new_pos, end, self, field_dict)
        if field_desc:
          self._UpdateOneofState(field_desc)
    return pos
  cls._InternalParse = InternalParse

Could you help me with enabling my application?


Answer:

I too had problems deserializing protobuff data in python. The protobuff data had been serialized using the 'C' protobuff code.

The short answer is look at using binascii.unhexlify().

Serializing a protobuff on python and sending to the 'C' code where it was deserialized worked just fine. But the opposite was not true until I did the following: binstring = binascii.unhexlify(hexstring). Then protoBuff.ParseFromString(binstring) worked just fine.