Hot questions for Using ZeroMQ in tornado
Which pyzmq implementation has the fastest throughput: @gen.coroutine, @asyncio.coroutine, or async/await?
Currently pyzmq offers multiple ways to implement an asynchronous IO loop, all mentioned in the documentation: http://pyzmq.readthedocs.io/en/latest/api/index.html
From a pure performance point of view, it is not clear in the documentation if the choices are equivalent. These are:
- Tornado @gen.coroutine
- Native @asyncio.coroutine
- Tornado async
So, which one has the best throughput in messages per second in a typical PUSH-PULL scenario as shown in the documentation examples? Why do we see a difference between them?
Contrary to my expectations, it seems both asyncio implementations in pyzmq are slower than the "legacy" tornado one.
Avg. Speed: 2160.26 msg/s
Avg. Speed: 1697.66 msg/s
Avg. Speed: 1695.29 msg/s
Also, the two asyncio implementations show bursts of up to 3536.27 msg/s every now and then, whereas the Tornado @gen.coroutine is very stable in throughput.
For the comparison I have used modified versions of:
Tornado @gen.coroutine: https://github.com/zeromq/pyzmq/blob/master/examples/eventloop/coroutines.py
Native @asyncio.coroutine: https://github.com/zeromq/pyzmq/blob/master/examples/asyncio/coroutines.py
Tornado async: (Python 3.5+ only; included additional print coroutine) https://github.com/zeromq/pyzmq/blob/master/examples/asyncio/tornado_asyncio.py
The modification consists of displaying the average number of messages per second every 5 seconds instead of printing dots. In the PULL coroutine I increment n = n + 1, and in the printing coroutine I calculate v = n / (time.time() - start) and display it. There I also reset n = 0 and start = time.time() every 10,000 messages to prevent any offset effects in the measurement.
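The bookkeeping described above can be sketched as a small helper (a hypothetical class for illustration, not the exact code used in the benchmark):

```python
import time


class Throughput:
    """Rolling messages-per-second counter, reset every `window` messages."""

    def __init__(self, window=10_000):
        self.window = window
        self.n = 0
        self.start = time.time()

    def tick(self):
        # Called once per received message; resets to avoid offset effects.
        self.n += 1
        if self.n >= self.window:
            self.n = 0
            self.start = time.time()

    def rate(self):
        # Average messages per second since the last reset.
        elapsed = time.time() - self.start
        return self.n / elapsed if elapsed > 0 else 0.0
```

The PULL coroutine would call tick() per message, and a separate printing coroutine would display rate() every 5 seconds.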
$ uname -a
Linux localhost 4.6.3-300.fc24.x86_64 #1 SMP Fri Jun 24 20:52:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
$ python3 --version
Python 3.5.1
>>> zmq.__version__
'15.3.0'
>>> tornado.version
'4.3'
I want to write a single-threaded program that hosts a web server using Tornado and also receives messages on a ZMQ socket (using the PyZMQ Tornado event loop: http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/multisocket/tornadoeventloop.html), but I'm not sure how to structure it. Should I be using
from zmq.eventloop import ioloop
from tornado.ioloop import IOLoop
Before any Tornado imports, you need to import zmq.eventloop.ioloop and call the zmq.eventloop.ioloop.install() function. Then you can import Tornado's ioloop and use it.
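A minimal sketch of that pattern (the port numbers and handler are illustrative; on pyzmq >= 17 the install() step no longer exists and is no longer needed, hence the guarded import):

```python
import zmq

try:
    # pyzmq < 17: pyzmq's IOLoop must be installed before any Tornado import
    from zmq.eventloop import ioloop
    ioloop.install()
except ImportError:
    pass  # pyzmq >= 17 hooks into Tornado's (asyncio-based) loop directly

import tornado.web
from tornado.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("hello from tornado")


def on_zmq_message(frames):
    print("ZMQ frames:", frames)


def main():
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.PULL)
    sock.bind("tcp://127.0.0.1:5555")        # illustrative port
    ZMQStream(sock).on_recv(on_zmq_message)  # registers on the shared loop

    tornado.web.Application([(r"/", MainHandler)]).listen(8888)
    IOLoop.current().start()                 # one loop serves both HTTP and ZMQ


if __name__ == "__main__":
    main()
```

The key point is that a single IOLoop drives both the Tornado request handlers and the ZMQStream callbacks, so everything stays on one thread.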
I've been trying to set up a server/client using the zmq event loop for REQ/REP messaging. Since Python 3 doesn't support the event loop provided by zmq, I'm trying to run it with Tornado's event loop.
I'm facing issues running ZMQStream with Tornado's event loop on Python 3. I created the server/client code using zmq's ZMQStream and Tornado's event loop. The client is sending the correct messages, but the server doesn't seem to be responding to the message requests.
The server side code:
from tornado import ioloop
import zmq

def echo(stream, msg):
    stream.send_pyobj(msg)

ctx = zmq.Context()
socket = ctx.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:5678')
stream = ZMQStream(socket)
stream.on_recv(echo)
ioloop.IOLoop.current().start()
The client side code:
import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://127.0.0.1:5678")

for request in range(1, 10):
    print("Sending request ", request, "...")
    socket.send_string("Hello")
    # Get the reply.
    message = socket.recv_pyobj()
    print("Received reply ", request, "[", message, "]")
I was expecting the server to return back the request messages being sent by the client. But it is just not responding to the requests being sent.
Q : server doesn't seem to be responding

Step 0 :
One server-side SLOC, stream = ZMQStream( socket ), calls a function that is never imported anywhere in the MCVE, so it must (and does) fail to execute and yield any result; evaluating "ZMQStream" in dir() confirms this with False.

Step 1 :
Repair the MCVE, and also add a print( zmq.zmq_version() ) plus a "ZMQStream" in dir() confirmation.
Always prevent infinite deadlocks, unless a due reason exists not to do so, by setting <aSocket>.setsockopt( zmq.LINGER, 0 ) prior to the respective <aSocket>.close(). Applications hung forever and un-released (yes, you read it correctly, infinitely blocked) resources are not welcome in distributed computing systems.
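In code, the idea looks like this (the endpoint is illustrative; nothing needs to be listening there):

```python
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REQ)
sock.setsockopt(zmq.LINGER, 0)          # set before connect/close
sock.connect("tcp://127.0.0.1:5679")    # illustrative endpoint
linger_value = sock.getsockopt(zmq.LINGER)

sock.close()    # returns immediately; queued-but-undelivered messages are dropped
ctx.term()      # without LINGER=0 this could block forever on undelivered messages
```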
Avoid the blind distributed mutual deadlock that
REQ/REP is always prone to run into. It will happen; one just never knows when. You may read heaps of details about this on Stack Overflow.
And the remedy? One may (and shall, where possible) avoid using the blocking forms of
.recv()-s (.poll()-s are way smarter, design-wise and resources-wise), and may use additional sender-side signalisation before "throwing" either side into an infinitely-blocking
.recv() (yet a network delivery failure, or any other reason for a silent message drop, may cause the soft-signalling to flag a send that never resulted in a receive, and thus a mutual deadlock, where the hard-wired behaviour moves both of the REQ/REP sides into waiting for the other to send a message, which the counterparty will never send, as it too is waiting to
.recv() the still-not-received message from the (still listening) opposite side).
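A minimal sketch of the .poll()-based escape from an infinitely-blocking .recv() (the endpoint and timeout are arbitrary; nothing listens on the port, so the poll deliberately times out):

```python
import zmq

ctx = zmq.Context()
req = ctx.socket(zmq.REQ)
req.setsockopt(zmq.LINGER, 0)            # never block forever on close/term
req.connect("tcp://127.0.0.1:5680")      # illustrative endpoint: no peer exists
req.send_string("Hello")                 # queued locally; the peer never answers

poller = zmq.Poller()
poller.register(req, zmq.POLLIN)

events = dict(poller.poll(timeout=100))  # wait at most 100 ms instead of forever
if req in events:
    reply = req.recv_string()
else:
    reply = None                         # timed out: resend, log or reconnect here

req.close()
ctx.term()
```

The timed-out branch is where a robust design recovers, instead of both sides waiting on each other forever.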
Last, but not least:
The ZeroMQ Zen-of-Zero also has a Zero-Warranty: messages are either fully delivered (error-free) or not delivered at all. The
REQ/REP mutual deadlocks are best resolved by never falling into them (ref. the avoidance steps above).
Let me wish you enjoyment of all the beauties of ZeroMQ for your future smart-messaging distributed designs.
I am trying to apply the PUSH/PULL pattern as depicted in the following figure:
                       | PULL ---> Send via HTTP
                       | PULL ---> Send via HTTP
PUSH ----- DEVICE ---- | PULL ---> Send via HTTP
                       | PULL ---> Send via HTTP
                       | PULL ---> Send via HTTP
The PUSH socket connects to the ZeroMQ device and emits the messages, which are then propagated to all connected
PULL sockets. What I want to achieve is a kind of parallel processing over multiple nodes in a pipeline. When the processing has been done by a
PULL socket, it should forward the message via HTTP to the remote endpoint.
Here is the code:
from multiprocessing import Process
import random
import time

import zmq
from zmq.devices import ProcessDevice
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

ioloop.install()

bind_in_port = 5559
bind_out_port = 5560

dev = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
dev.bind_in("tcp://127.0.0.1:%d" % bind_in_port)
dev.bind_out("tcp://127.0.0.1:%d" % bind_out_port)
dev.setsockopt_in(zmq.IDENTITY, b'PULL')
dev.setsockopt_out(zmq.IDENTITY, b'PUSH')
dev.start()
time.sleep(2)

def push():
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.connect("tcp://127.0.0.1:%s" % bind_in_port)
    server_id = random.randrange(1, 10005)
    for i in range(5):
        print("Message %d sent" % i)
        socket.send_string("Push from %s" % server_id)

def pull():
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.connect("tcp://127.0.0.1:%s" % bind_out_port)
    loop = ioloop.IOLoop.instance()
    pull_stream = ZMQStream(socket, loop)

    def on_recv(message):
        print(message)

    pull_stream.on_recv(on_recv)
    loop.start()

Process(target=push).start()
time.sleep(2)
for i in range(2):
    Process(target=pull).start()
Although the messages are correctly sent to the ZeroMQ device, I cannot see any message received: the
on_recv callback is never called.
Any help is appreciated.
There's missing code above in the device init, so it's hard to give a full answer: what are dev and the *_port variables? One thing to try is adding a sleep(1) after the .connect() calls, to give the connections time to establish. Note: there is no need to set an IDENTITY on PUSH/PULL sockets.
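Following these hints, a self-contained, simplified variant of the pipeline can be sketched as below (ThreadDevice is swapped in for ProcessDevice so the sketch runs in a single process, the IDENTITY options are dropped, and the ports and message count are arbitrary):

```python
import time

import zmq
from zmq.devices import ThreadDevice

in_port, out_port = 5561, 5562   # arbitrary ports for this sketch

# STREAMER device: PULLs from producers on in_port, PUSHes to workers on out_port
dev = ThreadDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
dev.bind_in("tcp://127.0.0.1:%d" % in_port)
dev.bind_out("tcp://127.0.0.1:%d" % out_port)
dev.start()                      # runs as a daemon thread

ctx = zmq.Context.instance()

# Connect the consumer BEFORE pushing, so messages don't sit queued in the device
pull = ctx.socket(zmq.PULL)
pull.setsockopt(zmq.RCVTIMEO, 5000)     # fail after 5 s instead of hanging forever
pull.connect("tcp://127.0.0.1:%d" % out_port)

push = ctx.socket(zmq.PUSH)
push.connect("tcp://127.0.0.1:%d" % in_port)
time.sleep(0.5)                          # let the TCP connections establish

for i in range(5):
    push.send_string("Push %d" % i)

received = [pull.recv_string() for _ in range(5)]
print(received)
```

With more than one PULL worker, the device load-balances the messages among them, which is the parallel-pipeline behaviour the question is after.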