Hot questions for Using ZeroMQ in tornado

Question:

Currently, pyzmq offers multiple ways to implement an asynchronous I/O loop, all of which are mentioned in the documentation: http://pyzmq.readthedocs.io/en/latest/api/index.html

From a pure performance point of view, the documentation does not make clear whether these choices are equivalent. They are:

  • Tornado @gen.coroutine
  • Native @asyncio.coroutine
  • Tornado async

So, which one has the best throughput in messages per second in a typical PUSH-PULL scenario as shown in the documentation examples? Why do we see a difference between them?


Answer:

Results

Contrary to my expectations, both asyncio implementations in pyzmq appear to be slower than the "legacy" Tornado one.

Tornado @gen.coroutine:

Avg. Speed: 2160.26 msg/s

Native @asyncio.coroutine:

Avg. Speed: 1697.66 msg/s

Tornado async:

Avg. Speed: 1695.29 msg/s

Also, the two asyncio implementations occasionally show bursts of up to 3536.27 msg/s, whereas the Tornado @gen.coroutine version has a very stable throughput.

Test Code

For the comparison I used modified versions of:

Tornado @gen.coroutine: https://github.com/zeromq/pyzmq/blob/master/examples/eventloop/coroutines.py

Native @asyncio.coroutine: https://github.com/zeromq/pyzmq/blob/master/examples/asyncio/coroutines.py

Tornado async: (Python 3.5+ only; I added an extra printing coroutine) https://github.com/zeromq/pyzmq/blob/master/examples/asyncio/tornado_asyncio.py

The modification displays the average number of messages per second every 5 seconds instead of printing dots. In the PULL coroutine I increment a counter, n = n + 1; in the printing coroutine I compute v = n / (time.time() - start) and display it. The printing coroutine also resets n = 0 and start = time.time() every 10,000 messages to prevent offset effects in the measurement.
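The measurement described above can be sketched as follows. This is one reading of the description, not the author's actual code: it assumes the reset happens when the printing coroutine finds n >= 10,000, it uses time.monotonic() instead of time.time() so wall-clock adjustments cannot skew the interval, and an asyncio.Queue stands in for the PULL socket so the sketch runs without a ZeroMQ endpoint. RateMeter, pull() and printer() are hypothetical names.

```python
import asyncio
import time


class RateMeter:
    """Average messages per second since the last reset (hypothetical helper)."""

    def __init__(self, reset_every=10_000, clock=time.monotonic):
        self.reset_every = reset_every
        self.clock = clock
        self.n = 0
        self.start = clock()

    def tick(self):
        # Called once per received message: the "n = n + 1" step.
        self.n += 1

    def report(self):
        # The "v = n / (now - start)" step, followed by the periodic reset.
        elapsed = self.clock() - self.start
        rate = self.n / elapsed if elapsed > 0 else 0.0
        if self.n >= self.reset_every:
            self.n = 0
            self.start = self.clock()
        return rate


async def pull(queue, meter):
    # Stand-in for the PULL coroutine: an asyncio.Queue replaces the socket.
    while True:
        await queue.get()
        meter.tick()


async def printer(meter, interval=5.0):
    # Replaces the dots of the original examples with a rate every 5 s.
    while True:
        await asyncio.sleep(interval)
        print("Avg. Speed: %.2f msg/s" % meter.report())
```

In the real benchmark, pull() would await the recv() of a zmq.asyncio PULL socket instead of queue.get(), with both coroutines scheduled on the same loop.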

Environment
$ uname -a
Linux localhost 4.6.3-300.fc24.x86_64 #1 SMP Fri Jun 24 20:52:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux

$ python3 --version
Python 3.5.1

>>> zmq.__version__
'15.3.0'

>>> tornado.version
'4.3'

Question:

I want to write a single-threaded program that hosts a web server using Tornado and also receives messages on a ZMQ socket (using the PyZMQ Tornado event loop: http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html), but I'm not sure how to structure it. Should I be using

from zmq.eventloop import ioloop

or

from tornado.ioloop import IOLoop

or both?


Answer:

Before any Tornado imports, you need to import zmq.eventloop.ioloop and call the zmq.eventloop.ioloop.install() function. After that you can import Tornado's IOLoop and use it.
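A minimal sketch of the single-threaded structure the question asks about: one event loop serves both the HTTP handler and a ZMQStream. It assumes a recent pyzmq (17 or later) with Tornado 5 or later, where ZMQStream runs on Tornado's own IOLoop without the install() step; with the older versions this answer was written for, keep the install() call at the very top, before any Tornado import. StatusHandler, attach_zmq(), the received list and the ports are hypothetical.

```python
import asyncio

import tornado.web
import zmq
from zmq.eventloop.zmqstream import ZMQStream

# Hypothetical shared state: frames received on the ZMQ socket.
received = []


class StatusHandler(tornado.web.RequestHandler):
    def get(self):
        # Runs on the same (single) event loop that delivers ZMQ messages.
        self.write({"received": len(received)})


def make_app():
    return tornado.web.Application([(r"/", StatusHandler)])


def attach_zmq(ctx, addr):
    sock = ctx.socket(zmq.PULL)
    sock.setsockopt(zmq.LINGER, 0)
    sock.bind(addr)
    # ZMQStream registers the socket with IOLoop.current(), i.e. the
    # loop that is running when this function is called.
    stream = ZMQStream(sock)
    # on_recv() calls the callback with the list of frames (bytes).
    stream.on_recv(received.extend)
    return stream


async def main(http_port=8888, zmq_addr="tcp://127.0.0.1:5555"):
    make_app().listen(http_port)
    stream = attach_zmq(zmq.Context.instance(), zmq_addr)
    try:
        await asyncio.Event().wait()  # serve HTTP and ZMQ forever, one thread
    finally:
        stream.close()
```

Start it with asyncio.run(main()). Because everything runs on one loop, the handler can read received without any locking.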

See: http://zeromq.github.io/pyzmq/eventloop.html

Question:

I've been trying to set up a server/client using the zmq event loop for REQ/REP messaging. Since Python 3 doesn't support the event loop provided by zmq, I'm trying to run it with Tornado's event loop.

I'm facing issues running ZMQStream with Tornado's event loop using Python 3.

I created the server/client code using zmq's ZMQStream and Tornado's event loop. The client sends the correct messages, but the server doesn't seem to respond to the requests.

The server side code:

from tornado import ioloop
import zmq

def echo(stream, msg):
   stream.send_pyobj(msg)

ctx = zmq.Context()
socket = ctx.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:5678')
stream = ZMQStream(socket)
stream.on_recv(echo)
ioloop.IOLoop.current().start()

The client side code:

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://127.0.0.1:5678")

for request in range (1,10):
   print("Sending request ", request,"...")
   socket.send_string("Hello")
   # Get the reply.
   message = socket.recv_pyobj()
   print("Received reply ", request, "[", message, "]")

I was expecting the server to echo back the request messages sent by the client, but it does not respond to them at all.


Answer:

Q : server doesn't seem to be responding


Step 0 :

One server-side SLOC, stream = ZMQStream( socket ), calls a name that is never imported in the posted MCVE, so it must (and does) fail to execute before yielding any result: "ZMQStream" in dir() confirms this by returning False.

Remedy: repair the MCVE (from zmq.eventloop.zmqstream import ZMQStream) and also print( zmq.zmq_version() ) together with the "ZMQStream" in dir() check for confirmation.
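A minimal corrected sketch of the server, assuming the same pyzmq/Tornado pairing as in the question. Besides the missing import from Step 0, it switches to on_recv_stream(): on_recv() calls its callback with the message frames only, so the two-argument echo(stream, msg) would fail with a TypeError. make_echo_server() is a hypothetical helper name.

```python
import zmq
from zmq.eventloop.zmqstream import ZMQStream  # the import missing from the MCVE


def echo(stream, msg):
    # msg is the list of frames of one request, e.g. [b'Hello'];
    # REP must answer every request before it can receive the next one.
    stream.send_pyobj(msg)


def make_echo_server(ctx, addr="tcp://127.0.0.1:5678"):
    socket = ctx.socket(zmq.REP)
    socket.setsockopt(zmq.LINGER, 0)  # see Step 1 below
    socket.bind(addr)
    stream = ZMQStream(socket)
    # on_recv() would call echo(msg) with a single argument;
    # on_recv_stream() passes (stream, msg), matching echo's signature.
    stream.on_recv_stream(echo)
    return stream
```

Run it as in the question: make_echo_server(zmq.Context()) followed by ioloop.IOLoop.current().start(). The question's client then receives the pickled frame list, [b'Hello'], via recv_pyobj().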


Step 1:

Unless there is a due reason not to, always prevent infinite hangs by calling <aSocket>.setsockopt( zmq.LINGER, 0 ) before the respective .bind() or .connect(). With the default LINGER of -1, terminating the Context blocks forever while undelivered messages are still queued. Applications hung forever and resources never released (yes, you read that correctly: infinitely blocked) are not welcome in distributed computing systems.
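To see what LINGER changes, the sketch below queues one message that can never be delivered and measures how long closing and terminating take. It assumes nothing listens on tcp://127.0.0.1:5999 (a hypothetical endpoint), and close_with_pending_message() is a hypothetical helper. With linger_ms=0 the pending message is discarded and ctx.term() returns at once; calling it with linger_ms=-1, the libzmq default, would block forever by design.

```python
import time

import zmq


def close_with_pending_message(addr="tcp://127.0.0.1:5999", linger_ms=0):
    """Queue one undeliverable message, then close and terminate.

    Returns the seconds spent in close() + term().
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUSH)
    sock.setsockopt(zmq.LINGER, linger_ms)  # set before .connect(), as advised
    sock.connect(addr)                      # assumption: no peer listens here
    sock.send(b"never delivered", zmq.NOBLOCK)  # sits in the outgoing queue
    t0 = time.monotonic()
    sock.close()
    ctx.term()  # with LINGER = 0 the queued message is simply dropped
    return time.monotonic() - t0
```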


Step 2:

Avoid the blind distributed mutual deadlock that REQ/REP is always prone to run into. It will happen; one just never knows when. There are heaps of details about this on StackOverflow.

And the remedy? Where possible, avoid the blocking forms of .recv() (a fair .poll() is far smarter, both design-wise and resource-wise). One may also use additional sender-side signalling before "throwing" either side into an infinitely blocking .recv(). Yet a network delivery failure, or any other reason for a silent message drop, can leave such soft signalling flagging a send that never resulted in a receive. The hard-wired REQ/REP behaviour then moves both sides into waiting for each other to send a message, which neither will ever send, as each is waiting to .recv() the still-not-received message from the (still listening) opposite side.
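The .poll()-based remedy can be sketched as a REQ client that waits at most timeout_ms for each reply instead of blocking in .recv(); this is essentially the "Lazy Pirate" pattern from the ZeroMQ Guide. request_with_timeout() is a hypothetical helper, and it expects pickled replies to match the question's server. Because a REQ socket that has sent without receiving cannot send again, the socket is closed after every request and a fresh one is used for the next.

```python
import zmq


def request_with_timeout(ctx, addr, text, timeout_ms=1000):
    """Send one request; return the unpickled reply, or None on timeout."""
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.LINGER, 0)  # never hang on close (Step 1)
    sock.connect(addr)
    try:
        sock.send_string(text)
        # poll() returns a non-zero event mask once a reply is readable,
        # or 0 after timeout_ms milliseconds, instead of blocking forever.
        if sock.poll(timeout=timeout_ms, flags=zmq.POLLIN):
            return sock.recv_pyobj()
        return None  # this REQ socket is now stuck: discard it
    finally:
        sock.close()
```

The question's client loop would then call request_with_timeout(context, "tcp://127.0.0.1:5678", "Hello") once per request and treat None as a lost request that may be retried.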


Last, but not least:

The ZeroMQ Zen-of-Zero also comes with a Zero-Warranty: messages are either fully delivered (error-free) or not delivered at all. REQ/REP mutual deadlocks are best resolved by never falling into them (see LINGER and .poll() above).

I hope you enjoy all the beauty of ZeroMQ in your future smart-messaging distributed designs.

Question:

I am trying to apply the PUSH/PULL pattern as depicted in the following figure:

                            | PULL  ---> Send via HTTP
                            | PULL  ---> Send via HTTP
---- PUSH ----- DEVICE ---- | PULL  ---> Send via HTTP
                            | PULL  ---> Send via HTTP
                            | PULL  ---> Send via HTTP

The PUSH socket connects to the ZeroMQ device and emits the messages which are then propagated to all connected PULL sockets. What I want to achieve is a kind of parallel processing over multiple nodes in a pipeline. When processing has been done by the PULL socket, it should forward the message via HTTP to the remote endpoint.

Here is the code:

from multiprocessing import Process
import random
import time
import zmq
from zmq.devices import ProcessDevice

from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

ioloop.install()


bind_in_port = 5559
bind_out_port = 5560

dev = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
dev.bind_in("tcp://127.0.0.1:%d" % bind_in_port)
dev.bind_out("tcp://127.0.0.1:%d" % bind_out_port)
dev.setsockopt_in(zmq.IDENTITY, b'PULL')
dev.setsockopt_out(zmq.IDENTITY, b'PUSH')
dev.start()
time.sleep(2)


def push():
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://127.0.0.1:%s" % bind_in_port)
    server_id = random.randrange(1,10005)
    for i in range(5):
        print("Message %d sent" % i)
        socket.send_string("Push from %s" % server_id)


def pull():
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:%s" % bind_out_port)
    loop = ioloop.IOLoop.instance()

    pull_stream = ZMQStream(socket, loop)

    def on_recv(message):
        print(message)
    pull_stream.on_recv(on_recv)

    loop.start()

Process(target=push).start()

time.sleep(2)

for i in range(2):
    Process(target=pull).start()

Although the messages are correctly sent to the ZeroMQ device, I cannot see any message received: the on_recv callback is never called.

Any help is appreciated.

Thanks


Answer:

There's code missing above, in the device init, to provide a full answer: what are dev and the *port variables? One thing to try is adding a sleep(1) after .connect() to give the connections time to establish. Note: there is no need to set an identity on PUSH/PULL sockets.
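The two suggestions above (a pause after .connect(), no identities) can be sketched with plain blocking sockets. To keep it short and self-contained, the device is left out: the PUSH socket binds directly and the PULL workers connect to it, which is the same situation the workers face on the device's output side. Tornado is not used here, and run_pipeline() and the inproc:// endpoint are hypothetical.

```python
import time

import zmq


def run_pipeline(n_messages=4, n_workers=2, addr="inproc://pipeline-demo"):
    """PUSH -> several PULL workers; returns the messages each worker got."""
    ctx = zmq.Context.instance()
    push = ctx.socket(zmq.PUSH)  # note: no zmq.IDENTITY set anywhere
    push.setsockopt(zmq.LINGER, 0)
    push.bind(addr)

    workers = []
    for _ in range(n_workers):
        pull = ctx.socket(zmq.PULL)
        pull.setsockopt(zmq.LINGER, 0)
        pull.connect(addr)
        workers.append(pull)
    time.sleep(1)  # the suggested pause after .connect()

    for i in range(n_messages):
        push.send_string("Push %d" % i)  # round-robined across workers

    poller = zmq.Poller()
    for pull in workers:
        poller.register(pull, zmq.POLLIN)
    got = [[] for _ in workers]
    remaining = n_messages
    while remaining:
        ready = dict(poller.poll(timeout=2000))
        if not ready:
            break  # nothing arrived within 2 s: give up instead of hanging
        for idx, pull in enumerate(workers):
            if pull in ready:
                got[idx].append(pull.recv_string())
                remaining -= 1

    for sock in [push] + workers:
        sock.close()
    return got
```

With the workers connected before anything is sent, PUSH round-robins the four messages, so each of the two workers gets two. In the question's setup the workers would connect to the device's output port (bind_out_port) instead, and do their HTTP forwarding where recv_string() is called here.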