Hot questions for Using ZeroMQ in python asyncio


Question:

I'm trying to code a simple program based on Asyncio and a Publish/Subscribe design pattern implemented with ZeroMQ. The publisher has 2 coroutines; one that listens for incoming subscriptions, and another one that publishes the value (obtained via an HTTP request) to the subscriber. The subscriber subscribes to a specific parameter (the name of a city in this case), and waits for the value (the temperature in this city).

Here is my code:

publisher.py

#!/usr/bin/env python

import json
import aiohttp
import aiozmq
import asyncio
import zmq


class Publisher:
    BIND_ADDRESS = 'tcp://*:10000'

    def __init__(self):
        self.stream = None
        self.parameter = ""

    @asyncio.coroutine
    def main(self):
        self.stream = yield from aiozmq.create_zmq_stream(zmq.XPUB, bind=Publisher.BIND_ADDRESS)
        tasks = [
            asyncio.async(self.subscriptions()),
            asyncio.async(self.publish())]
        print("before wait")
        yield from asyncio.wait(tasks)
        print("after wait")

    @asyncio.coroutine
    def subscriptions(self):
        print("Entered subscriptions coroutine")
        while True:
            print("New iteration of subscriptions loop")
            received = yield from self.stream.read()
            first_byte = received[0][0]
            self.parameter = received[0][-len(received[0])+1:].decode("utf-8")
            # Subscribe request
            if first_byte == 1:
                print("subscription request received for parameter "+self.parameter)
            # Unsubscribe request
            elif first_byte == 0:
                print("Unsubscription request received for parameter "+self.parameter)


    @asyncio.coroutine
    def publish(self):
        print("Entered publish coroutine")
        while True:
            if self.parameter:
                print("New iteration of publish loop")

                # Make HTTP request
                url = "http://api.openweathermap.org/data/2.5/weather?q="+self.parameter
                response = yield from aiohttp.request('GET', url)
                assert response.status == 200
                content = yield from response.read()

                # Decode JSON string
                decoded_json = json.loads(content.decode())

                # Get parameter value
                value = decoded_json["main"]["temp"]

                # Publish fetched values to subscribers
                message = bytearray(self.parameter+":"+str(value),"utf-8")
                print(message)
                pack = [message]

                print("before write")
                yield from self.stream.write(pack)
                print("after write")

            yield from asyncio.sleep(10)

test = Publisher()
loop = asyncio.get_event_loop()
loop.run_until_complete(test.main())

subscriber.py

#!/usr/bin/env python

import zmq

class Subscriber:
    XSUB_CONNECT = 'tcp://localhost:10000'

    def __init__(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.XSUB)
        self.socket.connect(Subscriber.XSUB_CONNECT)

    def loop(self):
        print(self.socket.recv())
        self.socket.close()

    def subscribe(self, parameter):
        self.socket.send_string('\x01'+parameter)
        print("Subscribed to parameter "+parameter)

    def unsubscribe(self, parameter):
        self.socket.send_string('\x00'+parameter)
        print("Unsubscribed to parameter "+parameter)

test = Subscriber()
test.subscribe("London")
while True:
    print(test.socket.recv())

And here is the output :

Subscriber side :

$ python3 subscriber.py 
    Subscribed to parameter London
    b'London:288.15'

Publisher side :

$ python3 publisher.py 
    before wait
    Entered subscriptions coroutine
    New iteration of subscriptions loop
    Entered publish coroutine
    subscription request received for parameter London
    New iteration of subscriptions loop
    New iteration of publish loop
    bytearray(b'London:288.15')
    before write

And the program is stuck there.

As you can see, "before write" appears in the output and the message is sent, but "after write" doesn't appear. So, I figured that an exception was probably raised and caught somewhere in the self.stream.write(pack) call stack.

If I send a KeyboardInterrupt to the Publisher, here is what I get:

Traceback (most recent call last):
  File "publisher.py", line 73, in <module>
    loop.run_until_complete(test.main())
  File "/usr/lib/python3.4/asyncio/base_events.py", line 304, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 276, in run_forever
    self._run_once()
  File "/usr/lib/python3.4/asyncio/base_events.py", line 1136, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/lib/python3.4/selectors.py", line 432, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished coro=<publish() done, defined at publisher.py:43> exception=TypeError("'NoneType' object is not iterable",)>
Traceback (most recent call last):
  File "/usr/lib/python3.4/asyncio/tasks.py", line 236, in _step
    result = coro.send(value)
  File "publisher.py", line 66, in publish
    yield from self.stream.write(pack)
TypeError: 'NoneType' object is not iterable
Task was destroyed but it is pending!
task: <Task pending coro=<subscriptions() running at publisher.py:32> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>

So I guess my problem actually is this error: TypeError: 'NoneType' object is not iterable, but I have no clue what's causing it.

What is going wrong here?


Answer:

The issue is that you're trying to yield from the call to self.stream.write(), but stream.write isn't actually a coroutine. When you call yield from on an item, Python internally calls iter(item). In this case, the call to write() is returning None, so Python is trying to do iter(None) - hence the exception you see.
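A minimal demonstration of that mechanism, independent of ZeroMQ (a sketch):

def gen():
    # delegating to a non-iterable triggers iter(None) under the hood
    yield from None

next(gen())  # TypeError: 'NoneType' object is not iterable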

To fix it, you should just call write() like a normal function. If you want to actually wait until the write is flushed and sent to the reader, use yield from stream.drain() after you make the call to write():

print("before write")
self.stream.write(pack)
yield from self.stream.drain()
print("after write")

Also, to make sure that exceptions in publish get raised without needing to Ctrl+C, use asyncio.gather instead of asyncio.wait:

    yield from asyncio.gather(*tasks)

With asyncio.gather, any exception thrown by a task inside tasks will be re-raised.
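Putting both fixes together, the publisher's main() would look like this (a sketch combining the two changes above):

@asyncio.coroutine
def main(self):
    self.stream = yield from aiozmq.create_zmq_stream(zmq.XPUB, bind=Publisher.BIND_ADDRESS)
    tasks = [
        asyncio.async(self.subscriptions()),
        asyncio.async(self.publish())]
    print("before wait")
    # gather re-raises any exception thrown inside the tasks
    yield from asyncio.gather(*tasks)
    print("after wait")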

Question:

I am writing a simple producer/consumer program.

import asyncio
import zmq

@asyncio.coroutine
def receive_data(future, s):
    print("begin to recv sth from.....socket")
    my_data = s.recv()
    future.set_result(my_data)

@asyncio.coroutine
def producer(loop, q, s):
    while True:
        future = asyncio.Future()
        yield from receive_data(future, s)
        data = str(future.result())
        yield from q.put(data)

@asyncio.coroutine
def consumer(loop, q):
    while True:
        a = yield from q.get()
        print("i am get..."+str(a)+"..."+str(type(a)))

loop = asyncio.get_event_loop()

c = zmq.Context()
s = c.socket(zmq.REP)
s.bind('tcp://127.0.0.1:5515')

q = asyncio.Queue()
tasks = [asyncio.Task(producer(loop, q, s)), asyncio.Task(consumer(loop, q))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
s.close()

It appears the consumer never gets a chance to execute.

The socket receives data every 500 ms, so I expected that while the yield from in receive_data suspends the producer coroutine, the consumer coroutine would run and print its info.

What could explain this?


Answer:

s.recv() is a blocking call, so receive_data hangs until a new ZMQ message arrives.

That blocks the event loop, so the consumer never gets a chance to execute.

You can pass the zmq.NOBLOCK flag to .recv and call asyncio.sleep(0) when no data is available, to give the event loop a chance to iterate over other ready tasks.
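A sketch of that non-blocking variant, keeping the question's coroutine shape (zmq.Again is the exception .recv raises when zmq.NOBLOCK finds no queued message):

@asyncio.coroutine
def receive_data(future, s):
    while True:
        try:
            # non-blocking receive: raises zmq.Again when nothing is queued
            my_data = s.recv(zmq.NOBLOCK)
            future.set_result(my_data)
            return
        except zmq.Again:
            # yield control so the consumer and other ready tasks can run
            yield from asyncio.sleep(0)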

Or just use aiozmq library :)
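With aiozmq the receive itself becomes awaitable, so the event loop is never blocked. A minimal sketch of the producer, assuming the same REP endpoint:

import asyncio
import aiozmq
import zmq

@asyncio.coroutine
def producer(q):
    # read() suspends only this coroutine, not the whole loop
    stream = yield from aiozmq.create_zmq_stream(zmq.REP, bind='tcp://127.0.0.1:5515')
    while True:
        data = yield from stream.read()
        yield from q.put(str(data))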

Question:

I'm setting up a listener for Hyperledger Sawtooth events with a pyzmq dealer socket and the provided asyncio functionality. Currently futures are returned but only sometimes finished, even though messages are sent to the Socket.

Weirdly this works for the connection message (only when sleeping before it as shown below) but not for event messages. I implemented this already with JavaScript and it works without problems. It seems that the issue does not lie with Sawtooth but rather in pyzmq's implementation of asyncio functionality or in my code.

class EventListener:
    def __init__(self):
        ...
        ctx = Context.instance()
        self._socket = ctx.socket(zmq.DEALER)
        self._socket.connect("tcp://127.0.0.1:4004")

    async def subscribe(self):
        ...
        await self._socket.send_multipart([connection_msg])

    async def receive(self):
        # Necessary wait otherwise the future is never finished
        await asyncio.sleep(0.1)
        resp = await self._socket.recv_multipart()
        handle_response(resp)

    async def listen(self):
        while True:
            # here sleep is not helping
            # await asyncio.sleep(0.1)

            # following await is never finished
            resp = await self._socket.recv_multipart()
            handle_response(resp)
...
listener = listener.EventListener()
await asyncio.gather(
    listener.receive(), listener.subscribe())
await asyncio.create_task(listener.listen())
...

Debugging shows that the returned Future object is never changed from a pending to a finished state. So, is my code incorrect, do I need to await messages differently or is it possible that something is wrong with pyzmq's asyncio functionality? Also, why do I need to sleep in receive(), isn't that why we have asyncio?


Answer:

There are several questions here, and this answer may not address all of them. Hopefully it will at least help others looking for a way to set up event listeners.

The Hyperledger Sawtooth Python SDK provides an option for clients to subscribe to events. The part of the SDK that does what you're trying to do can be found at https://github.com/hyperledger/sawtooth-sdk-python/blob/master/sawtooth_sdk/messaging/stream.py

Example code using the Hyperledger Sawtooth Python SDK for event subscription can be found here: https://github.com/danintel/sawtooth-cookiejar/blob/master/events/events_client.py

Question:

I run aiohttp application with Gunicorn behind nginx. In my application's initialization module I don't run the application using web.run_app(app) but just create an instance that will be imported by Gunicorn to run it in each worker Gunicorn creates. So Gunicorn creates a few worker processes, event loops within them, and then runs the application's request handler in those loops.

My aiohttp application has a collection of connected WebSockets (mobile application clients) that I want to notify on event occurred in any of application processes started by Gunicorn. And I want to notify all WebSockets that are connected to all application processes. Therefore I create some kind of upstream proxy using ZeroMQ and I want to subscribe to it using zmq.SUB socket from each application process.

...So basically I want to achieve something like this in each application worker:

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5555')

while True:
    event = socket.recv()
    for ws in app['websockets']:
        ws.send_bytes(event)
    # break before app shutdown. How?

How can I listen to the ZeroMQ proxy within the aiohttp application in order to forward messages to WebSockets?

Where can I put this code so it runs in the background within the event loop, and how do I start and shut it down correctly within the aiohttp application's life cycle?


UPDATE

I've already created an issue in aiohttp's GitHub repository describing the problem and proposing a possible solution. I'd highly appreciate input here or there on the problem described.


Answer:

OK, the question and the discussion on this issue have led to new functionality I've contributed to aiohttp: in version 1.0 we'll have the ability to register on_startup application signals using the Application.on_startup() method.

Documentation. Working example on the master branch.

#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio

import aioredis
from aiohttp.web import Application, WebSocketResponse, run_app

async def websocket_handler(request):
    ws = WebSocketResponse()
    await ws.prepare(request)
    request.app['websockets'].append(ws)
    try:
        async for msg in ws:
            print(msg)
            await asyncio.sleep(1)
    finally:
        request.app['websockets'].remove(ws)
    return ws


async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close(code=999, message='Server shutdown')


async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
            print("message in {}: {}".format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        print('Cancel Redis listener: close connection...')
        await sub.unsubscribe(ch.name)
        await sub.quit()
        print('Redis connection closed.')


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    print('cleanup background tasks...')
    app['redis_listener'].cancel()
    await app['redis_listener']


async def init(loop):
    app = Application(loop=loop)
    app['websockets'] = []
    app.router.add_get('/news', websocket_handler)
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    app.on_shutdown.append(on_shutdown)
    return app

loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app)
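The question asked about ZeroMQ rather than Redis; the same background-task pattern with a zmq.asyncio SUB socket might look like this (a sketch, reusing the tcp://localhost:5555 endpoint from the question; note that on newer aiohttp versions send_bytes is a coroutine and must be awaited):

import asyncio
import zmq
import zmq.asyncio

async def listen_to_zmq(app):
    ctx = zmq.asyncio.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect('tcp://localhost:5555')   # the upstream proxy from the question
    sub.setsockopt(zmq.SUBSCRIBE, b'')    # no topic filter: receive everything
    try:
        while True:
            event = await sub.recv()
            # forward the event to every websocket of this worker
            for ws in app['websockets']:
                ws.send_bytes(event)
    except asyncio.CancelledError:
        pass
    finally:
        sub.close()

It plugs into the same start_background_tasks / cleanup_background_tasks hooks shown above.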

Question:

I've just started with ZeroMQ and I'm trying to get a Hello World to work with PyZMQ and asyncio in Python 3.6. I'm trying to de-couple the functionality of a module from the pub/sub code, hence the following class setup:

Edit 1: Minimized example

Edit 2: Included solution, see the answer below for how.

import asyncio
import zmq.asyncio
from zmq.asyncio import Context

# manages message flow between publishers and subscribers
class HelloWorldMessage:
    def __init__(self, url='127.0.0.1', port='5555'):
        self.url = "tcp://{}:{}".format(url, port)
        self.ctx = Context.instance()

        # activate publishers / subscribers
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.pub_hello_world(),
            self.sub_hello_world(),
        ]))

    # generates message "Hello World" and publish to topic 'world'
    async def pub_hello_world(self):
        pub = self.ctx.socket(zmq.PUB)
        pub.connect(self.url)

        # message contents
        msg = "Hello World"
        print(msg)

        # keep sending messages
        while True:
            # --MOVED-- slow down message publication
            await asyncio.sleep(1) 

            # publish message to topic 'world'
            # async always needs `send_multipart()`
            await pub.send_multipart([b'world', msg.encode('ascii')])  # WRONG: bytes(msg)

    # processes message "Hello World" from topic 'world'
    async def sub_hello_world(self):
        sub = self.ctx.socket(zmq.SUB)
        sub.bind(self.url)
        sub.setsockopt(zmq.SUBSCRIBE, b'world')

        # keep listening to all published message on topic 'world'
        while True:
            msg = await sub.recv_multipart()
            # ERROR: WAITS FOREVER
            print('received: ', msg)

if __name__ == '__main__':
    HelloWorldMessage()
Problem

With the above code, only one Hello World is printed and then it waits forever. If I press ctrl+c, I get the following error:

python helloworld_pubsub.py

Hello World
^CTraceback (most recent call last):
  File "helloworld_pubsub_stackoverflow.py", line 64, in <module>
    HelloWorldMessage()
  File "helloworld_pubsub_stackoverflow.py", line 27, in __init__
    self.sub_hello_world(),
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/*path*/zeromq/lib/python3.6/asyncio/base_events.py", line 1395, in _run_once
    event_list = self._selector.select(timeout)
  File "/*path*/zeromq/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

Versions: libzmq: 4.2.3, pyzmq: 17.0.0, Ubuntu 16.04

Any insights are appreciated.


Answer:

De-coupling for an OOP separation of concerns is fine, yet let's also spend some care on debugging the code:

1) The ZeroMQ PUB/SUB Scalable Formal Communication Archetype has been known for years to require some time before the PUB/SUB-s get indeed ready to broadcast / accept messages. Thus one ought to prefer to set up the infrastructure inside the .__init__(), and not right before the SUB-s are supposed to already receive some payload(s).

In my view, this would be a safer design approach:

class HelloWorldMessage:
    """                                                       __doc__
    [DEF-ME]
    [DOC-ME]

    USAGE:     with HelloWorldMessage() as aContextManagerFUSEd_class_INSTANCE:
                    # may              use aContextManagerFUSEd_class_INSTANCE
                    # and shall safely
                    #     gracefully terminate locally spawned ZeroMQ resources
    PARAMETERS:
    RETURNS:   
    THROWS:    
    EXAMPLE:   
    REF.s:

    [TEST-ME]
    [PERF-ME]
    [PUB-ME]
    """
    def __init__( self, url  = '127.0.0.1',
                        port = '5555'
                        ):

        self._url = "tcp://{}:{}".format( url, port )
        #---------------------------------------------------- CONTEXT:
        self._ctx = Context.instance();                       print( "INF: zmq.asyncio.Context() set" if ( zmq.ZMQError() == 0 ) else "ERR[1]: {0:}".format( zmq.ZMQError() ) )
        #---------------------------------------------------- SUB:
        self._sub = self._ctx.socket(zmq.SUB );               print( "INF: zmq.SUB set"               if ( zmq.ZMQError() == 0 ) else "ERR[2]: {0:}".format( zmq.ZMQError() ) )
        self._sub.bind(                  self._url );         print( "INF: zmq.SUB.bind() done"       if ( zmq.ZMQError() == 0 ) else "ERR[3]: {0:}".format( zmq.ZMQError() ) )
        self._sub.setsockopt(        zmq.LINGER, 1 );         print( "INF: zmq.SUB LINGER set"        if ( zmq.ZMQError() == 0 ) else "ERR[4]: {0:}".format( zmq.ZMQError() ) )
        self._sub.setsockopt(        zmq.SUBSCRIBE, b'world');print( "INF: zmq.SUB subscribed"        if ( zmq.ZMQError() == 0 ) else "ERR[5]: {0:}".format( zmq.ZMQError() ) )
        #---------------------------------------------------- PUB:
        self._pub = self._ctx.socket(zmq.PUB );               print( "INF: zmq.PUB set"               if ( zmq.ZMQError() == 0 ) else "ERR[6]: {0:}".format( zmq.ZMQError() ) )
        self._pub.setsockopt(        zmq.LINGER, 1 );         print( "INF: zmq.PUB LINGER set"        if ( zmq.ZMQError() == 0 ) else "ERR[7]: {0:}".format( zmq.ZMQError() ) )
        self._pub.connect(               self._url );         print( "INF: zmq.PUB.connect() done"    if ( zmq.ZMQError() == 0 ) else "ERR[8]: {0:}".format( zmq.ZMQError() ) )
        #----------------------------------------------------
        ...
    def __enter__( self ):
        #---------------------------------------------------- with <class> as <symbol>: CONTEXT MANAGER __enter__()-auto-METHOD
        return self

    def __exit__( self, exc_type, exc_value, traceback ):
        #---------------------------------------------------- with <class> as <symbol>: CONTEXT MANAGER __exit__()-auto-METHOD
        self.try_to_close( self._pub );
        self.try_to_close( self._sub );
        pass;         self._ctx.term()
        return

    ################################################################
    #
    # A       PUB-SENDER ------------------------------------
    async def pub_hello_world( self ):

          self._pObj = PubHelloWorld();                       print( "INF: pObj set on PUB-side"      if ( self._pObj.msg_pub()  # instance-fuse(d)
                                                                                                         ==   "Hello World"    ) else "ERR[9]: {0:}".format( "Hello World" ) )
          try:
               while True:                                    # keep sending messages
                   self._sMsg = self._pObj.msg_pub();         print( "INF: pObj.msg_pub() called"     if ( self._sMsg  != None ) else "ERR[A]: {0:}".format( "msg == ?"    ) )
                   pass;                                      print( self._sMsg )
                   # publish message to topic 'world'
                   # async always needs `send_multipart()`
                   await self._pub.send_multipart( [ b'world',
                                                        self._sMsg.encode( 'utf-8' )
                                                       ]
                                                  );          print( "INF: await .send_multipart()"   if ( zmq.ZMQError() == 0 ) else "ERR[B]: {0:}".format( zmq.ZMQError() ) )
                   # slow down message publication
                   await asyncio.sleep( 1 );                  print( "NOP: await .sleep( 1 )"         if ( zmq.ZMQError() == 0 ) else "ERR[C]: {0:}".format( zmq.ZMQError() ) )
          except:
              pass;                                           print( "EXC: thrown on PUB side"        if ( zmq.ZMQError() == 0 ) else "ERR[D]: {0:}".format( zmq.ZMQError() ) )

          finally:
              self._pub.close();                              print( "FIN: PUB.close()-d"             if ( zmq.ZMQError() == 0 ) else "ERR[E]: {0:}".format( zmq.ZMQError() ) )

    ################################################################
    #
    # A       SUB-RECEIVER ---------------------------------
    async def sub_hello_world( self ):

          self._sObj = SubHelloWorld();                       print( "INF: sObj set on SUB-side"      if (  None                 # instance-fuse(d)
                                                                                                         == self._sObj.msg_receive("?")
                                                                                                            )                    else "ERR[F]: {0:}".format( "?"            ) )
          try:
               while True:                                   # keep listening to all published message on topic 'world'
                     pass;                                    print( "INF: await .recv_multipart() about to be called now:" )
                     self._rMsg = await self._sub.recv_multipart()
                     pass;                                    print( "INF: await .recv_multipart()"   if ( zmq.ZMQError() == 0 ) else "ERR[G]: {0:}".format( zmq.ZMQError() ) )
                     pass;                                    print( 'ACK: received: ', self._rMsg )
                     self._sObj.msg_receive( self._rMsg );    print( 'ACK: .msg_receive()-printed.' )
          except:
              pass;                                           print( "EXC: thrown on SUB side"        if ( zmq.ZMQError() == 0 ) else "ERR[H]: {0:}".format( zmq.ZMQError() ) )

          finally:
              self._sub.close();                              print( "FIN: SUB.close()-d"             if ( zmq.ZMQError() == 0 ) else "ERR[I]: {0:}".format( zmq.ZMQError() ) )

    # ---------close()---------------------------------------
    def try_to_close( self, aSocketINSTANCE ):

        try:
            aSocketINSTANCE.close();

        except:
            pass;

        return

2) Best used via a with HelloWorldMessage() as ...: context manager, as sketched below:
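A hypothetical usage sketch for that variant (assuming the class above, with the asyncio plumbing from the question):

with HelloWorldMessage() as hw:
    # sockets built in __init__() get a head start before traffic flows;
    # __exit__() closes them and terminates the context on the way out
    asyncio.get_event_loop().run_until_complete(asyncio.wait([
        hw.pub_hello_world(),
        hw.sub_hello_world(),
    ]))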

Question:

I'm playing for the first time with asyncio in python and trying to combine it with ZMQ.

Basically my issue is that I have a REP/REQ system inside an async def, with a function I need to await, and the value is never updated. Here's a snippet of the code to illustrate that:

#Declaring the zmq context
context = zmq_asyncio.Context()
REP_server_django = context.socket(zmq.REP)
REP_server_django.bind("tcp://*:5558")

I send this object to a class and get it back in this function

async def readsonar(self, trigger_pin, REP_server_django):
        i= 0
        while True:

            ping_from_view = await REP_server_django.recv()  # line.1
            value = await self.board.sonar_read(trigger_pin) # line.2
            print(value)                                     # line.3
            json_data = json.dumps(value)                    # line.4
            #json_data = json.dumps(i)                       # line.4bis
            REP_server_django.send(json_data.encode())       # line.5
            i+=1                                             # line.6
            await asyncio.sleep(1/1000)                      # line.7

sonar_read uses pymata_express to read an ultrasonic sensor. If I comment out line.2 and line.4, I get the right value for i. If I comment out line.1 and line.5, the print(value) prints the correct value from sonar_read. However, when I run it as shown here, the value is not updated.

Am I missing something?


EDIT: Fixed a typo regarding the line comments. What I meant is that if I only read the sonar and print the value, it works fine. If I only .recv() and .send(json.dumps(i).encode()), it works. But if I try to send the value from the sonar, it locks onto a given value that is never updated.


EDIT2 (answer to Alan Yorinks): here is the MWE; it incorporates what you sent regarding the declaration of zmq in the class. It is adapted from the pymata_express example concurrent_tasks.py.

To reproduce the error, run these two scripts in two different terminals. You will need an Arduino board with FirmataExpress installed. If all runs well, PART A should keep spitting out the same value on the mve_req.py end. You may edit the different blocks (PARTS A, B or C) to see the behaviour.

mve_rep.py

#ADAPTED FROM PYMATA EXPRESS EXAMPLE CONCURRENT_TASKS
#https://github.com/MrYsLab/pymata-express/blob/master/examples/concurrent_tasks.py
import asyncio
import zmq
import json
import zmq.asyncio as zmq_asyncio
from pymata_express.pymata_express import PymataExpress


class ConcurrentTasks:

    def __init__(self, board):


        self.loop = board.get_event_loop()
        self.board = board

        self.ctxsync = zmq.Context()
        self.context = zmq.asyncio.Context()
        self.rep = self.context.socket(zmq.REP)
        self.rep.bind("tcp://*:5558")

        self.trigger_pin = 53
        self.echo_pin = 51

        loop.run_until_complete(self.async_init_and_run())

    async def readsonar(self):
        i = 0
        while True:


            #PART. A. WHAT I HOPE COULD WORK
            rep_recv = await self.rep.recv()                       # line.1
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            # json_data = json.dumps(i)                            # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7


            '''
            #PART. B. WORKS FINE IN UPDATING THE SONAR_READ VALUE AND PRINTING IT
            value = await self.board.sonar_read(self.trigger_pin)  # line.2
            print(value)                                           # line.3
            json_data = json.dumps(value)                          # line.4
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''

            '''
            #PART. C. WORKS FINE IN SENDING THE i VALUE OVER ZMQ
            rep_recv = await self.rep.recv()                       # line.1
            json_data = json.dumps(i)                              # line.4bis
            await self.rep.send(json_data.encode())                # line.5
            i += 1                                                 # line.6
            await asyncio.sleep(1 / 1000)                          # line.7
            '''



    async def async_init_and_run(self):

        await self.board.set_pin_mode_sonar(self.trigger_pin, self.echo_pin)

        readsonar = asyncio.create_task(self.readsonar())
        await readsonar

        # OTHER CREATED_TASK GO HERE, (removed them in the MVE, but they work fine)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    my_board = PymataExpress()
    try:
        ConcurrentTasks(my_board)
    except (KeyboardInterrupt, RuntimeError):
        loop.run_until_complete(my_board.shutdown())
        print('goodbye')
    finally:
        loop.close()

mve_req.py

import zmq
import time
import json

def start_zmq():
    context = zmq.Context()
    REQ_django  = context.socket(zmq.REQ)
    REQ_django.connect("tcp://localhost:5558")

    return REQ_django, context

def get_sonar(REQ_django):
    REQ_django.send(b"server_django")
    ping_from_server_django = REQ_django.recv()
    return ping_from_server_django.decode()

if __name__ == '__main__':

    data = {"sensors":{}}

    REQ_django, context = start_zmq()
    while REQ_django:

            data['sensors']['sonar'] = get_sonar(REQ_django)
            json_data = json.dumps(data)
            print(data)

            #DO OTHER WORK
            time.sleep(1)

    REQ_django.close()
    context.term()

Answer:

In full disclosure, I am the author of pymata-express and python-banyan. The OP requested that I post this solution, so this is not meant to be a shameless plug.

I have been developing with asyncio since it was first introduced in Python 3. When asyncio code works, asyncio (IMHO) can simplify concurrency and the code. However, when things go awry, it can be frustrating to debug and understand the cause of the issues.

I apologize ahead of time, since this may be a little lengthy, but I need to provide some background information so that the example will not seem like some random bit of code.

The python-banyan framework was developed to provide an alternative to threading, multi-processing, and asyncio. Simply put, a Banyan application consists of small targeted executables that communicate with one another using protocol messages that are shared over a LAN. At its core it uses ZeroMQ. It was not designed to have traffic move over the WAN, but to use a LAN as a "software backplane." In some ways, Banyan is similar to MQTT, but it is much faster when used within a LAN. It does have the capability to connect to an MQTT network if that is desirable.

Part of Banyan is a concept called OneGPIO. It is a protocol messaging specification that abstracts GPIO functionality to be independent of any hardware implementation. To implement the hardware specifics, specialized Banyan components called Banyan Hardware Gateways were developed. There are gateways available for the Raspberry Pi, Arduino, ESP-8266 and Adafruit Crickit Hat. A GPIO application publishes the generic OneGPIO messages that any or all of the gateways can elect to receive. To move from one hardware platform to another, the associated hardware gateway is launched, and the control component (which is the code shown below) is launched without modification; neither the control component nor the gateway needs any code changes. Variables such as pin numbers may be specified through command-line options when launching the control component. For the Arduino Gateway, pymata-express is used to control the GPIO of the Arduino; it is an asyncio implementation of a StandardFirmata client. The thing to note is that the code below is not asyncio. The Banyan framework allows one to develop using the tools that fit the problem while decoupling the parts of the solution, and in this case the application mixes asyncio with non-asyncio code without any of the headaches normally encountered in doing so.

In the code provided, all the code below the class definition is used to provide support for command-line configuration options.

import argparse
import signal
import sys
import threading
import time

from python_banyan.banyan_base import BanyanBase


class HCSR04(BanyanBase, threading.Thread):
    def __init__(self, **kwargs):
        """
        kwargs contains the following parameters
        :param back_plane_ip_address: If none, the local IP address is used
        :param process_name: HCSR04
        :param publisher_port: publishing port
        :param subscriber_port: subscriber port
        :param loop_time: receive loop idle time
        :param trigger_pin: GPIO trigger pin number
        :param echo_pin: GPIO echo pin number
        """

        self.back_plane_ip_address = kwargs['back_plane_ip_address']
        self.process_name = kwargs['process_name']
        self.publisher_port = kwargs['publisher_port']
        self.subscriber_port = kwargs['subscriber_port']
        self.loop_time = kwargs['loop_time']
        self.trigger_pin = kwargs['trigger_pin']
        self.echo_pin = kwargs['echo_pin']
        self.poll_interval = kwargs['poll_interval']

        self.last_distance_value = 0

        # initialize the base class
        super(HCSR04, self).__init__(back_plane_ip_address=kwargs['back_plane_ip_address'],
                                     subscriber_port=kwargs['subscriber_port'],
                                     publisher_port=kwargs['publisher_port'],
                                     process_name=kwargs['process_name'],
                                     loop_time=kwargs['loop_time'])

        threading.Thread.__init__(self)
        self.daemon = True

        self.lock = threading.Lock()

        # subscribe to receive messages from arduino gateway
        self.set_subscriber_topic('from_arduino_gateway')

        # enable hc-sr04 in arduino gateway
        payload = {'command': 'set_mode_sonar', 'trigger_pin': self.trigger_pin,
                   'echo_pin': self.echo_pin}
        self.publish_payload(payload, 'to_arduino_gateway')

        # start the thread
        self.start()

        try:
            self.receive_loop()
        except KeyboardInterrupt:
            self.clean_up()
            sys.exit(0)

    def incoming_message_processing(self, topic, payload):
        print(topic, payload)
        with self.lock:
            self.last_distance_value = payload['value']

    def run(self):
        while True:
            with self.lock:
                distance = self.last_distance_value
            payload = {'distance': distance}
            topic = 'distance_poll'
            self.publish_payload(payload, topic)
            time.sleep(self.poll_interval)


def hcsr04():
    parser = argparse.ArgumentParser()
    # allow user to bypass the IP address auto-discovery.
    # This is necessary if the component resides on a computer
    # other than the computing running the backplane.
    parser.add_argument("-b", dest="back_plane_ip_address", default="None",
                        help="None or IP address used by Back Plane")
    parser.add_argument("-i", dest="poll_interval", default=1.0,
                        help="Distance polling interval")
    parser.add_argument("-n", dest="process_name", default="HC-SRO4 Demo",
                        help="Set process name in banner")
    parser.add_argument("-p", dest="publisher_port", default="43124",
                        help="Publisher IP port")
    parser.add_argument("-s", dest="subscriber_port", default="43125",
                        help="Subscriber IP port")
    parser.add_argument("-t", dest="loop_time", default=".1",
                        help="Event Loop Timer in seconds")
    parser.add_argument("-x", dest="trigger_pin", default="12",
                        help="Trigger GPIO pin number")
    parser.add_argument("-y", dest="echo_pin", default="13",
                        help="Echo GPIO pin number")

    args = parser.parse_args()

    if args.back_plane_ip_address == 'None':
        args.back_plane_ip_address = None
    kw_options = {'back_plane_ip_address': args.back_plane_ip_address,
                  'publisher_port': args.publisher_port,
                  'subscriber_port': args.subscriber_port,
                  'process_name': args.process_name,
                  'loop_time': float(args.loop_time),
                  'trigger_pin': int(args.trigger_pin),
                  'echo_pin': int(args.echo_pin),
                  'poll_interval': int(args.poll_interval)
                  }

    # replace with the name of your class
    HCSR04(**kw_options)


# signal handler function called when Control-C occurs
def signal_handler(sig, frame):
    print('Exiting Through Signal Handler')
    raise KeyboardInterrupt


# listen for SIGINT
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

if __name__ == '__main__':
    hcsr04()

Question:

N-proxy-N Pub-Sub

Similar to the question N to N async pattern in ZeroMQ?, which unfortunately never received an answer with working code.

I'm trying to implement Pub-Sub network as described in the guide: http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem (a small message broker in the style of N-proxy-N). Unfortunately, the guide doesn't provide any code examples.

I've tried to implement a Hello World example using PyZMQ. I think I'm close, but I'm facing some errors I don't know how to handle. Sorry for the use of asyncio (I'm more comfortable with it than with threads).

Code
"""Example using zmq to create a PubSub node_topic similar to a ROS topic"""
# Copyright (c) Stef van der Struijk <stefstruijk@protonmail.ch>.
# This example is in the public domain (CC-0)
# http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem

import asyncio
import zmq.asyncio
from zmq.asyncio import Context
import traceback
import logging

# N-proxy-M pattern: a subscriber which passes messages through a proxy to a publisher
class PubSubTopic:
    def __init__(self, address='127.0.0.1', port1='5566', port2='5567'):
        # get ZeroMQ version
        print("Current libzmq version is %s" % zmq.zmq_version())
        print("Current  pyzmq version is %s" % zmq.pyzmq_version())

        self.context = Context.instance()
        # 2 sockets, because we can only bind once to an address (as opposed to connect)
        self.url1 = "tcp://{}:{}".format(address, port1)
        self.url2 = "tcp://{}:{}".format(address, port2)

        # start proxy, pubs and subs async; demonstration purpose only, probably better in separate threads
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.xpub_xsub_proxy(),  # commented out for different error
            self.pub_hello_world(),
            self.pub_hello_world(lang='jp'),
            self.sub_hello_world(),
            self.sub_hello_world(lang='jp'),
        ]))

    # N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers
    async def xpub_xsub_proxy(self):
        # no traceback with zmq.asyncio and no try statement
        try:
            print("Init proxy")

            # Socket subscribing to publishers
            frontend_pubs = self.context.socket(zmq.XSUB)
            frontend_pubs.bind(self.url1)

            # Socket publishing to subscribers
            backend_subs = self.context.socket(zmq.XPUB)
            backend_subs.bind(self.url2)

            print("Try: Proxy... CONNECT!")
            zmq.proxy(frontend_pubs, backend_subs)
            print("CONNECT successful!")

        except Exception as e:
            print("Error with proxy :(")
            # print(e)
            logging.error(traceback.format_exc())
            print()

    # test case: 2 pubs to 1 topic
    async def pub_hello_world(self, lang='en'):
        # no traceback with zmq.asyncio and no try statement
        try:
            print("Init pub {}".format(lang))

            # connect, because many publishers - 1 subscriber
            pub = self.context.socket(zmq.PUB)
            pub.connect(self.url1)

            if lang == 'en':
                message = "Hello World"
                sleep = 1
            else:
                message = "Hello Sekai"  # Japanese
                sleep = 2

            # wait for proxy and subs to be ready
            await asyncio.sleep(.5)

            # keep publishing "Hello World" / "Hello Sekai" messages
            print("Pub {}: Going to pub messages!".format(lang))
            while True:
                # publish message to topic 'world'
                # multipart: topic, message; async always needs `send_multipart()`?
                await pub.send_multipart([lang.encode('ascii'), message.encode('ascii')])
                print("Pub {}: Have send msg".format(lang))

                # slow down message publication
                await asyncio.sleep(sleep)

        except Exception as e:
            print("Error with pub {}".format(lang))
            # print(e)
            logging.error(traceback.format_exc())
            print()

    # test case: 2 subs to 1 topic
    async def sub_hello_world(self, lang='en'):
        # no traceback with zmq.asyncio and no try statement
        try:
            print("Init sub {}".format(lang))

            # connect, because many subscribers - 1 (proxy) pub
            sub = self.context.socket(zmq.SUB)
            sub.connect(self.url2)
            # subscribe to topic 'en' or 'jp'
            sub.setsockopt(zmq.SUBSCRIBE, lang.encode('ascii'))

            # wait for proxy to be ready; necessary?
            await asyncio.sleep(.2)

            # keep listening to all published message, filtered on topic
            print("Sub {}: Going to wait for messages!".format(lang))
            while True:
                msg_received = await sub.recv_multipart()
                print("sub {}: {}".format(lang, msg_received))

        except Exception as e:
            print("Error with sub {}".format(lang))
            # print(e)
            logging.error(traceback.format_exc())
            print()


if __name__ == '__main__':
    PubSubTopic()
Errors
Proxy error

When I don't comment out the proxy function, I get the following traceback

python pub_sub_topic.py 
Current libzmq version is 4.2.2
Current  pyzmq version is 16.0.2
Init proxy
Try: Proxy... CONNECT!
^CTraceback (most recent call last):
  File "pub_sub_topic.py", line 139, in <module>
    PubSubTopic()
  File "pub_sub_topic.py", line 43, in __init__
    self.sub_hello_world(lang='jp'),
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1426, in _run_once
    handle._run()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/events.py", line 127, in _run
    self._callback(*self._args)
  File "pub_sub_topic.py", line 62, in xpub_xsub_proxy
    zmq.proxy(frontend_pubs, backend_subs)
  File "zmq/backend/cython/_device.pyx", line 95, in zmq.backend.cython._device.proxy (zmq/backend/cython/_device.c:1824)
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/_device.c:1991)
KeyboardInterrupt
Subscriber error

If I do comment out the proxy function (# self.xpub_xsub_proxy(),), I get the following traceback

python pub_sub_topic.py 
Current libzmq version is 4.2.2
Current  pyzmq version is 16.0.2
Init sub en
Init sub jp
Init pub en
Init pub jp
Sub en: Going to wait for messages!
Error with sub en
ERROR:root:Traceback (most recent call last):
  File "pub_sub_topic.py", line 128, in sub_hello_world
    msg_received = await sub.recv_multipart()
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 170, in recv_multipart
    dict(flags=flags, copy=copy, track=track)
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 321, in _add_recv_event
    self._add_io_state(self._READ)
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 294, in _add_io_state
    self.io_loop.add_reader(self, self._handle_recv)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 337, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
    key = self._selector.get_key(fd)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
    return mapping[fileobj]
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
    fd = self._selector._fileobj_lookup(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
    "{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>


Exception ignored in: <bound method Socket.__del__ of <zmq.asyncio.Socket object at 0x7fa90a4a7528>>
Traceback (most recent call last):
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/sugar/socket.py", line 70, in __del__
    self.close()
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 160, in close
    self._clear_io_state()
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 316, in _clear_io_state
    self._drop_io_state(self._state)
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 303, in _drop_io_state
    self.io_loop.remove_reader(self)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 342, in remove_reader
    return self._remove_reader(fd)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 279, in _remove_reader
    key = self._selector.get_key(fd)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
    return mapping[fileobj]
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
    fd = self._selector._fileobj_lookup(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
    "{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>
Sub jp: Going to wait for messages!

*snip* Same error as 'Sub en' *snip*

Pub en: Going to pub messages!
Pub en: Have send msg
Pub jp: Going to pub messages!
Pub jp: Have send msg
Pub en: Have send msg
Pub jp: Have send msg
Pub en: Have send msg
^CTraceback (most recent call last):
  File "pub_sub_topic.py", line 139, in <module>
    PubSubTopic()
  File "pub_sub_topic.py", line 43, in __init__
    self.sub_hello_world(lang='jp'),
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1390, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
System info
  • Ubuntu 16.04
  • Python 3.6 (through Anaconda)
  • libzmq version 4.2.2
  • pyzmq version 16.0.2

Answer:

You definitely should not comment out the proxy function. The problem is that the zmq.proxy function blocks forever, and you ran it under a run_until_complete event loop. You should change the event loop execution to run_forever.
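Note that zmq.proxy() is a plain blocking call, so even under run_forever it will monopolize the thread. A sketch (not from the original answer) that forwards frames manually with the zmq.asyncio sockets keeps the proxy cooperative:

import asyncio

async def xpub_xsub_proxy(frontend_pubs, backend_subs):
    # frontend_pubs is the XSUB socket, backend_subs the XPUB socket
    async def forward(src, dst):
        while True:
            msg = await src.recv_multipart()
            await dst.send_multipart(msg)
    # publications flow XSUB -> XPUB; subscriptions flow XPUB -> XSUB
    await asyncio.gather(
        forward(frontend_pubs, backend_subs),
        forward(backend_subs, frontend_pubs),
    )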

Question:

I'm making a ZeroMQ server in pyzmq using asyncio. I'm trying to gracefully handle stopping the server, but there's very little documentation on the async module and there doesn't seem to be a simple way to handle stopping the current poll/await. Stopping the loop in the .stop method doesn't do much and won't actually exit.

import zmq
import zmq.asyncio
import asyncio

class ZMQHandler():
    def __init__(self):
        self.loop = zmq.asyncio.ZMQEventLoop()
        asyncio.set_event_loop(self.loop)
        self.context = zmq.asyncio.Context()
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.bind('tcp://127.0.0.1:5000')
        self.socket.linger = -1


    def start(self):
        asyncio.ensure_future(self.listen())
        self.loop.run_forever()

    def stop(self):
        print('Stopping')
        self.loop.stop()

    async def listen(self):
        self.raw = await self.socket.recv()
        asyncio.ensure_future(self.listen())

Here's some example code that would start this:

daemon = ZMQHandler()

def signal_handler(num, frame):
    daemon.stop()

signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)

daemon.start()

How do I gracefully stop this when it's running? When I call self.socket.close(), I get the error zmq.error.ZMQError: Socket operation on non-socket, and if I call self.context.destroy() it basically complains that the sockets weren't terminated cleanly with ETERM.


Answer:

It ended up being a bug in the implementation of pyzmq. The bug was fixed and now calling loop.stop() works as intended.
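With that fix in place, a graceful shutdown can happen after run_forever() returns (a sketch based on the question's ZMQHandler):

def start(self):
    asyncio.ensure_future(self.listen())
    self.loop.run_forever()
    # loop.stop() has taken effect: no coroutine touches the socket anymore,
    # so it can be closed and the context terminated cleanly
    self.socket.close()
    self.context.term()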

Question:

I'm trying to implement asynchronous client and server using pyzmq and asyncio in python3.5. I've used the asyncio libraries provided by zmq. Below is my code for client(requester.py) and server(responder.py). My requirement is to use only REQ and REP zmq sockets to achieve async client-server.

requester.py:

import asyncio
import zmq
import zmq.asyncio

async def receive():
        message = await socket.recv()
        print("Received reply ", "[", message, "]")
        return message

async def send(i):
        print("Sending request ", i,"...")
        request = "Hello:" + str(i)
        await socket.send(request.encode('utf-8'))
        print("sent:",i)

async def main_loop_num(i):
        await send(i)
        #  Get the reply.
        message = await receive()
        print("Message :", message)

async def main():
        await asyncio.gather(*(main_loop_num(i) for i in range(1,10)))                

port = 5556
context = zmq.asyncio.Context.instance()

socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:%d" % port)
asyncio.get_event_loop().run_until_complete(asyncio.wait([main()]))

responder.py:

import asyncio
import zmq
import zmq.asyncio

async def receive():
    message = await socket.recv()
    print("Received message:", message)
    await asyncio.sleep(10)
    print("Sleep complete")
    return message

async def main_loop():
    while True:
        message = await receive()
        print("back to main loop")
        await socket.send(("World from %d" % port).encode('utf-8'))
        print("sent back")

port = 5556
context = zmq.asyncio.Context.instance()

socket = context.socket(zmq.REP)
socket.bind("tcp://*:%d" % port)
asyncio.get_event_loop().run_until_complete(asyncio.wait([main_loop()]))

The output that I'm getting is:

requester.py:

Sending request  5 ...
sent: 5
Sending request  6 ...
Sending request  1 ...
Sending request  7 ...
Sending request  2 ...
Sending request  8 ...
Sending request  3 ...
Sending request  9 ...
Sending request  4 ...

responder.py:

Received message: b'Hello:5'
Sleep complete
back to main loop
sent back

From the output, I assume that the requester has sent multiple requests, but only the first one has reached the responder. Also, the response sent by responder for the first request has not even reached back to the requester. Why does this happen? I have used async methods everywhere possible, still the send() and recv() methods are not behaving asynchronously. Is it possible to make async req-rep without using any other sockets like router, dealer, etc?


Answer:

ZMQ's REQ-REP sockets expect a strict order of one request, one reply, one request, one reply, ...

Your requester.py starts all of its requests in parallel:

await asyncio.gather(*(main_loop_num(i) for i in range(1,10)))

When sending the second request, ZMQ complains about this:

zmq.error.ZMQError: Operation cannot be accomplished in current state

Try to change your main function to send one request at a time:

async def main():
    for i in range(1, 10):
        await main_loop_num(i)

If you need to send several requests in parallel then you can't use a REQ-REP socket pair; use, for example, a DEALER-REP socket pair instead, as sketched below.
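A sketch of that parallel variant (not from the original answer): a DEALER socket talking to the unchanged REP responder must prepend the empty delimiter frame that REP strips on receive and restores on reply.

import asyncio
import zmq
import zmq.asyncio

async def main():
    ctx = zmq.asyncio.Context.instance()
    dealer = ctx.socket(zmq.DEALER)
    dealer.connect("tcp://localhost:5556")
    # fire all requests without waiting for replies
    for i in range(1, 10):
        await dealer.send_multipart([b'', ("Hello:" + str(i)).encode('utf-8')])
    # collect the replies in whatever order they come back
    for _ in range(1, 10):
        empty, reply = await dealer.recv_multipart()
        print("Received reply", reply)

asyncio.get_event_loop().run_until_complete(main())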