Hot questions for Using ZeroMQ in polling

Question:

What I'm trying to accomplish is reading a message from whichever of two sockets it arrives on first. As far as I understand, polling (zmq_poll) is the right tool for this, as demonstrated in the mspoller example in the guide. Here is a small pseudo-code snippet:

TimeSpan timeout = TimeSpan.FromMilliseconds(50);

using (var receiver1 = new ZSocket(ZContext.Current, ZSocketType.DEALER))
using (var receiver2 = new ZSocket(ZContext.Current, ZSocketType.PAIR))
{
    receiver1.Bind("tcp://someaddress");
    // Note that PAIR socket is inproc:
    receiver2.Connect("inproc://otheraddress");

    var poll = ZPollItem.CreateReceiver();

    ZError error;
    ZMessage msg;

    while (true)
    {
        if (receiver1.PollIn(poll, out msg, out error, timeout))
        {
            // ...
        }

        if (receiver2.PollIn(poll, out msg, out error, timeout))
        {
            // ...
        }
    }
}

As you can see, it is essentially the same implementation as the mspoller example in the guide.

In my case, receiver2 (the PAIR socket) should receive a large number of messages. In fact, I've created a test in which the number of messages sent to it is always greater than the number it is able to receive (at least with the implementation shown).

I ran the test for 2 seconds and was very surprised by the results:

  • Number of messages sent to receiver2: 180 (by "sent" I mean that they are handed to another PAIR socket not shown in the previous snippet);
  • Number of messages received by receiver2: 21. Only 21 messages in 2 seconds? About 10 messages per second?

Then I tried different timeout values and found that the timeout significantly influences the number of messages received. The duration (2 seconds) and the number of messages sent (180) stayed the same. The results are:

  • timeout value of 200 milliseconds - number of messages received drops to 10 (5 per second);
  • timeout value of 10 milliseconds - number of messages received rises to 120 (60 per second).

The results tell me that polling simply does not work. If polling worked properly, as far as I understand the mechanism, the timeout should have no influence in this scenario. Whether the timeout is 1 hour or 5 milliseconds, there are always messages to receive, so there is nothing to wait for and the loop should run at the same speed.

Another big concern of mine is that even with a very small timeout, receiver2 is not able to receive all 180 messages. I'm struggling to reach a receive rate of 100 messages per second, although I chose ZeroMQ because it should be very fast (benchmarks mention numbers such as 6 million messages per second).

So my question is obvious: am I doing something wrong here? Is there a better way to implement polling?

While browsing the clrzmq4 code I noticed that PollIn can also be called on an enumeration of sockets (ZPollItems.cs, line 151), but I haven't found an example of it anywhere!

Is this the right approach? Is there any documentation for it?

Thanks


Answer:

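I've found the problem and the solution. Instead of calling PollIn on each socket separately, we should call PollIn once on an array of sockets. Each per-socket call waits up to the full timeout when its socket has nothing to read. With an idle receiver1, every loop iteration therefore spends one whole timeout period and drains at most one message from receiver2 in that time. The example from the guide is HUGELY MISLEADING. Here's the correct approach: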

TimeSpan timeout = TimeSpan.FromMilliseconds(50);

using (var receiver1 = new ZSocket(ZContext.Current, ZSocketType.DEALER))
using (var receiver2 = new ZSocket(ZContext.Current, ZSocketType.PAIR))
{
    receiver1.Bind("tcp://someaddress");
    receiver2.Connect("inproc://otheraddress");

    // Remember the order of sockets in this array: the array of
    // received messages uses the same order.
    ZSocket[] sockets = { receiver1, receiver2 };

    // One ZPollItem per socket:
    ZPollItem[] pollItems = { ZPollItem.CreateReceiver(), ZPollItem.CreateReceiver() };

    ZError error;
    ZMessage[] msg;

    while (true)
    {
        // A single call waits on both sockets and returns as soon as either is readable
        if (sockets.PollIn(pollItems, out msg, out error, timeout))
        {
            if (msg[0] != null)
            {
                // Message received from receiver1
            }

            if (msg[1] != null)
            {
                // Message received from receiver2
            }
        }
        else if (error == ZError.ETERM)
        {
            break; // the context was terminated
        }
        // Any other error here is typically EAGAIN: the timeout expired with nothing to read
    }
}

Now receiver2 reaches 15,000 received messages per second, regardless of the timeout value and of the number of messages received by receiver1.

UPDATE: The clrzmq4 maintainers have acknowledged this issue, so the example will probably be corrected soon.
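The difference between one poll over all sockets and one timed poll per socket can be seen without ZeroMQ. The following is a minimal stand-in, not clrzmq4 code. Python's standard `selectors` module waits once on two `socket.socketpair()` pairs, which play the roles of receiver1 (idle) and receiver2 (busy). The wait returns as soon as either pair is readable, so the idle socket adds no delay. The helper name `drain_ready` is hypothetical. In pyzmq, the analogous step is to register both sockets with one `zmq.Poller` and call `poll()` once per loop iteration.

```python
import selectors
import socket


def drain_ready(sel: selectors.BaseSelector, timeout: float) -> dict:
    """Wait once on all registered sockets; read from those that are ready.

    Returns {name: data} for ready sockets only. The wait ends as soon as
    any socket is readable, so an idle socket never delays a busy one.
    """
    received = {}
    for key, _events in sel.select(timeout):
        received[key.data] = key.fileobj.recv(4096)
    return received


# Two local socket pairs stand in for receiver1 (idle) and receiver2 (busy).
r1_in, r1_out = socket.socketpair()
r2_in, r2_out = socket.socketpair()

sel = selectors.DefaultSelector()
sel.register(r1_in, selectors.EVENT_READ, data="receiver1")
sel.register(r2_in, selectors.EVENT_READ, data="receiver2")

r2_out.sendall(b"hello")              # only receiver2 has a message waiting
result = drain_ready(sel, timeout=0.05)
print(result)                         # {'receiver2': b'hello'}

sel.close()
for s in (r1_in, r1_out, r2_in, r2_out):
    s.close()
```

With per-socket waits, the idle receiver1 would consume the full 0.05 s on every iteration. With a single wait, the call returns immediately because receiver2 is ready.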

Question:

I am using a call to zmq_poll on Linux, in my C++ app, to poll for reading from console input. Right now I am not using any ZeroMQ sockets, but I will in the near future.

As far as I understand, only on Linux can zmq_poll use a plain Linux file descriptor, including STDIN_FILENO.

I expected the code below to return EINTR when I send CTRL-C to my application.

When I debug it, I can see that it keeps waiting until the timeout expires, and then it returns res_num=0.

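#include <zmq.h>
#include <unistd.h>   // STDIN_FILENO

zmq_pollitem_t mpPollItems[1] = {};   // zero-initialised, no heap allocation to leak

mpPollItems[0].socket = nullptr;      // not a ZeroMQ socket...
mpPollItems[0].fd = STDIN_FILENO;     // ...so zmq_poll watches this raw descriptor
mpPollItems[0].events = ZMQ_POLLIN;

// Poll for up to 10000 ms: returns the number of ready items, 0 on timeout,
// or -1 with errno set (EINTR if interrupted by a signal)
int res_num = zmq_poll(mpPollItems, 1, 10000);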

zmq_poll uses the Linux poll() call behind the scenes.

Am I doing something wrong in this code, or did I misunderstand how zmq_poll works?

Could it be something to do with signal masking?

Note: I am calling zmq_poll in a separate thread that I spawn from main. I also ran the test calling poll directly and got the same result. The signal is only delivered when poll is called in the main thread. When poll is called in a separate thread, there is no reaction.


Answer:

The more recent ZeroMQ API extensions have brought a few "dirty" tricks that go outside the original Zen-of-Zero.

Yes, using O/S-native TCP peers under the ZMQ_STREAM Scalable Formal Communication Pattern archetype became possible. However, these steps can lead to inconsistencies between what you expect and what is actually delivered.

A native O/S file descriptor can be injected into the .fd member of a zmq_pollitem_t struct. EINTR is raised for ZeroMQ Socket() instances when O/S signals propagate, but this does not seem to work the same way for "injected"-only polling.

The same difference may persist even with a "complete" setup, meaning a fully configured ZMQ_STREAM Socket() instance rather than an ad-hoc, .fd-"injected"-only trick. If it does, it is best to raise a change request with the package maintainers, so that handling is refactored back to a uniform Zen-of-Zero mode for both types of polled devices.
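Some POSIX background may explain the observation in the question. A process-directed signal such as SIGINT is delivered to only one thread that does not block it. Only that thread's blocking call is interrupted with EINTR, so a poll() running in another thread keeps waiting. A common workaround, different from the suggestion above, is the self-pipe trick: the polling thread also watches the read end of a pipe, and whichever code handles the signal writes a byte to that pipe. Below is a minimal POSIX-only sketch of the idea in Python, using the standard `selectors`, `os` and `threading` modules. A second pipe stands in for STDIN_FILENO, and the function name `wait_for_input` is hypothetical. In the C++ program, the read end would go into a second `zmq_pollitem_t` with `socket = NULL`, and the signal handler would call `write()`, which is async-signal-safe.

```python
import os
import selectors
import threading


def wait_for_input(input_fd: int, wake_fd: int, timeout: float) -> str:
    """Wait until input_fd is readable, wake_fd is written to, or timeout.

    Returns "stopped", "input" or "timeout". A stop request wins if both
    descriptors are ready at once.
    """
    with selectors.DefaultSelector() as sel:
        sel.register(input_fd, selectors.EVENT_READ, data="input")
        sel.register(wake_fd, selectors.EVENT_READ, data="stopped")
        ready = {key.data for key, _events in sel.select(timeout)}
    if not ready:
        return "timeout"
    return "stopped" if "stopped" in ready else "input"


# One pipe stands in for STDIN_FILENO, the other is the wake-up pipe.
input_r, input_w = os.pipe()
wake_r, wake_w = os.pipe()

outcome = []
worker = threading.Thread(
    target=lambda: outcome.append(wait_for_input(input_r, wake_r, timeout=10.0))
)
worker.start()

# In a real program this write would be done by the signal handler
# (Python always runs signal handlers in the main thread).
os.write(wake_w, b"x")
worker.join()
print(outcome)  # ['stopped']

for fd in (input_r, input_w, wake_r, wake_w):
    os.close(fd)
```

The worker returns as soon as the wake-up byte arrives instead of waiting for the 10-second timeout, no matter which thread received the signal.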

Question:

With a REQ-REP pattern, I am trying to send requests to multiple clients with a timeout using poll. The idea is that if the server cannot receive a reply from the first client, it times out and moves on to the next client.

But after the initial timeout, it seems unable to send the next message to the second client.

I get the error zmq.error.ZMQError: Operation cannot be accomplished in current state on the line socket.send_string("Sensor Data") in the server.

Full output:

Connecting to machine...
Successfully connected to machine 127.0.0.1:9999
Successfully connected to machine 127.0.0.1:9998
Sending request  0 ...
Machine did not respond
Sending request  1 ...
Traceback (most recent call last):
  File "C:\Users\tobiw\Documents\Python\Raspberry Pi\zmq\REQrep\testSIMPLEpoll.py", line 16, in <module>
    socket.send_string("Sensor Data")
  File "C:\Users\tobiw\AppData\Local\Programs\Python\Python36-32\lib\site-packages\pyzmq-17.0.0b3-py3.6-win32.egg\zmq\sugar\socket.py", line 541, in send_string
    return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)
  File "C:\Users\tobiw\AppData\Local\Programs\Python\Python36-32\lib\site-packages\pyzmq-17.0.0b3-py3.6-win32.egg\zmq\sugar\socket.py", line 384, in send
    return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
  File "zmq/backend/cython/socket.pyx", line 727, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 771, in zmq.backend.cython.socket.Socket.send
  File "zmq/backend/cython/socket.pyx", line 249, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/socket.pyx", line 244, in zmq.backend.cython.socket._send_copy
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
    raise ZMQError(errno)
zmq.error.ZMQError: Operation cannot be accomplished in current state
[Finished in 5.3s with exit code 1]

Server:

import zmq
import json

ports = ["127.0.0.1:9999", "127.0.0.1:9998"]

context = zmq.Context()
print("Connecting to machine...")
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.LINGER, 0)
for port in ports:
    socket.connect("tcp://%s" % port)
    print("Successfully connected to machine %s" % port)

for request in range(len(ports)):
    print("Sending request ", request, "...")
    socket.send_string("Sensor Data")  # <-- error occurs here

    # use poll for timeouts:
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)

    socks = dict(poller.poll(5 * 1000))

    if socket in socks:
        try:
            msg_json = socket.recv()
            sens = json.loads(msg_json)
            response = "Sensor: %s :: Data: %s :: Client: %s" % (sens['sensor'], sens['data'], sens['client'])
            print("Received reply ", request, "[", response, "]")
        except IOError:
            print("Could not connect to machine")
    else:
        print("Machine did not respond")

Client:

import zmq
import time
import json

port = "9998" # multiple similar clients but just with different ports

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port)

while True:
    #  Wait for next request from server
    message = str(socket.recv(), "utf-8")
    print("Received request: ", message)
    time.sleep(1)
    msgDict = {
        'sensor': "6",
        'data': "123456789",
        'client': "9876",
    }
    msg_json = json.dumps(msgDict)
    socket.send_string(msg_json)

If the server receives a reply from the first client, the send to the second client works fine. If it does not receive a reply from the first client, the error is reproduced.


Answer:

First of all, zmq.error.ZMQError: Operation cannot be accomplished in current state in a REQ-REP pattern indicates that the strict send -> recv -> send -> recv order has been broken. In my case, the poll on receive in the for-loop timed out, so the REQ server side never made its final recv. When the loop came back around, it called send again, producing send -> recv -> send -> timeout -> send. That is a double send, which is illegal.
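The ordering rule can be modelled in a few lines. The class below is a hypothetical illustration of a REQ socket's send/recv state only, with no networking. A send while a reply is still pending fails, which libzmq reports as EFSM ("Operation cannot be accomplished in current state"). The loop replays the question's scenario, in which the first client never answers.

```python
class ReqStateModel:
    """Hypothetical model of the REQ socket's strict send/recv alternation.

    It reproduces only the ordering rule, not any networking.
    """

    def __init__(self) -> None:
        self.awaiting_reply = False

    def send(self, msg: str) -> None:
        if self.awaiting_reply:
            # libzmq rejects this case with EFSM:
            # "Operation cannot be accomplished in current state".
            raise RuntimeError("EFSM: send while a reply is still pending")
        self.awaiting_reply = True

    def recv(self, reply: str) -> str:
        if not self.awaiting_reply:
            raise RuntimeError("EFSM: recv without a pending request")
        self.awaiting_reply = False
        return reply


req = ReqStateModel()
log = []
# Client 0 never answers (None); client 1 would answer "data".
for request, reply in enumerate([None, "data"]):
    try:
        req.send("Sensor Data")
    except RuntimeError as exc:
        log.append(f"request {request}: {exc}")
        break
    if reply is None:
        log.append(f"request {request}: timed out")  # no recv, still awaiting a reply
    else:
        log.append(f"request {request}: got {req.recv(reply)}")

print(log)
# ['request 0: timed out', 'request 1: EFSM: send while a reply is still pending']
```

Switching socket types is not the only fix. The guide's Lazy Pirate pattern handles the same situation by closing and reopening the REQ socket after a timeout, which resets its state.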

To fix it, I switched from a REQ-REP pattern to a DEALER-REP pattern. This gives me an asynchronous server that can talk to multiple REP clients.

With the client unchanged, here is the new server for anyone interested:

Server:

import zmq
import json

ports = ["127.0.0.1:9999", "127.0.0.1:9998"]

context = zmq.Context()
print("Connecting to machine...")
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.LINGER, 0)
for port in ports:
    socket.connect("tcp://%s" % port)
    print("Successfully connected to machine %s" % port)

for request in range(len(ports)):
    print("Sending request ", request, "...")
    socket.send_string("", zmq.SNDMORE)  # delimiter
    socket.send_string("Sensor Data")  # actual message

    # use poll for timeouts:
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)

    socks = dict(poller.poll(5 * 1000))

    if socket in socks:
        try:
            socket.recv()  # discard delimiter
            msg_json = socket.recv()  # actual message
            sens = json.loads(msg_json)
            response = "Sensor: %s :: Data: %s :: Client: %s" % (sens['sensor'], sens['data'], sens['client'])
            print("Received reply ", request, "[", response, "]")
        except IOError:
            print("Could not connect to machine")
    else:
        print("Machine did not respond")
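The empty delimiter frame that the new server sends and then discards is the envelope a REQ socket would otherwise add and strip automatically. Here is a minimal sketch of that framing as two plain helper functions with hypothetical names, `to_rep` and `from_rep`. They could be used with pyzmq's `send_multipart` and `recv_multipart` instead of the separate `send_string`/`recv` calls.

```python
from typing import List


def to_rep(payload: bytes) -> List[bytes]:
    """Frames a DEALER should send so that a REP peer accepts the request.

    REP expects an empty delimiter frame before the body; a REQ socket
    would add it automatically, a DEALER has to add it by hand.
    """
    return [b"", payload]


def from_rep(frames: List[bytes]) -> List[bytes]:
    """Strip the empty delimiter from a reply that a DEALER got from REP."""
    if len(frames) < 2 or frames[0] != b"":
        raise ValueError("expected an empty delimiter frame before the body")
    return frames[1:]


print(to_rep(b"Sensor Data"))               # [b'', b'Sensor Data']
print(from_rep([b"", b'{"sensor": "6"}']))  # [b'{"sensor": "6"}']
```

For example: `socket.send_multipart(to_rep(b"Sensor Data"))` to send, and `body = from_rep(socket.recv_multipart())[0]` to receive. This design has one caveat. A DEALER fair-queues replies from all connected peers, so a reply that arrives after its 5-second window will be picked up by a later poll. The 'client' field in the payload is what identifies the sender in that case.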

Question:

I have run into a strange behaviour in ZeroMQ that I have been trying to debug all day.

Here is a minimal example script that reproduces the problem. It can be run with Python 3.

One server with a REP socket is started, and five clients with REQ sockets connect to it almost simultaneously. The server then starts to block after the first few messages. It seems that poller.poll(1000) is what blocks indefinitely.

This behaviour also seems to be timing-dependent. If I insert a sleep(0.1) in the loop that starts the clients, it works as expected.

I would have expected the REP socket to queue all incoming messages and release them one after the other via sock.recv_multipart().

What is happening here?

import logging
from threading import Thread
from time import sleep
import zmq

logging.basicConfig(level=logging.INFO)
PORT = "3446"
stop_flag = False

def server():

    logging.info("started server")
    context = zmq.Context()
    sock = context.socket(zmq.REP)
    sock.bind("tcp://*:" + PORT)
    logging.info("bound server")

    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)

    while not stop_flag:

        socks = dict(poller.poll(1000))
        if socks.get(sock) == zmq.POLLIN:

            request = sock.recv_multipart()
            logging.info("received %s", request)
            # sleep(0.5)

            sock.send_multipart(["reply".encode()] + request)

    sock.close()

def client(name:str):
    context = zmq.Context()
    sock = context.socket(zmq.REQ)
    sock.connect("tcp://localhost:" + PORT)
    sock.send_multipart([name.encode()])
    logging.info(sock.recv_multipart())
    sock.close()

logging.info("starting server")
server_thread = Thread(target=server)
server_thread.start()
sleep(1)

nr_of_clients = 5
for i in range(nr_of_clients):
    Thread(target=client, args=[str(i)]).start()

stop_flag = True

Answer:

To me, the problem seems to be that you are "shutting down" the server before all clients have received their reply. So I guess it's not the server that is blocking, but the clients.

You can solve this either by waiting some time before you set stop_flag:

sleep(5)
stop_flag = True

or, better, by explicitly joining the client threads:

nr_of_clients = 5
threads = []
for i in range(nr_of_clients):
    thread = Thread(target=client, args=[str(i)])
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

stop_flag = True
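The same shutdown order can also be sketched with a `threading.Event` instead of a module-level boolean: join every client, then tell the server to stop, then join the server. This sketch is a stand-in that uses standard-library queues in place of the ZeroMQ sockets, so it shows only the thread coordination. The function names are hypothetical.

```python
import queue
import threading


def server(requests: queue.Queue, stop: threading.Event) -> None:
    """Answer requests until stop is set (stand-in for the REP loop)."""
    while not stop.is_set():
        try:
            # Wait at most 0.1 s so the loop re-checks the stop flag.
            name, reply_box = requests.get(timeout=0.1)
        except queue.Empty:
            continue
        reply_box.put(b"reply " + name)


def client(name: bytes, requests: queue.Queue, results: queue.Queue) -> None:
    """Send one request and block until its reply arrives (stand-in for REQ)."""
    reply_box: queue.Queue = queue.Queue(maxsize=1)
    requests.put((name, reply_box))
    results.put(reply_box.get())


requests: queue.Queue = queue.Queue()
results: queue.Queue = queue.Queue()
stop = threading.Event()

server_thread = threading.Thread(target=server, args=(requests, stop))
server_thread.start()

clients = [
    threading.Thread(target=client, args=(str(i).encode(), requests, results))
    for i in range(5)
]
for t in clients:
    t.start()
for t in clients:
    t.join()        # every client has its reply before we stop the server

stop.set()
server_thread.join()

replies = sorted(results.get() for _ in range(results.qsize()))
print(replies)  # [b'reply 0', b'reply 1', b'reply 2', b'reply 3', b'reply 4']
```

An Event also avoids relying on the server thread seeing a rebound global.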

Question:

I have one ZMQ_PUB socket in one process, connected to a ZMQ_SUB socket in another process that runs on another server. The only thing I need to do is publish as many messages as I can. I'm sure the connection is established first.

I'm running a load test, firing thousands of identical messages from the PUB. On the other side, I poll the ZMQ_SUB socket and receive messages only when needed.

The weird thing is that for every 1000 messages received, zmq_poll returns ETERM about 3 times. I can't figure it out.

-- Solution in the comments below.


Answer:

It turns out I was misinterpreting the return codes of the poll, recv and send functions. They return -1 every time a signal arrives, and I do get a lot of EINTR, which is fine and perfectly normal. I can just ignore these and retry what I was doing. In my polling loop I can just continue. If the EINTR came from a send or recv, I can keep retrying the send or recv as long as errno == EINTR. Problem solved.
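The retry rule from this answer can be written as a small helper. Below is a sketch in Python with a hypothetical name, `retry_on_eintr`, and a fake `flaky_recv` in place of a real socket call. In C, the same idea is a loop such as `do { rc = zmq_poll(items, n, timeout); } while (rc == -1 && errno == EINTR);`. Python itself already retries most system calls interrupted by signals (PEP 475), so a helper like this matters mainly for bindings that still surface EINTR. ETERM is a different case: it means the context was terminated, and retrying will not help.

```python
import errno


def retry_on_eintr(operation, *args, **kwargs):
    """Call operation(*args, **kwargs), retrying while it fails with EINTR.

    Any exception whose errno attribute is not EINTR is re-raised unchanged.
    """
    while True:
        try:
            return operation(*args, **kwargs)
        except Exception as exc:
            if getattr(exc, "errno", None) != errno.EINTR:
                raise
            # Interrupted by a signal before completing: simply try again.


attempts = []


def flaky_recv() -> bytes:
    """Hypothetical stand-in for recv: interrupted twice, then succeeds."""
    attempts.append("call")
    if len(attempts) < 3:
        raise InterruptedError(errno.EINTR, "Interrupted system call")
    return b"payload"


print(retry_on_eintr(flaky_recv), len(attempts))  # b'payload' 3
```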