Hot questions about using ZeroMQ pollers

Question:

As the title says: when using zmq::poll with cppzmq, I'd love to avoid hardcoding the translation of ZeroMQ sockets into pollitem_t items for polling.

But it seems that I'm not allowed to convert a void* pollitem_t::socket back to a zmq socket.

I noticed that the official sample code just hardcodes the sockets after polling.

//
//  Reading from multiple sockets in C++
//  This version uses zmq_poll()
//
// Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>

#include "zhelpers.hpp"

int main (int argc, char *argv[])
{
    zmq::context_t context(1);

    //  Connect to task ventilator
    zmq::socket_t receiver(context, ZMQ_PULL);
    receiver.connect("tcp://localhost:5557");

    //  Connect to weather server
    zmq::socket_t subscriber(context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5556");
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);

    //  Initialize poll set
    zmq::pollitem_t items [] = {
        { static_cast<void*>(receiver), 0, ZMQ_POLLIN, 0 },
        { static_cast<void*>(subscriber), 0, ZMQ_POLLIN, 0 }
    };
    //  Process messages from both sockets
    while (1) {
        zmq::message_t message;
        zmq::poll (&items [0], 2, -1);

        if (items [0].revents & ZMQ_POLLIN) {
            receiver.recv(&message);
            //  Process task
        }
        if (items [1].revents & ZMQ_POLLIN) {
            subscriber.recv(&message);
            //  Process weather update
        }
    }
    return 0;
}
Hence the question:

Is it impossible to avoid hardcoding sockets in this case?


Answer:

Pieced this together from the zmq.hpp header:

zmq::poller_t<> poller;
poller.add(zmq::socket_ref(zmq::from_handle, receiver.handle()), zmq::event_flags::pollin);
poller.add(zmq::socket_ref(zmq::from_handle, subscriber.handle()), zmq::event_flags::pollin);

// This vector must be at least as large as the number of sockets being polled
std::vector<zmq::poller_event<>> events(2);
while (true) {
  // A timeout of -1 ms blocks until at least one event arrives;
  // with a finite timeout, wait_all() may return 0 instead
  size_t num_events = poller.wait_all(events, std::chrono::milliseconds(-1));

  for (size_t i = 0; i < num_events; i++) {
    zmq::message_t message;
    events[i].socket.recv(message, zmq::recv_flags::none);
    ...
  }
}

EDIT: Realized that the poller_event structure also contains a socket_ref! That makes things even easier!

With a stable libzmq release, you need to #define ZMQ_BUILD_DRAFT_API to use poller_t and the rest of the draft API.

Question:

I have searched already, but I still don't understand what this line means.

Why put 1000 in the function?

full code is here

class ClientTask(threading.Thread):
    """ClientTask"""
    def __init__(self, id):
        self.id = id
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        socket = context.socket(zmq.DEALER)
        identity = u'worker-%d' % self.id
        socket.identity = identity.encode('ascii')
        socket.connect('tcp://localhost:5570')
        print('Client %s started' % (identity))
        poll = zmq.Poller()
        poll.register(socket, zmq.POLLIN)
        reqs = 0
        while True:
            reqs = reqs + 1
            print('Req #%d sent..' % (reqs))
            socket.send_string(u'request #%d' % (reqs))
            for i in range(5):
                sockets = dict(poll.poll(1000))  # <-- HERE
                if socket in sockets:
                    msg = socket.recv()
                    tprint('Client %s received: %s' % (identity, msg))

        socket.close()
        context.term()

Answer:

Why?

Because if no value were passed there ( the default is timeout=None ), the Poller.poll() method would wait indefinitely for the first event on the configured set of such Poller-instance monitored Socket-instances. The 1000 is a timeout in milliseconds: the call returns after at most one second, even if no event has arrived.

What would that mean?

Without the timeout, the call to the Poller.poll() method blocks until such an event appears ( if it ever does ) -- and there is a non-zero probability that no such event ever arrives at all.

That case would effectively hang up your application in an endless wait-state, totally uncontrollable from inside your code, which is exactly what a Poller.poll( aTimeoutInMILLISECONDs ) prevents: by setting the timeout, the call returns at the latest after that many milliseconds, and your code keeps control.
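A minimal pyzmq sketch of the three timeout modes ( the socket type and endpoint are placeholders ):

import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)        # placeholder socket type
socket.connect('tcp://localhost:5570')   # placeholder endpoint

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

events = dict(poller.poll())             # timeout=None: blocks until some event arrives
events = dict(poller.poll(0))            # timeout=0: returns immediately ( non-blocking check )
events = dict(poller.poll(1000))         # timeout=1000: waits at most 1000 ms, then returns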

Question:

My current server-side setup is like this -- I have a manager (with a poller) which waits for incoming requests for work to do. Once something is received, it creates a worker (with a separate poller and separate ports/sockets) for the job, and from then on the worker communicates directly with the client.

What I observe is that when there is intense traffic on any of the workers, the manager is somewhat starved -- its ReceiveReady events fire with significant delays.

NetMQ documentation states: "Receiving messages with poller is slower than directly calling Receive method on the socket. When handling thousands of messages a second, or more, poller can be a bottleneck." I am far below this limit (say 100 messages in a row), but I wonder whether having multiple pollers in a single program clips performance even further.

I prefer having separate instances because the code is cleaner (separation of concerns), but maybe I am going against the principles of ZeroMQ? The question is -- does using multiple pollers in a single program hurt performance? Or, in reverse -- do multiple pollers starve each other by design?


Answer:

Professional system analysis may even require you to run multiple Poller() instances:

Design the system based on facts and requirements, rather than on popularised opinions.

Implement performance benchmarks and measure the details of the actual implementation. Comparing facts against thresholds is what is known as a Fact-Based-Decision.


If not hunting for the last few hundreds of [ns], a typical scenario may look this way:

your core logic inside an event-responding loop has to handle several classes of ZeroMQ-integrated signalling / messaging inputs/outputs, all in a principally non-blocking mode, plus your design has to spend a specific amount of relative attention on each such class.

One may accept some higher inter-process latencies for a remote keyboard ( running a CLI-interface "across" a network ), while the event-loop has to meet a strict requirement not to miss any "fresh" update from a QUOTE-stream. So one creates a light-weight Real-Time-SCHEDULER logic that introduces one high-priority Poller() with non-blocking ( zero-wait ) reads, another one with a ~ 5 ms test on reading the "slow" channels, and another one with a 15 ms test on reading the main data-flow pipe. If you have profiled your event-handling routines not to last more than 5 ms worst-case, you can still meet a TAT of 25 ms, and your event-loop can serve systems that require a stable control-loop cycle of 40 Hz.
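A minimal sketch of such a prioritised multi-poller loop, expressed here in pyzmq rather than NetMQ ( the socket types and endpoints are illustrative assumptions; the poll budgets follow the figures above ):

import zmq

context = zmq.Context()

quote_sock = context.socket(zmq.SUB)         # placeholder: latency-critical QUOTE-stream
quote_sock.connect('tcp://localhost:5556')
quote_sock.setsockopt(zmq.SUBSCRIBE, b'')

cli_sock = context.socket(zmq.PAIR)          # placeholder: "slow" remote-CLI channel
cli_sock.connect('tcp://localhost:5557')

main_sock = context.socket(zmq.PULL)         # placeholder: main data-flow pipe
main_sock.connect('tcp://localhost:5558')

hi_prio = zmq.Poller()                       # polled with zero-wait
hi_prio.register(quote_sock, zmq.POLLIN)

mid_prio = zmq.Poller()                      # ~ 5 ms test on the "slow" channels
mid_prio.register(cli_sock, zmq.POLLIN)

lo_prio = zmq.Poller()                       # 15 ms test on the main pipe
lo_prio.register(main_sock, zmq.POLLIN)

while True:
    if dict(hi_prio.poll(0)):                # zero-wait: never blocks the loop
        quote = quote_sock.recv()            # serve the QUOTE-stream first
    if dict(mid_prio.poll(5)):
        cmd = cli_sock.recv()                # then the remote-CLI channel
    if dict(lo_prio.poll(15)):
        data = main_sock.recv()              # then the main data-flow pipe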

Not using a set of several "specialised" pollers would not allow one to get this level of scheduling determinism while keeping the core logic easy to express and to integrate into such principally stable control-loops.

Q.E.D.

I use a similar design to drive heterogeneous distributed systems for FOREX trading, based on external AI/ML-predictors, where transaction times are kept under ~ 70 ms ( end-to-end TAT, from a QUOTE arrival to an AI/ML-advised XTO order-instruction being submitted ), precisely because of the need to match the real-time constraints of the control-loop scheduling requirements.


Epilogue:

If the documentation says something about poller performance in ranges above 1 kHz of signal delivery, but does not mention anything about the duration of the signal/message handling process, it does the public a poor service. The first step to take is to measure the process latencies; next, analyse the performance envelopes. All ZeroMQ tools are designed to scale, and so must the application infrastructure -- so forget about SLOC-sized examples. The bottleneck is not the poller instance, but a poor application use of the available ZeroMQ components ( given a known performance envelope was taken into account ) -- one can always increase the overall processing capacity available; with ZeroMQ we are in a distributed-systems realm from Day 0, aren't we?

So in sensibly designed + monitored + adaptively scaled systems, no choking will appear.

Question:

I have a single socket which is blocked on sending a frame, and I would like to dispose of the poller ( and everything related to it ). However, I cannot do it -- calling Dispose on the socket throws an exception, and calling Dispose on the poller blocks completely.

Please note that I am using SendFrame, and I could use TrySendFrame, but the purpose of this example is to actually block the socket and find a way to clean everything up.

Example:

private Program()
{
    const string address = "tcp://localhost:5000";

    var socket = new DealerSocket();
    socket.Options.SendHighWatermark = 1;
    socket.Options.Identity = Encoding.ASCII.GetBytes(Guid.NewGuid().ToString("N"));
    socket.Connect(address);

    var poller = new NetMQPoller();
    poller.Add(socket);
    socket.SendReady += OnSendReady;
    poller.RunAsync();

    Thread.Sleep(5000);
    Console.WriteLine("Disposing");
    poller.Dispose(); // get stuck on this one
    Console.WriteLine("Disposed");
}

private void OnSendReady(object sender, NetMQSocketEventArgs e)
{
    Console.WriteLine("0");
    e.Socket.SendFrame("hello");
    Console.WriteLine("1");
    e.Socket.SendFrame("hello"); // this will block
    Console.WriteLine("2");
}

Tested with NetMQ 3.3.3.4 (my primary version for now) and 4.0.0.1.


Answer:

1) Always zeroize ZMQ_LINGER. Always. Right upon socket instantiation, so as to prevent otherwise inadvertent blocking in graceful-termination { .close() | .term() } operations ( be it a controlled or an uncontrolled activation of the termination, as you have already faced it ). See the sketch after point 2).

2) Avoid blocking designs. Principally. Each blocking call creates a state in which all your code is out of control. Got it?
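Point 1), sketched in pyzmq ( NetMQ exposes the same setting through the socket's Options; the endpoint below is a placeholder ):

import zmq

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.LINGER, 0)          # zeroize LINGER right upon instantiation
socket.connect('tcp://localhost:5000')    # placeholder endpoint

# ... use the socket ...

socket.close()    # returns immediately, undelivered messages are dropped
context.term()    # no longer blocks waiting for undelivered messages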

Question:

This is the code I'm using to receive zmq messages:

poller = zmq.Poller()
for socket, event in poller.poll(0):
    if socket is my_sock:
        my_sock = messaging.recv_one(socket).my_data
print(my_sock.status)

It works in other files in the project I'm working on; however, in this specific file it doesn't receive anything. I tried placing a print statement in the for loop, and it never printed anything at all. Any tips on why this is? Thanks


Answer:

When I got my data from the poller, I failed to access it correctly.

What I used:

my_sock.status

What I needed to call:

my_sock.MyStruct.status

I had simply forgotten to include the name of the struct in which I declared my fields in log.capnp:

struct MyStruct {
  status @0 :Bool;
  speed @1 :Float32;
  angle @2 :Float32;
  time @3 :Float32;
}

Question:

When trying to compile my C++ project, the compiler exits with an error in czmq++'s poller.ipp. Am I doing something wrong? Do I need additional compiler flags or something?

I have installed the C++ binding (czmq++) using brew install czmqpp

System: Mac OSX 10.11.5

This is the output when I am trying to compile with the following command:

> gcc -Wall -o HardwareHub HardwareHub.cpp

In file included from HardwareHub.cpp:4:
In file included from ./ZMQCommunicator.h:3:
In file included from /usr/local/include/czmq++/czmqpp.hpp:28:
In file included from /usr/local/include/czmq++/poller.hpp:48:
/usr/local/include/czmq++/impl/poller.ipp:29:19: error: expected expression
    auto unmask = [](socket& s)
                  ^
1 error generated.

Thank you in advance


Answer:

You need to use the C++ compiler, not the C compiler:

> gcc -Wall -o HardwareHub HardwareHub.cpp

should be

> g++ -Wall -o HardwareHub HardwareHub.cpp

In case your currently installed GCC version doesn't default to a standard that supports the lambda in poller.ipp ( it needs at least C++11 ), use

> g++ -std=c++11 -Wall -o HardwareHub HardwareHub.cpp

Question:

I am using a simple requests.post() call to connect to a server and receive data:

ack = requests.post('<ip>/get_data', data=data, timeout=10.0, verify=False)

Below is the method get_data() on the server that uses a zmq poller to receive data until the final response is obtained.

def get_data():
    req = json.loads(self.params.get('data'))
    wire = wiring.Wire("indexing_data_pool", zmq_context=g.zmq_context)
    try:
        close_immediately = True
        poll_agent = zmq.Poller()
        poll_agent.register(wire.socket, zmq.POLLIN)
        wire.send(req)
        iteration = 1
        while True:
            socks = dict(poll_agent.poll())
            if wire.socket in socks and socks[wire.socket] == zmq.POLLIN:
                res = gevent.with_timeout(10, wire.recv, timeout_value=None)
                if res.get('final'):
                    log.warn('Last Iteration: %s, Length of Rows: %s' %(iteration, len(res['rows'])))
                    break
                else:
                    log.warn('Iteration: %s, Length of Rows: %s' %(iteration, len(res['rows'])))
                    next_req = {'search_id': req['search_id'], 'seen_version':res.get('response_version')}
                    wire.send(next_req)
                iteration +=1
    finally:
        wire.close(immediate=close_immediately)
        poll_agent.unregister(wire.socket)

In server side, the response obtained is logged as below:

2017-08-28_07:17:55.43370 WARNING: Iteration: 1, Length of Rows: 100
2017-08-28_07:17:55.44269 WARNING: Iteration: 2, Length of Rows: 100
2017-08-28_07:17:55.44894 WARNING: Iteration: 3, Length of Rows: 100
2017-08-28_07:17:55.45742 WARNING: Iteration: 4, Length of Rows: 100
2017-08-28_07:17:55.46327 WARNING: Iteration: 5, Length of Rows: 100
2017-08-28_07:17:55.46687 WARNING: Iteration: 6, Length of Rows: 100
2017-08-28_07:17:55.47074 WARNING: Iteration: 7, Length of Rows: 100
2017-08-28_07:17:55.47658 WARNING: Iteration: 8, Length of Rows: 100
2017-08-28_07:17:55.48385 WARNING: Last Iteration: 9, Length of Rows: 75

So I assume that the zmq poller I have implemented on the server side is working perfectly fine. But I am curious to know: how do I send these 9 iterations of data back to the requesting client?

P.S. I want to receive the data continuously on the client side. You may suggest accumulating each batch of the response somewhere and sending one final response back to the client, but this won't be feasible when the response is too big ( the requesting client would hit its timeout ).


Answer:

OK, I handled this using the Response object from the Flask module. Invoking a separate method that yields the data continuously, wrapping it in a Response object, and returning that solved my problem.
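A minimal sketch of that approach, reusing the names from the question ( the route, the request parsing, and the per-batch output format are assumptions; wiring and g.zmq_context come from the question's own codebase ):

import json

import zmq
from flask import Flask, Response, g, request

app = Flask(__name__)

@app.route('/get_data', methods=['POST'])
def get_data():
    req = json.loads(request.form['data'])  # assumed request parsing
    wire = wiring.Wire("indexing_data_pool", zmq_context=g.zmq_context)  # as in the question

    def generate():
        poll_agent = zmq.Poller()
        poll_agent.register(wire.socket, zmq.POLLIN)
        wire.send(req)
        try:
            while True:
                socks = dict(poll_agent.poll())
                if wire.socket in socks and socks[wire.socket] == zmq.POLLIN:
                    res = wire.recv()
                    yield json.dumps(res['rows']) + '\n'  # stream each batch as it arrives
                    if res.get('final'):
                        break
                    wire.send({'search_id': req['search_id'],
                               'seen_version': res.get('response_version')})
        finally:
            poll_agent.unregister(wire.socket)
            wire.close(immediate=True)

    return Response(generate(), mimetype='application/json')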

Question:

I'm opening a ZMQ socket and registering it with a poller as follows:

poll = zmq.Poller()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, b"identity")
socket.connect(url)
poll.register(socket, zmq.POLLIN)

Now I'd like to close and re-connect the socket to the same url. What is the best way to do this given that it is registered with the poller?


Answer:

poll.unregister(socket)
socket.close()
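
A closed pyzmq socket cannot be re-used, so re-connecting means creating a fresh socket and registering it with the same poller ( a sketch reusing the question's names ):

socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, b"identity")
socket.connect(url)
poll.register(socket, zmq.POLLIN)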