Hot questions on using ZeroMQ for performance

Question:

I'm implementing a performance-heavy two-party protocol in C++14, utilising multithreading, and am currently using ZeroMQ as the network layer.

The application has the following simple architecture:

  • One main server-role,
  • One main client-role,
  • Both server and client spawn a fixed number n of threads
  • All n parallel, concurrent thread-pairs execute a performance- and communication-heavy, mutual but exclusive protocol exchange, i.e. they run in n fixed pairs and must not mix / interchange any data with anyone but their fixed pairwise opponent.

My current design uses a single ZeroMQ Context()-instance on both server and client, shared between all n local threads, and each respective client/server thread-pair creates a ZMQ_PAIR socket ( I just increment the port number ) on the local, shared context for communication.
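
For illustration, a minimal pyzmq sketch of this per-pair layout ( the real code is C++14; the port base, the pair count and the message bodies are placeholders, and both roles are shown in one process just to keep the sketch self-contained ):

import threading
import zmq

N_PAIRS   = 3                          # placeholder for the real n
BASE_PORT = 5550                       # placeholder port base, one port per pair
ctx = zmq.Context()                    # a single shared Context per process

def server_side(pair_id):
    sock = ctx.socket(zmq.PAIR)        # one ZMQ_PAIR socket per thread-pair
    sock.bind("tcp://127.0.0.1:{}".format(BASE_PORT + pair_id))
    print("pair {} got: {}".format(pair_id, sock.recv_string()))
    sock.close()

def client_side(pair_id):
    sock = ctx.socket(zmq.PAIR)        # talks only to its fixed opponent
    sock.connect("tcp://127.0.0.1:{}".format(BASE_PORT + pair_id))
    sock.send_string("hello from pair {}".format(pair_id))
    sock.close()

threads  = [threading.Thread(target=server_side, args=(i,)) for i in range(N_PAIRS)]
threads += [threading.Thread(target=client_side, args=(i,)) for i in range(N_PAIRS)]
for t in threads: t.start()
for t in threads: t.join()
ctx.term()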

My question

Is there a smarter or more efficient way of doing this?

i.e.: is there a natural way of using ROUTER and DEALER sockets that might increase performance?

I do not have much experience with socket programming, and with my approach the number of sockets scales directly with n ( the number of client-server thread-pairs ). This might go up to a couple of thousand and I'm unsure whether this is a problem or not.

I have control of both the server and client machines and source code and I have no outer restrictions that I need to worry about. All I care about is performance.

I've looked through all the patterns here, but I cannot find one that matches the case where the client-server pairs are fixed, i.e. I cannot use load-balancing and such.


Answer:

Happy man!

ZeroMQ is a lovely and powerful tool for highly scalable, low-overhead Formal Communication Patterns ( behavioural, yes, emulating some sort of mutual behaviour between the peers, "One Asks, the other Replies" et al ).

Your pattern is quite simple and behaviourally unrestricted, and ZMQ_PAIR may serve well for this.


Performance

There ought to be some more details on the quantitative nature of this attribute:

  • a process-to-process latency [us]
  • a memory-footprint of a System-under-Test (SuT) architecture [MB]
  • a peak-amount of data-flow a SuT can handle [MB/s]

Performance Tips ( if quantitatively supported by observed performance data -- a sketch of the first three tips follows this list )
  • may increase I/O-performance by increasing Context( nIOthreads ) on instantiation

  • may fine-tune I/O-performance by hard-mapping an individual thread# -> Context.IO-thread#, which is helpful both for distributing the workload and for keeping "separate" localhost IO-thread(s) free / ready for higher-priority signalling and other such needs.

  • shall set up application-specific ToS-labelling of prioritised types of traffic, so as to allow advanced processing on the network layer along the route-segments between the client and server

  • if the memory-footprint hurts ( ZeroMQ is not zero-copy on TCP-protocol handling at the operating-system kernel level ), one may try to move to a younger sister of ZeroMQ -- authored by Martin SUSTRIK, a co-father of ZeroMQ -- a POSIX-compliant nanomsg, with similar motivation and attractive performance figures. Worth knowing about, at least.
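
A minimal pyzmq sketch of the first three tips ( the io-thread count, the affinity bitmask, the ToS value and the port are illustrative assumptions, not recommendations; ZMQ_TOS needs libzmq 4.1+ ):

import zmq

# more I/O threads for higher aggregate throughput ( value is an illustrative assumption )
ctx = zmq.Context(io_threads=4)

sock = ctx.socket(zmq.PUSH)

# hard-map this socket onto I/O-thread #0 only ( ZMQ_AFFINITY is a bitmask )
sock.setsockopt(zmq.AFFINITY, 0b0001)

# application-specific ToS labelling of this socket's traffic ( ZMQ_TOS, libzmq >= 4.1 )
sock.setsockopt(zmq.TOS, 0x10)

sock.bind("tcp://*:5555")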


Could ROUTER or DEALER increase an overall performance?

No, it could not. Having in mind your stated architecture ( declared to be communication-heavy ), other, even more sophisticated Scalable Formal Communication Pattern behaviours, which suit some other needs, do not add any performance benefit; on the contrary, they would cost you additional processing overheads without delivering any justifying improvement.

While your Formal Communication remains as defined, no additional bells and whistles are needed.

One point may be noted on the ZMQ_PAIR archetype: some sources quote it to be rather an experimental archetype. If, besides the SuT-testing observations, your gut feeling does not make you happy to live with this, do not mind a slight re-engineering step that will keep you all the freedom of an un-prescribed Formal Communication Pattern behaviour, while having "non"-experimental pipes under the hood -- just replace the solo ZMQ_PAIR with a pair of ZMQ_PUSH + ZMQ_PULL sockets and use messages with just a one-way ticket on each. Having the stated full control of the SuT design and implementation, this is all within your competence.
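
A minimal pyzmq sketch of such a PAIR -> PUSH + PULL replacement for one thread-pair ( the ports are illustrative, and the client side is shown in-process just to keep the sketch self-contained ):

import zmq

ctx = zmq.Context()

# server side of one thread-pair: a one-way ticket in each direction
srv_tx = ctx.socket(zmq.PUSH); srv_tx.bind("tcp://127.0.0.1:5601")   # server -> client
srv_rx = ctx.socket(zmq.PULL); srv_rx.bind("tcp://127.0.0.1:5602")   # client -> server

# client side of the same thread-pair mirrors it with .connect()
cli_rx = ctx.socket(zmq.PULL); cli_rx.connect("tcp://127.0.0.1:5601")
cli_tx = ctx.socket(zmq.PUSH); cli_tx.connect("tcp://127.0.0.1:5602")

srv_tx.send(b"request, one-way ticket out")
print(cli_rx.recv())                       # client receives on its PULL pipe
cli_tx.send(b"reply, one-way ticket back")
print(srv_rx.recv())                       # server receives on its PULL pipe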


How fast could I go?

There are some benchmark test records published for both the ZeroMQ and nanomsg performance / latency envelopes on un-loaded network transports across traffic-free route-segments ( sure ).

If your SuT-design strives to go even faster -- say under some 800 ns end-to-end -- there are other means to achieve this, but your design will have to follow a distributed-computing strategy other than message-based data exchange, and your project budget will have to allow for additional expenditure on the necessary ultra-low-latency hardware infrastructure.

It may surprise, but this is definitely well doable and pretty attractive for systems where hundreds of nanoseconds are a must-have target, within a colocation data centre.

Question:

I have found a lot of questions on similar topics but they didn't help me to solve my problem.

Using:

  • Linux Ubuntu 14.04
  • python 3.4
  • zmq : 4.0.4 // pyZMQ 14.3.1

TL;DR

The receive queue of a ZMQ SUB socket grows indefinitely, even after the HWM is set. This happens when the subscriber is slower than the publisher. What can I do to prevent it?

Background

I work in the human-computer interaction field. We have a huge code base to control the mouse cursor, this kind of thing. I wanted to "break it" into several modules communicating with ZMQ. It must have as little latency as possible, but dropping (losing) messages is not that important.

Another interesting aspect is the possibility to add "spies" between the nodes. Thus PUB/SUB sockets seem to be the most adequate.

Something like this:

+----------+                +-----------+                 +------------+
|          | PUB            |           |  PUB            |            |
|  Input   | +----+------>  |  Filter   |  +----+------>  |   Output   |
|          |      |     SUB |           |       |     SUB |            |
+----------+      v         +-----------+       v         +------------+
               +-----+                       +-----+                   
               |Spy 1|                       |Spy 2|                   
               +-----+                       +-----+       

Problem

Everything works fine, except when we add the spies. If we add a spy doing "heavy stuff", like real-time visualisations with matplotlib, we notice an increasing latency in the plots. I.e.: in the diagram above, Filter and Output are fast and no latency is seen, but on Spy 2 the latency can reach 10 minutes after running for 20 minutes (!!)

It looks like the queue on the receiver grows indefinitely. We investigated the High Water Mark (HWM) functionality of ZMQ and set it low to drop older messages, but nothing changed.

Minimal code
Architecture:
+------------+                +-------------+
|            |  PUB           |             |
|   sender   | -------------> |  receiver   |
|            |             SUB|             |
+------------+                +-------------+

The receiver is a slow receiver (acting as a spy in the first diagram).

Code:

Sender.py

import time
import zmq

ctx = zmq.Context()

sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10

i = 0
while True:
    mess = "{} {}".format(i, time.time())
    sender.send_string(mess)
    print("Send : {}".format(mess))
    i+= 1

receiver.py:

import time
import zmq

ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)

front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)

front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1

while True:
    mess = front_end.recv_string()
    i, t = mess.split(" ")
    mess = "{} {}".format(i, time.time() - float(t))
    print("received : {}".format(mess))
    time.sleep(1)  # slow

I don't think that this is a normal behaviour for ZMQ PUB/SUB. I tried to set the HWM in the sender, in the receiver, in both, but nothing changed.

What am I missing ?

Edit:

I don't think I was clear when I explained my problem. I made an implementation moving the mouse cursor. The input was the mouse-cursor position, sent over ZMQ at 200 Hz (with a .sleep( 1.0 / 200 ) ); some processing was done and the mouse-cursor position was updated (I don't have this sleep in my minimal example).

Everything was smooth, even when I launched the spies. The spies nevertheless had a growing latency (because of their slow processing). The latency doesn't appear in the cursor, at the end of the "pipeline".

I think the problem comes from the slow subscriber queuing the messages.

In my example, if we kill the sender and leave the receiver alive, messages will continue to be displayed until all (?) the submitted messages are displayed.

The spy is plotting the position of the cursor to provide some feedback, but it is still very inconvenient to have such a lag... I just want to get the last message sent; this is why I tried to lower the HWM.


Answer:

A better real-time design / validation is missing

ZeroMQ is a powerful messaging layer.

That said, check how many messages it really sends per second in the original while True: killer-loop.

Measure it. Design on facts, not on feelings.

Facts matter.

# a drop-in replacement for the while True: loop in Sender.py
start_CLK = time.time()                                      # .SET _CLK
time.sleep( 0.001 )                                          # .NOP avoid DIV/0!
i = 0                                                        # .SET CTR
while True:                                                  # .LOOP
    sender.send_string( "{} {}".format( i, time.time() ) )   # .SND ZMQ-PUB
    print( i / ( time.time() - start_CLK ) )                 # .GUI perf [msg/sec]
    i += 1                                                   # .INC CTR

ZeroMQ does its best to propagate that avalanche down the scheme.

And it is pretty good at this.

Your [Filter] + [Spy1] + [Output] + [Spy2] pipeline processing, end-to-end, has to either

  • be faster, incl. both the .send() + .recv_string() overheads, than the [Input]-sender

or

  • be the principal blocking element, causing the internal PUB/SUB queueing to grow, grow, grow

This chain-of-queues problem can be solved by another architecture design.

Things to re-think:

  1. sub-sample the [Filter].send() cadence ( the interleave factor depends on the stability issues of the real-time process under your control -- be it 1 msec ( btw. an O/S timer resolution, so no quantum-physics experiments are possible with COTS O/S timer controls :o) ), 10 msec for bidirectional voice-streaming, 50 msec for TV/GUI streaming, 300 msec for a keyboard event-stream et al ) -- see the sketch after this list

  2. online vs. offline post-processing / visualisation ( you noticed the heavy matplotlib processing; there you typically bear about 800 - 1600 - 3600 msec overheads, even on simple 2D graphing -- measure it before deciding about a change in the PUB/SUB-<proc1>-PUB/SUB-<proc2> processing architecture ( you already noticed that <spy2> causes problems by growing the <proc2>-PUB-feeding & sending overheads ) ).

  3. number of threads vs. number of localhost cores that execute them -- as seen from the localhost IPs, all the processes reside on the same localhost. Plus add one thread per ZMQ.Context used, plus review the Python GIL locking overhead if all threads were instantiated from the same Python interpreter... Blocking grows. Blocking hurts. A better distributed architecture can improve these performance aspects. However, review [1] and [2] first.
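
A minimal pyzmq sketch of re-think [1] -- a [Filter]-like hop that always drains its SUB feed but re-publishes only at a sub-sampled cadence ( the 50 msec period and the output port 1501 are illustrative assumptions, not recommendations ):

import time
import zmq

PERIOD = 0.050                        # illustrative cadence: re-publish at most every 50 msec

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)             # feed from the [Input] PUB
sub.setsockopt_string(zmq.SUBSCRIBE, '')
sub.connect('tcp://127.0.0.1:1500')

pub = ctx.socket(zmq.PUB)             # re-publishing towards [Output] / [Spy 2]
pub.bind('tcp://127.0.0.1:1501')

last_sent = 0.0
while True:
    mess = sub.recv_string()          # always drain the upstream feed
    now = time.time()
    if now - last_sent >= PERIOD:     # but re-publish only every PERIOD seconds
        pub.send_string(mess)
        last_sent = now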

n.b.: calling a 20-minute processing-pipeline delay ( a real-time system TimeDOMAIN skew ) a latency is rather a euphemism.

Question:

I'm sending a large object using the following code in C# (NetMQ):

var client = new DealerSocket("<connection String>");
var serializedData = new string('*', 500000);
var message = new List<byte[]>();
message.Add(Encoding.UTF8.GetBytes("BulkSend"));
message.Add(Encoding.UTF8.GetBytes(serializedData));
client.TrySendMultipartBytes(TimeSpan.FromMilliseconds(3000), message);

This code takes up to 90% CPU when used under high traffic (for example 10 MB of messages per second). After some research, I tried the two following changes. First of all, I removed the first frame ("BulkSend"):

var client = new DealerSocket("<connection String>");
var serializedData = new string('*', 500000);
var message = new List<byte[]>();
message.Add(Encoding.UTF8.GetBytes(serializedData));
client.TrySendMultipartBytes(TimeSpan.FromMilliseconds(3000), message);

Surprisingly, the performance improved. Secondly, I rearranged the two frames, i.e. moved the large frame to the first position, like the following:

var client = new DealerSocket("<connection String>");
var serializedData = new string('*', 500000);
var message = new List<byte[]>();
// note the changed order of the two frames below
message.Add(Encoding.UTF8.GetBytes(serializedData));
message.Add(Encoding.UTF8.GetBytes("BulkSend"));

client.TrySendMultipartBytes(TimeSpan.FromMilliseconds(3000), message);

Again, surprisingly, the performance improved!

What's the problem? How can I improve the performance? What about using zproto with NetMQ? Is there any proper documentation around that?


Answer:

High CPU / low performance might be caused by GC and memory allocations. I tried to send the same message size with my framework, which uses NetMQ, with the server and client running on the same machine. When the client waited for the response before sending another message to the server, I achieved 60K msg/sec. However, if the client was sending hundreds of messages in parallel, CPU was high as well and throughput was only tens of messages per second. At some point I even got an OutOfMemoryException. It would be good if you posted the complete code.

UPD: Sorry, I measured it wrong. With either 100 messages sent in parallel or just one, the performance is just 120 msg/sec.

Question:

I am trying to build a point-to-point channel using a ZMQ_PAIR socket. But a ZMQ_PAIR socket only allows an exclusive connection, which means that if one peer (A) is connected to another peer (B), other peers can't connect to B until A disconnects from B.

So the first solution that comes to my mind is to set up as many ZMQ_PAIR sockets as there are peers in the network and to poll the sockets to multiplex the events from the other peers. The problem with this is managing lots of sockets, which might incur non-negligible overhead.

But I don't have experimental data about the overhead of managing multiple sockets, such as the time to create 50,000 ZMQ_PAIR sockets or the time to poll 50,000 ZMQ_PAIR sockets. Of course, I can measure it myself, but I wonder if there is any existing experiment conducted by researchers or network developers who have used ZeroMQ.

(I don't want any proxy or the REQ/REP pattern. Using a proxy incurs some performance degradation and I want a pure peer-to-peer network. And the REQ/REP pattern is inherently synchronous communication, which is not my purpose here.)


Answer:

Not an easy thought experiment to answer in general.

Definitely benchmark before taking a decision.

ZeroMQ has its own easy and smart clocking tool, a Stopwatch(). Each instance has .start() and .stop() methods, so your code can benchmark each critical section independently, down to a [us] resolution, so you are ready to go.

One may reasonably start by setting up and benchmarking just some 1, 10, 20, 50, 100, 200 instances first and thus get a reasonable ground for further scaling of the observed behaviour, before attacking the 50k+ sized meshes from scratch.
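
A minimal pyzmq sketch of such a stepwise benchmark, using the built-in Stopwatch() ( the instance counts and the inproc:// endpoints are illustrative choices ):

import zmq

ctx = zmq.Context()

for n in ( 1, 10, 20, 50, 100, 200 ):              # grow the scale stepwise
    aStopWATCH = zmq.Stopwatch()                   # ZeroMQ's own [us]-resolution clock
    aStopWATCH.start()                             # .start() the critical section
    socks = [ctx.socket(zmq.PAIR) for _ in range(n)]
    for i, s in enumerate(socks):
        s.bind("inproc://pair_{}_{}".format(n, i)) # inproc:// has the least overhead
    anElapsed_us = aStopWATCH.stop()               # .stop() returns elapsed [us]
    print("{:6d} PAIR sockets set up in {:10d} [us]".format(n, anElapsed_us))
    for s in socks:
        s.close()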

There will be heavy non-linearities in both the [SPACE]-domain ( memory allocation ) and the [TIME]-domain as the scale grows towards the 50k+ levels.

The next issue comes from non-homogeneous transport-class mixtures. inproc:// will have the least overhead, while some L3+ transport protocols will require far larger overheads than others.
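
A quick way to see that difference is to time the same one-way message flow over two transport classes ( the endpoints and the message count in this pyzmq sketch are illustrative assumptions ):

import zmq

ctx = zmq.Context()

def avg_us_per_msg(endpoint, n_msgs=10000):
    # average one-way transfer cost of n_msgs tiny messages over `endpoint`
    rx = ctx.socket(zmq.PULL); rx.bind(endpoint)
    tx = ctx.socket(zmq.PUSH); tx.connect(endpoint)
    sw = zmq.Stopwatch(); sw.start()
    for _ in range(n_msgs):
        tx.send(b"x")
        rx.recv()
    elapsed_us = sw.stop()
    tx.close(); rx.close()
    return elapsed_us / float(n_msgs)

print("inproc : {:.1f} [us]/msg".format(avg_us_per_msg("inproc://probe")))
print("tcp    : {:.1f} [us]/msg".format(avg_us_per_msg("tcp://127.0.0.1:5710")))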

One might also enjoy testing nanomsg, for its NN_BUS scalable formal communication archetype, which may better match the needs.