Hot questions for using ZeroMQ in ROS

Question:

I have a single producer and n workers, and I only want to give work to a worker when it is not already processing a unit of work. I'm struggling to find a good ZeroMQ pattern for this.

1) REQ/REP

The producer is the requester and creates a connection to each worker. It tracks which workers are busy and round-robins over the idle ones.

Problem:

  • How can the producer be notified of responses and still be able to send new work to idle workers, without dedicating a thread in the producer to each worker?

2) PUSH/PULL

Producer pushes into one socket that all workers feed off, and workers push into another socket that the producer listens to.

Problem:

  • Has no concept of worker idleness, i.e. work gets stuck behind long units of work

3) PUB/SUB

Non-starter, since there is no way to make sure work doesn't get lost

4) Reverse REQ/REP

Each worker is the REQ end and requests work from the producer and then sends another request when it completes the work

Problem:

  • The producer has to block on a request for work until there is work (since each recv has to be paired with a send). This prevents workers from responding with work completion.
  • Could be fixed with a separate completion channel, but the producer still needs some polling mechanism to detect new work and stay on the same thread.

5) PAIR per worker

Each worker has its own PAIR connection allowing independent sending of work and receipt of results

Problem:

  • Same problem as REQ/REP with requiring a thread per worker

As much as ZeroMQ is non-blocking/asynchronous under the hood, I cannot find a pattern that lets my code be asynchronous as well, rather than blocking in many dedicated threads or spinning in polling loops in fewer threads. Is this just not a good use case for ZeroMQ?


Answer:

Your problem is solved with the Load Balancing Pattern in the ZMQ Guide. It's all about flow control whilst also being able to send and receive messages. The producer will only send work requests to idle workers, whilst the workers are able to send and receive other messages at all times, e.g. abort, shutdown, etc.
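The heart of that pattern is bookkeeping on the producer side: it remembers which workers have announced themselves idle, hands work only to them, and queues work when none are idle. In the Guide this is done with a ROUTER socket, whose identity frames tell the producer which worker sent each message, and workers that send a "READY" message when they start. The sketch below leaves the sockets out and shows only that bookkeeping; the class and method names (`LoadBalancer`, `on_worker_ready`, `submit`) are hypothetical.

```cpp
#include <cstddef>
#include <deque>
#include <optional>
#include <string>
#include <utility>

// Bookkeeping for a load-balancing producer: work is handed out only to
// workers that have reported themselves idle ("READY" or a finished result).
class LoadBalancer {
 public:
  using WorkerId = std::string;  // with ZeroMQ this would be the ROUTER identity frame
  using Dispatch = std::optional<std::pair<WorkerId, std::string>>;

  // A worker announced it is idle. Returns a queued job for it, if any.
  Dispatch on_worker_ready(const WorkerId& id) {
    if (!pending_jobs_.empty()) {
      std::string job = std::move(pending_jobs_.front());
      pending_jobs_.pop_front();
      return std::make_pair(id, std::move(job));
    }
    idle_workers_.push_back(id);
    return std::nullopt;
  }

  // New work arrived. Returns (worker, job) if a worker is idle;
  // otherwise the job waits until some worker becomes ready.
  Dispatch submit(std::string job) {
    if (!idle_workers_.empty()) {
      WorkerId id = idle_workers_.front();  // least-recently-used idle worker
      idle_workers_.pop_front();
      return std::make_pair(std::move(id), std::move(job));
    }
    pending_jobs_.push_back(std::move(job));
    return std::nullopt;
  }

  std::size_t idle_count() const { return idle_workers_.size(); }
  std::size_t pending_count() const { return pending_jobs_.size(); }

 private:
  std::deque<WorkerId> idle_workers_;
  std::deque<std::string> pending_jobs_;
};
```

In a real producer both events would be driven from one `zmq_poll` loop on a single thread: a message from a worker (READY or a result) triggers `on_worker_ready`, new work triggers `submit`, and any returned pair is sent to that worker's identity. That is what removes the need for a thread per worker.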

Question:

I am using the PUB/SUB model in ZeroMQ inside ROS.

The subscriber is meant to be stopped by pressing Ctrl+C in the terminal.

However, every time I press Ctrl+C, it stops by throwing the following error:

^Cterminate called after throwing an instance of 'zmq::error_t'
  what():  Interrupted system call
Aborted (core dumped)

Below is the code snippet:

#include <ros/ros.h>
#include <zmq.hpp>

int main(int argc, char **argv)
{
  ros::init(argc, argv, "node_name", ros::init_options::AnonymousName);
  ros::NodeHandle nh;
  ros::Publisher pub;

  //  Prepare context and publisher
  zmq::context_t zmq_context(1);
  zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB);
  zmq_socket.connect("tcp://192.168.1.20:9001");

  std::string TOPIC = "";
  zmq_socket.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length()); // allow all messages

  int timeout = 1000;  // Timeout to get out of the while loop since recv is blocking
  zmq_socket.setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(timeout));

  while (ros::ok())
  {
      zmq::message_t msg;
      int rc = zmq_socket.recv(&msg);
      if (rc)
      {
          //receive data and prepare it for publishing
          pub.publish(data);
          ros::spinOnce();
      }
  }

  // Clean up the socket and context here
  zmq_socket.close();
  zmq_context.close();

  return 0;
}

How to avoid the error so as to shut down the subscriber properly?


Answer:

Without details about how the Ctrl+C was being trapped and handled, I would always add the following (C++ binding details may differ from version to version):

int main(int argc, char **argv)
{
    // zmq_context and zmq_socket are set up as in the question
    zmq_socket.connect(    "tcp://192.168.1.20:9001" );
    zmq_socket.setsockopt( ZMQ_LINGER, 0 );              // ALWAYS
    ...

    while( ros::ok() )
    {
       ...
    }
    std::cout << "SIG:: will .close() after ros::ok()-loop exit" << std::endl;
    zmq_socket.close();

    std::cout << "SIG:: will .term()  after a Socket()-instance .close()'d" << std::endl;
    zmq_context.close();

    std::cout << "SIG:: will return 0 after a Context()-instance .term()'d" << std::endl;
    return 0;
}
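The error itself comes from an interrupted blocking call: Ctrl+C delivers SIGINT, roscpp's default SIGINT handler requests shutdown (so `ros::ok()` turns false), and the `recv()` that was blocked at that moment fails with errno `EINTR` ("Interrupted system call"), which the C++ binding throws as `zmq::error_t`. A common remedy, sketched here as an addition rather than as part of the answer above, is to catch that one error and go back to the `ros::ok()` check. To keep the block runnable without ZeroMQ or ROS, the loop is a template: `keep_running` stands in for `ros::ok()`, `recv_once` for a wrapper around `zmq_socket.recv(...)`, and `ErrorT` for `zmq::error_t`, whose `num()` returns the errno; the name `run_receive_loop` is hypothetical.

```cpp
#include <cerrno>
#include <cstddef>

// Runs a receive loop until keep_running() returns false.
// recv_once() returns true when a message was received and handled,
// false on a receive timeout, and throws ErrorT on failure.
// An ErrorT whose num() is EINTR (a signal arrived during the blocking call)
// means "re-check the loop condition", not a fatal error.
template <typename ErrorT, typename KeepRunning, typename RecvOnce>
std::size_t run_receive_loop(KeepRunning keep_running, RecvOnce recv_once) {
  std::size_t handled = 0;
  while (keep_running()) {
    try {
      if (recv_once()) {
        ++handled;
      }
    } catch (const ErrorT& e) {
      if (e.num() != EINTR) {
        throw;  // a real failure: let it propagate
      }
      // EINTR: fall through and evaluate keep_running() again
    }
  }
  return handled;
}
```

In the question's code, `recv_once` would be a lambda that calls `zmq_socket.recv(&msg)` and publishes on success; once `ros::ok()` is false the loop ends and the cleanup after it runs normally.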

Question:

I am a newbie in ZeroMQ.

I use ROS frequently, hence I am confused by the subscriber in ZeroMQ. In ROS, the subscriber usually has a callback function, which is called automatically whenever data is available on the corresponding rostopic.

Please see below code snippet, borrowed from ROS wiki:

void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
  ROS_INFO("I heard: [%s]", msg->data.c_str());
}
//define subscriber and callback function associated with it
ros::Subscriber sub = n.subscribe("chatter", 1000, chatterCallback);

However, in ZeroMQ it seems that the subscriber is kept in a loop to receive the data, as shown below:

for (int update_nbr = 0; update_nbr < 100; update_nbr++)
{
    zmq::message_t update;
    int zipcode, temperature, relhumidity;
    // receive the data
    subscriber.recv(&update);
    // do something with data
    std::istringstream iss(static_cast<char*>(update.data()));
    iss >> zipcode >> temperature >> relhumidity;
}

The above code is borrowed from ZeroMQ wiki.

Is there a callback mechanism in ZeroMQ similar to the ROS Subscriber?


Answer:

No, there is no callback system in ZMQ. You have to call the recv() function in order to receive a message.

You can build one on top of recv(): it blocks and returns a status, so you can use it in an if condition inside a while loop.

I often use a pattern like this one, with a timeout:

zmq::context_t zmq_context(1);
zmq::socket_t zmq_socket(zmq_context, ZMQ_SUB);

zmq_socket.connect("tcp://127.0.0.1:58951");

std::string TOPIC = "";

zmq_socket.setsockopt(ZMQ_SUBSCRIBE, TOPIC.c_str(), TOPIC.length()); // allow all messages
zmq_socket.setsockopt(ZMQ_RCVTIMEO, 1000); // timeout in ms, so recv() returns and the loop can re-check `run`
while(run) { // `run`: a flag your application clears when it should stop
    zmq::message_t msg;
    bool received = zmq_socket.recv(&msg);  // blocks until a message arrives or the timeout expires
    if(received) {
        // You received a message: your data is in msg
        // Do stuff here: call your function with the parameters received from zmq
    }
}
// Clean up your socket and context here
int linger = 0; // drop unsent messages instead of blocking on close
zmq_socket.setsockopt(ZMQ_LINGER, linger);
zmq_socket.close();
zmq_context.close();

Question:

I have three applications, each performing different operations, that communicate via ZeroMQ:

  1. The first is a C++ application that acts as an engine for "hard work": it takes a Protobuf message as a request from a client, does some work, and returns a Protobuf message to that client (or whoever is connected; Request/Reply pattern). It uses 0MQ version 4.0.4 and protobuf-2.6.0, for which we built the required header files ourselves; the Protobuf classes were generated by protoc.

  2. The second is Java code acting as a data provider. It uses jeromq-0.3.4.jar for ZeroMQ messaging and protobuf-java-2.6.1.jar for Protobuf serialization.

  3. The third is C# code that performs some analysis and has a nice UI. It uses Marc Gravell's protobuf-net (https://www.nuget.org/packages/protobuf-net/) as a NuGet package and NetMQ (native C# port of ZeroMQ) for messaging.

C++ <-> Java works great and without problems; however, C++ <-> C# does not work correctly. When I send a basic request from C# to the C++ "server" via

using (NetMQContext context = NetMQContext.Create())
using (var requestSocket = context.CreateRequestSocket())
{
    requestSocket.Connect(_requestAddress); // "tcp://127.0.0.1:6500"
    requestSocket.Send(mux.ToByteArray<Taurus.FeedMux>());
}

with

public static byte[] ToByteArray<T>(this T o) 
    where T : ProtoBuf.IExtensible
{
    if (o == null)
        return null;
    using (MemoryStream ms = new MemoryStream())
    {
        ProtoBuf.Serializer.Serialize(ms, o);
        return ms.ToArray();
    }
}

The C++ code receives the message, but despite setting the required type field

Taurus.FeedMux mux = new Taurus.FeedMux();
mux.type = Taurus.FeedMux.Type.OXX;
mux.oxx = Oxx.GetOxx();

I get an error in the C++ application:

[libprotobuff ERROR ..\\message_lite.cc:123] Can't parse message of type "Taurus.FeedMux" because it is missing required fields: type

But I am clearly setting type, and in the C++ code type seems to be set (inspecting the deserialized object in the debugger). I have tried two different Protobuf libraries (one I built, and Marc Gravell's library via NuGet), as I thought this was a serialization issue, but this does not fix the problem.

I have also tried the clrZMQ wrapper library as well as the native C# NetMQ library; again, this does not help. The message is received with any combination of the above, but the received message seems corrupt in some way.

What could be going wrong here and is there anything I should be doing that I have not mentioned?


Answer:

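My guess is that type is being treated as optional, whereas in the C++ it is marked as required. protobuf-net does not write a non-required member whose value equals its default, so if the value you assign (OXX) happens to be the enum's default, nothing for type reaches the wire: the C++ parser reports the required field as missing, while the parsed object still shows the default value, which looks "set" in the debugger. I can't see how your model is defined in the C#, but if you're using protobuf-net's attributes, you can force it to serialize via the IsRequired attribute member: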

[ProtoMember(42, IsRequired = true)] // change the number!
public Taurus.FeedMux.Type type {get;set;}
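The effect can be seen at the wire-format level. A varint field (such as an enum) is written as a key byte `(field_number << 3) | 0` followed by the value; a serializer that skips fields holding their default writes nothing at all for such a field. The sketch below is a hypothetical, simplified encoder that handles only single-byte keys and values and treats 0 as the default; field number 1 is an assumption, since the question's `.proto` is not shown.

```cpp
#include <cstdint>
#include <vector>

// Simplified protobuf encoding of one varint field (e.g. an enum), assuming
// field_number < 16 and value < 128 so that key and value each fit in one byte.
std::vector<std::uint8_t> encode_varint_field(std::uint8_t field_number,
                                              std::uint8_t value,
                                              bool skip_default) {
  std::vector<std::uint8_t> out;
  if (skip_default && value == 0) {
    return out;  // optional field at its default: nothing is written
  }
  out.push_back(static_cast<std::uint8_t>((field_number << 3) | 0));  // wire type 0 = varint
  out.push_back(value);
  return out;
}
```

With `skip_default` set, a default-valued `type` produces an empty payload, which a proto2 parser declaring `type` as required rejects; forcing serialization (what `IsRequired = true` does) yields the two bytes `0x08 0x00`.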

Question:

Fairly new to ROS, but haven't been able to find this information searching around.

We're building an instrument where we need to transfer large data streams over the network on a 100Mbit limited cable. Preferably we need to transfer RAW images (~10MB a piece) or we can do some lossless compression resulting in about 5MB a piece.

Is this perfectly fine for ROS with native image topics, or would a separate protocol such as ZeroMQ be better for this task? What are the limitations in ROS on large data streams?

Hope that someone with the knowledge could take a second to share some experience.

Thanks!


Answer:

10E6 [B] over a private, 100% free 100E6 [b/s] channel takes no less than ~0.8 [s]
5E6 [B] over a private, 100% free 100E6 [b/s] channel takes no less than ~0.4 [s]

Q : What are the limitations in <something> on large data streams?

Here we always fight a three-fold Devil mix of:

Power ( for data processing, a 10 [MB] -> 5 [MB] compression + RAM-I/O not being excluded )
+ Time ( latency + jitter of the E2E data-transport across the series of transport-channels )
+ Errors ( uncertainties of content delivery, completeness and authenticity over the E2E data-transport path )

In the ROS domain, a system of coordinated, constrained control-loop sub-systems, there is one more problem: not meeting the "in-time-ness" pushes the control into principally unstable territory.

There are countless examples of what happens once this border has been crossed: from a production line falling into panic and an immediate emergency total-stop state, to damaged tools, products and equipment, and continued collisions during still-continued operation (where collision detection and emergency total-stops were not implemented safely).


Q : would a separate protocol such as ZeroMQ be better for this task?

ZeroMQ has excellent performance: it does not add much to the Time leg of the Devil-mix, yet it always depends on whether there are sufficient resources (the Power to process smoothly).

ZeroMQ has excellent performance scalability, sure, if the Power leg of the Devil-mix permits.

ZeroMQ has excellent properties for the Errors leg of the Devil-mix: we get a Zero-Errors warranty. It either delivers the message (the payload) as a bit-to-bit identical copy of the original content, or nothing at all. This warranty may look strange, a sure overkill for blurred or noisy images, yet it is a fair strategy for not adding further Power and Time uncertainty through error-detection or limited-recovery. It still leaves us free to choose how to handle (if needed), within given, constrained Time- and Power-domains, the main duty, the stability of the ROS control loops, when payloads go missing or re-transmits are requested, with Errors detected indirectly from time-stamping, monotonic-ordinal indexing, et al.
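Monotonic-ordinal indexing can be as simple as a per-sender sequence number carried in each message. A sketch, assuming a hypothetical `GapDetector` and a 64-bit counter that the sender increments by one per message; the receiver learns how many payloads went missing and can decide, within its Time budget, whether to request a re-transmit or skip ahead.

```cpp
#include <cstdint>
#include <optional>

// Detects missing messages from a sender-side counter that increases by 1 per message.
class GapDetector {
 public:
  // Returns how many messages were skipped before `seq` (0 if none).
  // Duplicates or older sequence numbers return 0 and do not move the mark.
  std::uint64_t observe(std::uint64_t seq) {
    if (!last_) {
      last_ = seq;
      return 0;
    }
    if (seq <= *last_) return 0;  // stale or duplicate
    std::uint64_t missing = seq - *last_ - 1;
    last_ = seq;
    return missing;
  }

 private:
  std::optional<std::uint64_t> last_;
};
```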

ROS Topics, on the contrary, are limited to a single formal communication-pattern archetype, PUB/SUB, and are fixed to one of two transports: TCPROS, or UDPROS, which is currently available in roscpp only and is lossy, but has lower latency than TCPROS. ZeroMQ, by comparison, may use faster transports without an L3+ protocol stack ( { inproc:// | ipc:// } ) or, if needed, even a mil-std guaranteed-delivery or distributed grid-computing tipc:// transport, or a hypervisor-orchestrated vmci:// transport.

Question:

How to guarantee message delivery in a non-transacted, lightweight environment?

For example:

  • Normal situation: Write to database, commit, send message to ZeroMQ|Redis|OtherMQ, consumer pulls the message to continue processing...
  • 0.05% situation: Write to database, commit, application dies! No message is sent, no consumer pulls the message, processing is incomplete.

How can I avoid losing the message (i.e., failing to send it) in this situation?

Edit: The message must be delivered exactly once.


Answer:

In this scenario you have two shared resources (the database and the queue) and you want them to be transacted together: the database should commit only if the message is sent to the queue, and it should not commit if the message is not sent successfully, and vice versa. This is simply a global transaction mechanism such as two-phase commit (2PC). However, implementing a global transaction mechanism is not easy, and it is also very costly.

I suggest implementing an at-least-once strategy on the producer side and idempotency on the consumer side to provide consistency.

Create a message table in the producer side's database and persist each message to this table in the same local transaction as the business data, before sending it to the queue. Then, with a scheduled thread (or anything similar), send the messages to the queue and mark them as sent, so that messages which have already been sent are not sent again. You may use multiple threads to increase throughput, but be careful if your messages need to be consumed in the order they were produced. Even so, a message may occasionally be sent more than once (e.g. you send a message to the queue and your application crashes before marking it as sent). That is not a problem, because an at-least-once strategy on the producer side means exactly that: every message reaches the queue at least once.
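A minimal sketch of this message-table relay (often called a "transactional outbox"), with an in-memory vector standing in for the database table and a caller-supplied `send` function standing in for the queue client; both, and the names `OutboxRow` and `relay_unsent`, are assumptions for illustration. In a real system the `sent` flag must be updated in the database, not in memory.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// One row of the producer-side message table.
struct OutboxRow {
  std::string id;       // unique message id, reused by the consumer for de-duplication
  std::string payload;
  bool sent = false;
};

// Sends every unsent row in insertion order, marking a row as sent only after
// send() succeeds. If the process dies between send() and marking the row,
// the row is sent again on the next run: delivery is at-least-once.
std::size_t relay_unsent(std::vector<OutboxRow>& table,
                         const std::function<bool(const OutboxRow&)>& send) {
  std::size_t count = 0;
  for (OutboxRow& row : table) {
    if (row.sent) continue;
    if (!send(row)) break;  // stop on failure to preserve ordering; retry on the next run
    row.sent = true;
    ++count;
  }
  return count;
}
```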

To prevent a consumer from processing a message that the producer sent more than once, implement idempotent consumers. Simply save the ids of consumed messages to a database table on the consumer side and, before processing a message coming from the queue, check whether it has already been consumed. If it has, ignore it and take the next message.
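The consumer-side check can be sketched as follows, with an in-memory set standing in for the consumed-ids table and a hypothetical `IdempotentConsumer` class. In production, recording the id and applying the message's effects should happen in one local transaction; otherwise a crash between the two reopens the window for duplicates or losses.

```cpp
#include <functional>
#include <string>
#include <unordered_set>
#include <utility>

// Processes each message id at most once, even if the queue delivers it repeatedly.
class IdempotentConsumer {
 public:
  explicit IdempotentConsumer(std::function<void(const std::string&)> handle)
      : handle_(std::move(handle)) {}

  // Returns true if the message was processed, false if it was a duplicate.
  bool consume(const std::string& id, const std::string& payload) {
    if (processed_ids_.count(id) != 0) return false;  // already consumed: skip it
    handle_(payload);
    processed_ids_.insert(id);
    return true;
  }

 private:
  std::function<void(const std::string&)> handle_;
  std::unordered_set<std::string> processed_ids_;
};
```

Together with the at-least-once relay on the producer side, this gives the effectively exactly-once processing the question asks for.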

There are, of course, other options for providing consistency in a microservices environment. You can find other solutions in this great blog post: https://www.nginx.com/blog/event-driven-data-management-microservices/. The solution I explained above is also described there, in the Publishing Events Using Local Transactions section.

Question:

I'm trying to install ZeroMQ into an Electron application, but I'm unable to rebuild the package for Electron.

I'm trying to install ZeroMQ 5.1.0 for Electron on my Windows 10 machine. Details:

OS: Windows 10
Electron: 5.0.8
node: 12.0.0

As per the instructions in the README.md https://github.com/zeromq/zeromq.js/, I have followed this procedure:

1) Install zeromq with npm: npm install zeromq

2) Rebuild for Electron: npm rebuild zeromq --runtime=electron --target=5.1.0

Notes: The documentation indicates that the target here is supposed to be the Electron version; however, this causes a 404 error, as npm tries to go to https://nodejs.org/dist/v5.0.8/node-v5.0.8-headers.tar.gz, which doesn't exist. I then thought to try the node version, but that leads to a 401 error. I then tried the zeromq version (5.1.0, as shown in the command), which leads to this error:

error MSB4019: The imported project "C:\Microsoft.Cpp.Default.props" was not found. Confirm that the path in the <Import> declaration is correct, and that the file exists on disk.

Googling this error brings up a lot of StackOverflow questions where the build tools are required. Looking a little further ahead in the README.md led me to think I could solve this problem with the next command.

3) Install the Visual Studio build tools: npm install --global --production windows-build-tools

This command completed successfully and installed both the build tools and Python 2.7.

4) Rerunning the rebuild command from step 2 leads to the same error. I tried variations using npm config set msvs_version 2013, as well as 2015 and 2017.

2013 and 2015 give this error:

error MSB4019: The imported project "C:\Microsoft.Cpp.Default.props" was not found. Confirm that the path in the <Import> declaration is correct, and that the file exists on disk.
gyp ERR! build error
gyp ERR! stack Error: C:\Program Files (x86)\MSBuild\14.0\bin\msbuild.exe failed with exit code: 1
gyp ERR! stack at ChildProcess.onExit (C:\Program Files\nodejs\node_modules\npm\node_modules\node-gyp\lib\build.js:262:23)
gyp ERR! stack at ChildProcess.emit (events.js:198:13)
gyp ERR! stack at Process.ChildProcess._handle.onexit (internal/child_process.js:248:12)
gyp ERR! System Windows_NT 10.0.17134
gyp ERR! command "C:\\Program Files\\nodejs\\node.exe" "C:\\Program Files\\nodejs\\node_modules\\npm\\node_modules\\node-gyp\\bin\\node-gyp.js" "rebuild"
gyp ERR! cwd C:\Users\<user>\path\to\app\node_modules\zeromq
gyp ERR! node -v v10.16.0
gyp ERR! node-gyp -v v3.8.0
gyp ERR! not ok
npm ERR! code ELIFECYCLE
npm ERR! errno 1
npm ERR! zeromq@5.1.0 install: node scripts/prebuild-install.js || (node scripts/preinstall.js && node-gyp rebuild)
npm ERR! Exit status 1
npm ERR!
npm ERR! Failed at the zeromq@5.1.0 install script.
npm ERR! This is probably not a problem with npm. There is likely additional logging output above.

While 2017 gives this:

error MSB8020: The build tools for v140 (Platform Toolset = 'v140') cannot be found. To build using the v140 build tools, please install v140 build tools. Alternatively, you may upgrade to the current Visual Studio tools by selecting the Project menu or right-click the solution, and then selecting "Retarget solution". [C:\Users\<user>\path\to\app\node_modules\zeromq\build\zmq.vcxproj]

with an ending section similar to the one produced for the other versions.


Answer:

In the end, this was a problem with the Visual Studio build tools installation. I uninstalled Visual Studio using the Visual Studio Installer and then reinstalled windows-build-tools.