Hot questions for Using ZeroMQ in zerorpc

Question:

As you may know, ZeroRPC documentation is sparse. I can't get Streaming between a Python server and a Node client to work.

Here is the Python method:

@zerorpc.stream
def PublishWhatever(self, some_args):
    yield "love"
    yield "stack"
    yield "overflow"

Here is the Node call:

export const tryStream = () => {
  connectedZerorpcClient.invoke('PublishWhatever', (error, res, more) => {
    console.log('STREAM', res, more);
  });
};

This code will log "STREAM love", and then do nothing.

So here are my questions:

  • In the Python server code, am I supposed to call PublishWhatever with relevant args so that it yields additional values?
  • In the Node client, should I call some recursive function when there is more data?

What I am trying to implement is a Pub/Sub system, but right now an implementation seems to exist only for a Python server and a Python client; there is no Node example.

The example on the main page and the tests are not relevant either: they show how to stream an array that already exists when the invoke method is called. Here the messages are generated during some heavy computations. I want the server to be able to tell the client "here, some data is ready" and never disconnect.


Answer:

Well, ZeroRPC actively promotes the idea that its own Python implementation serves as self-documentation of how things work. In other words, no one has put in the extra effort needed to publish user-focused documentation, let alone documentation focused on the learning process.

Anyway, try to obey the few "visible" statements from the ZeroRPC description.

@zerorpc.stream
def PublishWhatever(self, some_args):
    yield ( "love", "stack", "overflow", ) # one, tuple-wrapped result-container

Question:

I have a little program that does some calculations in the background when I call it through the zerorpc module in Python 2.7.

Here is my code:

import zerorpc

is_busy = False

class Server(object):
    def calculateSomeStuff(self):
        global is_busy

        if is_busy:
            return 'I am busy!'

        is_busy = True

        # calculate some stuff

        is_busy = False
        print 'Done!'
        return

    def getIsBusy(self):
        return is_busy

s = zerorpc.Server(Server())
s.bind("tcp://0.0.0.0:6666")
s.run()

What should I change so that this program returns is_busy when I call the .getIsBusy() method after .calculateSomeStuff() has started doing its job?

As far as I know, there is no way to make it asynchronous in Python 2.


Answer:

You need multi-threading if you are after real concurrency and want to exploit more than one CPU core. See the Python threading module, the details of the GIL and its possible workarounds, and the literature.

If you want a cooperative solution, read on.

zerorpc uses gevent for asynchronous input/output. With gevent you write coroutines (also called greenlets or userland threads), which all run cooperatively on a single thread: the thread in which the gevent input/output loop is running. The gevent ioloop takes care of resuming coroutines that are waiting for some I/O event.

The key here is the word cooperative. Compare that to threads running on a single CPU/core machine. Effectively there is nothing concurrent, but the operating system will preempt (that is, interrupt) a running thread to execute the next one, and so on, so that every thread gets a fair chance of moving forward.

This happens fast enough so that it feels like all threads are running at the same time.

If you write your code cooperatively with the gevent input/output loop, you can achieve the same effect by taking care to call gevent.sleep(0) often enough to give the gevent ioloop a chance to run other coroutines.

It's literally cooperative multithreading. I've heard it was like that in Windows 2 or something.
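To make the idea of "cooperative" concrete without pulling in gevent, here is a toy model using only the standard library. It is not how gevent is implemented; it is just a sketch in which plain generators stand in for greenlets, a bare yield stands in for gevent.sleep(0), and the hypothetical run_round_robin function plays the role of the ioloop.

```python
from collections import deque

def worker(name, steps, log):
    """A toy task; each `yield` hands control back to the scheduler."""
    for i in range(steps):
        log.append("%s step %d" % (name, i))
        yield  # cooperative hand-off, analogous to gevent.sleep(0)

def run_round_robin(tasks):
    """Resume each task in turn until all of them have finished."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)          # run the task up to its next yield
        except StopIteration:
            continue            # task finished; do not requeue it
        queue.append(task)      # task yielded; give the others a turn

log = []
run_round_robin([worker("a", 2, log), worker("b", 3, log)])
print(log)
```

The two tasks interleave only because each one gives up control voluntarily. A task that never yields would starve the other one, which is exactly what happens to a zerorpc server whose method computes for a long time without yielding.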

So, in your example, in the heavy computation part, you likely have some loop going on. Make sure to call gevent.sleep(0) a couple times per second and you will have the illusion of multi-threading.

I hope my answer wasn't too confusing.
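Applied to the code in the question, that could look roughly like the sketch below. It leaves zerorpc out entirely and uses plain gevent so that it can run on its own. The state dictionary, the poll helper, and the sum(range(1000)) "work" are hypothetical stand-ins for your global flag, a client calling getIsBusy(), and your real computation.

```python
import gevent

state = {"is_busy": False}  # hypothetical stand-in for the global is_busy flag
observed = []               # what a concurrent caller sees

def calculate_some_stuff(iterations=5):
    if state["is_busy"]:
        return "I am busy!"
    state["is_busy"] = True
    try:
        total = 0
        for _ in range(iterations):
            total += sum(range(1000))  # stand-in for one slice of heavy work
            gevent.sleep(0)            # yield so other greenlets can run
        return total
    finally:
        state["is_busy"] = False       # reset the flag even if the work fails

def get_is_busy():
    return state["is_busy"]

def poll():
    # Stand-in for a second request arriving while the computation runs.
    observed.append(get_is_busy())

worker = gevent.spawn(calculate_some_stuff)
poller = gevent.spawn(poll)
gevent.joinall([worker, poller])
print(observed, worker.value)
```

The poller gets its turn at the first gevent.sleep(0), so it sees the flag while the computation is still in progress. In a zerorpc server the same thing should happen when another request arrives, since zerorpc runs each call in its own greenlet.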

Question:

I have a problem with launching a zeroRPC server in Python. I did it according to the official example, but when I call the run() method it runs in an endless loop, so my program can't continue after launching the server. I tried to run it in a new thread, but I got the following exception:

LoopExit: ('This operation would block forever', <Hub at 0x7f7a0c8f37d0 epoll pending=0 ref=0 fileno=19>)

I really don't know how to fix it. Any ideas?


Answer:

In short, you cannot use OS threads with zerorpc.

Longer answer: zerorpc-python uses gevent for IO. This means your project MUST use gevent and be compatible with it. Native OS threads and gevent coroutines (also called greenlets, green threads, etc.) are not really friends.

There is a native threadpool option available in gevent (http://www.gevent.org/gevent.threadpool.html).
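For a call that blocks and cannot be made cooperative, using that thread pool might look like the sketch below. The names blocking_call and ticker are hypothetical, and time.sleep merely stands in for whatever blocking code you have. The sketch assumes the hub's default pool, reached through gevent.get_hub().threadpool, and no monkey patching.

```python
import time
import gevent

def blocking_call(seconds):
    """Stand-in for code that blocks without yielding to gevent."""
    time.sleep(seconds)  # a real OS-level sleep (no monkey patching here)
    return "done"

ticks = []

def ticker():
    """A greenlet that should keep running while the blocking call is busy."""
    for i in range(3):
        ticks.append(i)
        gevent.sleep(0.01)

pool = gevent.get_hub().threadpool              # gevent's default native thread pool
async_result = pool.spawn(blocking_call, 0.1)   # runs in a worker OS thread
tick_greenlet = gevent.spawn(ticker)

result = async_result.get()  # waits cooperatively; other greenlets keep running
tick_greenlet.join()
print(result, ticks)
```

The blocking function runs in a native worker thread, while everything that touches gevent (and zerorpc) stays on the thread that owns the hub.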

You cannot spawn a native OS thread and run gevent coroutines in there (including zerorpc).

If all you are doing works with gevent coroutines, then instead of running the run() in a native thread, run it in a gevent coroutine/greenlet/greenthread like so:

import gevent

# Start the server in its own greenlet.
gevent.spawn(myserver.run)
# zerorpc will spawn more greenlets as needed.
# They all need to run cooperatively.

# Here we continue on the main greenlet.
# As only a single greenlet can execute at a time, we must never block
# for too long. Using gevent IO will cooperatively yield, for example.
# Calling gevent.sleep() will yield as well.
while True:
    gevent.sleep(1)

Note: if gevent is not an option, a solution would be to implement a version of zerorpc-python that does not use gevent and implements its IO outside of Python, but this has interesting complications, and it's not happening soon.

Question:

Server

Suppose I have the following zerorpc server

#server side
import zerorpc

class API():

    def long_running_task(self):
        print('1 - started long_running_task')
        #for instance a long running SQL query
        zerorpc.gevent.sleep(10)
        print('2 - finished long_running_task')

    def other_task(self):
        print('1 - started other_task')
        pass
        print('2 - finished other_task')

s = zerorpc.Server(API())
s.bind("tcp://0.0.0.0:4444")
zerorpc.gevent.spawn(s.run)
while True:
    zerorpc.gevent.sleep(10)

Client

and a client app which sends several requests simultaneously

import zerorpc

client = zerorpc.Client()
client.connect("tcp://127.0.0.1:4444")

client.long_running_task(async_=True)
client.other_task(async_=True)

client.close()

The problem is that while the long_running_task is being executed the other_task doesn't start running.

The desired output is:

1 - started long_running_task
1 - started other_task
2 - finished other_task
2 - finished long_running_task

instead of

1 - started long_running_task
2 - finished long_running_task
1 - started other_task
2 - finished other_task

Answer:

There are a few mistakes here:

  1. async_ should be async (valid only on Python versions before 3.7, where async is not a reserved keyword). This ensures the client method returns immediately.
  2. ZeroRPC works with gevent, so you need to start the server with gevent.spawn(s.run) and also keep your main greenlet running:

zerorpc.gevent.spawn(s.run)
while True:
    zerorpc.gevent.sleep(10)

  3. The two points above are not enough. In the API() methods you need to be cooperative too. Instead of time.sleep(), use gevent.sleep().
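The last point can be checked without zerorpc at all. The sketch below is an assumption-laden stand-in: it runs the two task bodies as plain gevent greenlets and collects the messages in a hypothetical events list instead of printing them from a server.

```python
import gevent

events = []  # collected instead of printed, so the order is easy to inspect

def long_running_task():
    events.append("1 - started long_running_task")
    gevent.sleep(0.05)  # cooperative wait: yields to the hub, unlike time.sleep()
    events.append("2 - finished long_running_task")

def other_task():
    events.append("1 - started other_task")
    events.append("2 - finished other_task")

gevent.joinall([gevent.spawn(long_running_task), gevent.spawn(other_task)])
print("\n".join(events))
```

With gevent.sleep() the messages come out in the desired order from the question: other_task starts and finishes while long_running_task is still waiting. Swapping in time.sleep() (without monkey patching) would block the whole thread and give the sequential order instead.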