Hot questions for Using ZeroMQ in flatbuffers

Question:

I am trying to send a reasonably big Flatbuffers object over the network via ZMQ and then read it using C++. When accessing the object, I get unhandled exceptions that I don't know how to solve. Even this minimal example fails:

The flatbuffers schema:

namespace flatbuffer;
table TestBuf {
  testStatus:bool;
  testNumber:double;
  testInt:int;
}
root_type TestBuf;

The main.cpp using the REP socket:

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REP);
    socket.bind("tcp://*:5555");

    std::cout << "Listening for requests." << std::endl;
    std::cout << "-----" << std::endl;

    double count = 0;
    while (1) {
        zmq::message_t request;
        socket.recv(&request);

        // Read incoming data
        auto reqmsg = flatbuffer::GetTestBuf(&request);
        std::cout << "Received: " << reqmsg << std::endl;

        flatbuffers::FlatBufferBuilder fbb;
        flatbuffer::TestBufBuilder builder(fbb);

        count++;
        builder.add_testNumber(count);      
        std::cout << "Sending " << count << std::endl;

        auto response = builder.Finish();
        fbb.Finish(response);

        // Send the flatbuffer
        int buffersize = fbb.GetSize();
        zmq::message_t message(buffersize);
        memcpy((void *)message.data(), fbb.GetBufferPointer(), buffersize);
        socket.send(message);
    }
    return 0;
}

The main.cpp using the REQ socket:

int main() {
    // Prepare ZMQ context and socket
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_REQ);
    std::cout << "Sending out data requests." << std::endl;
    socket.connect("tcp://localhost:5555");

    double count = 0;
    while (1) {
        // Formulate response
        flatbuffers::FlatBufferBuilder fbb;
        flatbuffer::TestBufBuilder builder(fbb);

        count++;
        builder.add_testNumber(count);
        auto response = builder.Finish();
        fbb.Finish(response);

        // Send the flatbuffer
        std::cout << "Sending. " << count << ". ";
        int buffersize = fbb.GetSize();
        zmq::message_t message(buffersize);
        memcpy((void *)message.data(), fbb.GetBufferPointer(), buffersize);
        socket.send(message);
        std::cout << "Sent. ";

        // Receive reply
        zmq::message_t reply;
        socket.recv(&reply);

        // Read the data
        auto inmsg = flatbuffer::GetTestBuf(&reply);
        std::cout << " Received reply: " << inmsg << std::endl;

        //auto num = inmsg->testNumber();
        //std::cout << num << " test number.";
    }
    return 0;
}

This code runs fine and displays (I think) the raw buffer each program is receiving. Strangely, it is not changing, although the content of the message should be. If I uncomment the last two lines and try to access inmsg->testNumber(), I get this error message:

Unhandled exception at 0x000000013F373C53 in KUKAREQ.exe: 0xC0000005: Access violation reading location 0x00000000004B35D8.

I have sent Flatbuffers objects through ZMQ successfully before, but I have not read them in C++. I am fairly sure I followed the Flatbuffers tutorial closely, but something is obviously going wrong. Pointers? Buffer sizes? Either way I would appreciate help.


Edit: To clarify my comment on the accepted answer, the offending line was:

auto inmsg = flatbuffer::GetTestBuf(&reply);

It has to be changed to:

auto inmsg = flatbuffer::GetTestBuf(reply.data());

Whoever reads this question may also be interested to know that I later came across a bug which occurs when the FlatBufferBuilder functions are not called in the correct order. Apparently the order in which the Flatbuffers object is built is important. Finding that one took me a while - novices watch out.


Answer:

Not familiar with ZeroMQ, but flatbuffer::GetTestBuf(&request) this looks problematic.. you need to pass the buffer, not the message structure. Likely request.data() or similar works better.

In general, if it crashes in FlatBuffers, you should use the verifier to verify the buffer you're passing to FlatBuffers. If that fails, it means you're not passing legal data to FlatBuffers, as is the case here.

Also, you may want to check if ZeroMQ can send buffers without copying, will be faster.

Question:

Posting my work for posterity. Realized after finishing my last example in C++ that I actually needed to do it in C all along (awesome, right?). Both iterations took me considerable effort as a Java programmer and I think a lot of the sample code out there leaves far too many holes - especially when it comes to building which is considerably more difficult from the command line for someone who is used to using, say Eclipse, to build a project and handle dependencies.

How to install dependencies for OSX with brew:

brew install flatcc brew install zeromq

You'll need all the standard builder binaries installed as well. I used gcc to compile with:

gcc publisher.c -o bin/zmq_pub -lzmq -lflatcc gcc subscriber.c -o bin/zmq_sub -lzmq

This assumes you've installed the zmq and flatcc libraries which should get symlinked to your /usr/local/include after brew finishes installing them. Like this:

zmq_cpub $ls -la /usr/local/include lrwxr-xr-x 1 user group 37 Oct 18 18:43 flatcc -> ../Cellar/flatcc/0.3.4/include/flatcc

You'll get compilation errors such as: Undefined symbols for architecture x86_64: if you don't have the libraries correctly installed / linked. The compiler / linker will rename functions and prefix them with _ and potentially confuse the hell out of you. Like Undefined symbols for architecture x86_64 _flatcc_builder_init even though there never is supposed to be an _flatcc_builder_init.

That's because linking libraries in C / C++ is fundamentally different than in Java. Instead of a specific project build path that you add JARs too there are known locations where external C / C++ libraries can install to. /usr/local/include, /usr/local/lib, /usr/lib, and /usr/include.

And don't forget to generate the header files to use in your local project after installing the flatcc binary to your path:

flatcc -a Car.fbs

That should be pretty much every obstacle I faced on my trip down C lane. Hope it helps someone out there.


Answer:

Car.fbs

namespace Test;

table Car {
    name: string;
    model: string;
    year: int;
}
root_type Car;

Subscriber.c (listens for incoming structs)

//  Hello World client
#include "flatbuffers/Car_builder.h" // Generated by `flatcc`.
#include "flatbuffers/flatbuffers_common_builder.h"
#include <zmq.h>

#undef ns
#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(Test, x) // Specified in the schema

struct Car {
    char* name;
    char* model;
    int year;
};

int main (void)
{
    printf ("Connecting to car world server...\n");
    void *context = zmq_ctx_new ();
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        char buffer [1024];
        printf ("Sending ready signal %d...\n", request_nbr);
        zmq_send (requester, "Hello", 5, 0);
        zmq_recv (requester, buffer, 1024, 0);
        printf ("Received car %d\n", request_nbr);
        ns(Car_table_t) car = ns(Car_as_root(buffer));
        int year = ns(Car_year(car));
        flatbuffers_string_t model = ns(Car_model(car));
        flatbuffers_string_t name = ns(Car_name(car));

        struct Car nextCar;
        // no need to double up on memory!!
        // strcpy(nextCar.model, model);
        // strcpy(nextCar.name, name);
        nextCar.model = model;
        nextCar.name = name;
        nextCar.year = year;

        printf("Year: %d\n", nextCar.year);
        printf("Name: %s\n", nextCar.name);
        printf("Model: %s\n", nextCar.model);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

Publisher.c (sends structs over zmq socket):

//  Hello World server

#include "flatbuffers/Car_builder.h" // Generated by `flatcc`.
#include "flatbuffers/flatbuffers_common_builder.h"
#include <zmq.h>
#include <unistd.h>
#include <time.h>

#undef ns
#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(Test, x) // specified in the schema

struct Car {
    char name[10];
    char model[10];
    int year;
};

struct Car getRandomCar() {
    struct Car randomCar;
    int a = rand();
    if ((a % 2) == 0) {
        strcpy(randomCar.name, "Ford");
        strcpy(randomCar.model, "Focus");
    } else {
        strcpy(randomCar.name, "Dodge");
        strcpy(randomCar.model, "Charger");
    }
    randomCar.year = rand();
    return randomCar;
}

int main (void)
{
    srand(time(NULL));

    //  Socket to talk to clients
    void *context = zmq_ctx_new ();
    void *responder = zmq_socket (context, ZMQ_REP);
    int rc = zmq_bind (responder, "tcp://*:5555");
    assert (rc == 0);
    int counter = 0;

    while (1) {
        struct Car c = getRandomCar();

        flatcc_builder_t builder, *B;
        B = &builder;
        // Initialize the builder object.
        flatcc_builder_init(B);
        uint8_t *buf; // raw buffer used by flatbuffer
        size_t size; // the size of the flatbuffer
        // Convert the char arrays to strings
        flatbuffers_string_ref_t name = flatbuffers_string_create_str(B, c.name);
        flatbuffers_string_ref_t model = flatbuffers_string_create_str(B, c.model);

        ns(Car_start_as_root(B));
        ns(Car_name_add(B, name));
        ns(Car_model_add(B, model));
        ns(Car_year_add(B, c.year));
        ns(Car_end_as_root(B));
        buf = flatcc_builder_finalize_buffer(B, &size);

        char receiveBuffer [10];
        zmq_recv (responder, receiveBuffer, 10, 0);
        printf ("Received ready signal. Sending car %d.\n", counter);
        sleep (1);          //  Do some 'work'
        zmq_send (responder, buf, size, 0);
        counter++;

        free(buf);
    }
    return 0;
}

Question:

I struggle a lot with flatbuffers these days and I could use some help. I want to use flatbuffer to send events, which contain different data, over TCP (Using ZeroMQ). Therefore, I am using a Union.

// event.fbs
namespace event;

table ByteArray {
    bytes:[byte];
}

table OtherData {
    id:uint;
    value:uint;
}

union EventData {
    ByteArray,
    OtherData,
    String:string
}

table Event {
  name:string (key);
  timestamp:ulong = -1;
  data:EventData;
}

root_type Event;

In my C++-class I want to create new events and transfer them to my Publisher-class that sends out the events via ZeroMQ. Is there a nice or common way of doing this? I was thinking about something like this:

mPublisher.publishEvent(event::Event("event1", 0, "dataString"));
mPublisher.publishEvent(event::Event("event2", 1, byteArray));

The example above is not working because there is no such constructor. Is there a nice way of creating multiple events with different data? How am I supposed to pass these flatbuffer-events to another class like my publisher? Should I pass the flatbuffer or the event-offset?


Answer:

You just need to use the actual constructors provided, so it will look something like:

event::CreateEvent(fbb, "event1", 0, fbb.CreateString("dataString"));

There is also an event::CreateByteArray etc. See the generated code, or the tutorial.

event::CreateEvent returns an offset into an unfinished FlatBuffer, so is generally not suitable for passing to non-FlatBuffer functions. You'll want to call fbb.Finish() on that offset, then pass the resulting buffer to other functions (again, see the tutorial).

Question:

I've referenced some examples and I'm modeling my system off of the server and client example and I feel like I'm very close.

StarBuffer.fbs: table StarBuffer { radius: double; mass: double; volume: double; } root_type StarBuffer;

subscriber.cpp:

//    
//  Durable subscriber
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>

#include "zhelpers.hpp"
#include <zmq.hpp>
#include "StarBuffer.h" // generated flat file from StarBuffer.fbs

struct Star {
    double radius ;
    double mass;
    double volume;
};

int main (int argc, char *argv[])
{ 
    zmq::context_t context = zmq::context_t(1);

    //  Connect our subscriber socket
    zmq::socket_t subSocket = zmq::socket_t(context, ZMQ_SUB);
    subSocket.setsockopt(ZMQ_IDENTITY, "Hello", 5);
    subSocket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    subSocket.connect("tcp://localhost:5565");

    //  Get updates, expect random Ctrl-C death
    while (1) {
        zmq::message_t receiveMessage;
        subSocket.recv(&receiveMessage);

        flatbuffers::FlatBufferBuilder fbb;
        StarBufferBuilder builder(fbb);

        auto star = GetStarBuffer(receiveMessage.data());

        std::cout << "Received Star" << std::endl;
        std::cout << "radius: " << star->radius() << std::endl;
        std::cout << "mass: " << star->mass() << std::endl;
        std::cout << "volume: " << star->volume() << std::endl;
    }
    return 0;
}

publisher.cpp:

//
//  Publisher for durable subscriber
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>

#include "zhelpers.hpp"
#include <zmq.hpp>
#include <random>
#include "StarBuffer.h" // generated flat file from StarBuffer.fbs

struct Star {
    double radius ;
    double mass;
    double volume;
};

template<typename Numeric, typename Generator = std::mt19937>
Numeric random(Numeric from, Numeric to)
{
    thread_local static Generator gen(std::random_device{}());

    using dist_type = typename std::conditional
    <
        std::is_integral<Numeric>::value
        , std::uniform_int_distribution<Numeric>
        , std::uniform_real_distribution<Numeric>
    >::type;

    thread_local static dist_type dist;

    return dist(gen, typename dist_type::param_type{from, to});
}


Star getRandomStar() {
    double lower_bound = 0;
    double upper_bound = std::numeric_limits<double>::max();
    double randomRadius = random(lower_bound, upper_bound);
    double randomMass = random(lower_bound, upper_bound);
    double randomVolume = random(lower_bound, upper_bound);

    Star s = Star();
    s.radius = randomRadius;
    s.mass = randomMass;
    s.volume = randomVolume;

   return s;
}

int main () {

    zmq::context_t context = zmq::context_t(1);

    //  We send updates via this socket
    zmq::socket_t publishSocket = zmq::socket_t(context, ZMQ_PUB);
    publishSocket.bind("tcp://*:5565");

    //  Now broadcast exactly 10 updates with pause
    int update_nbr;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {

        Star randomStar = getRandomStar();

        flatbuffers::FlatBufferBuilder fbb;
        StarBufferBuilder builder(fbb);

        builder.add_radius(randomStar.radius);
        builder.add_mass(randomStar.mass);
        builder.add_volume(randomStar.volume);

        auto response = builder.Finish();
        fbb.Finish(response);

        std::cout << "Sending Star " << update_nbr << "…" << std::endl;
        int buffersize = fbb.GetSize();
        zmq::message_t request(buffersize);
        memcpy((void *)request.data(), fbb.GetBufferPointer(), buffersize);
        publishSocket.send(request);
        std::cout << "Star sent!" << std::endl;

        sleep(1);
    }
    return 0;
}

EDIT: The code compiles and works! I've updated the code here to reflect my working code on my local machine for people to use going forward as this took me a good amount of time to finish.

Since it took me a while to figure this stuff out too here's the build process:

flatc --cpp StarBuffer.fbs mv StarBuffer_generated.h StarBuffer.h g++ -std=c++11 publisher.cpp -lzmq -o ./bin/zmq_pub g++ -std=c++11 subscriber.cpp -lzmq -o ./bin/zmq_sub

Requires 'zhelpers.cpp', and 'flatbuffers/flatbuffers.h' in your local directory.


Answer:

Try auto star = GetRootAsStarBuffer(request.data());. The Create function you used is to create a buffer, not to read an existing one.

Unrelated, but on the writing side, you should see if you can create the message_t without having to use memcpy, which would be more efficient.