Hot questions for Using ZeroMQ in python multiprocessing

Question:

I have an agent-based model, where several agents are started by a central process and communicate via another central process. Every agent and the communication process communicate via zmq. However, when I start more than 100 agents, standard output shows:

Invalid argument (src/stream_engine.cpp:143)
Too many open files (src/ipc_listener.cpp:292)

and macOS shows a problem report:

Python quit unexpectedly while using the libzmq.5.dylib plug-in.

The problem appears to be that too many contexts are opened. But how can I avoid this with multiprocessing?

I attach part of the code below:

class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
    def __init__(self, idn, group, _addresses, trade_logging):
        multiprocessing.Process.__init__(self)
        ....

    def run(self):
        self.context = zmq.Context()
        self.commands = self.context.socket(zmq.SUB)
        self.commands.connect(self._addresses['command_addresse'])
        self.commands.setsockopt(zmq.SUBSCRIBE, "all")
        self.commands.setsockopt(zmq.SUBSCRIBE, self.name)
        self.commands.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out = self.context.socket(zmq.PUSH)
        self.out.connect(self._addresses['frontend'])
        time.sleep(0.1)
        self.database_connection = self.context.socket(zmq.PUSH)
        self.database_connection.connect(self._addresses['database'])
        time.sleep(0.1)
        self.logger_connection = self.context.socket(zmq.PUSH)
        self.logger_connection.connect(self._addresses['logger'])

        self.messages_in = self.context.socket(zmq.DEALER)
        self.messages_in.setsockopt(zmq.IDENTITY, self.name)
        self.messages_in.connect(self._addresses['backend'])

        self.shout = self.context.socket(zmq.SUB)
        self.shout.connect(self._addresses['group_backend'])
        self.shout.setsockopt(zmq.SUBSCRIBE, "all")
        self.shout.setsockopt(zmq.SUBSCRIBE, self.name)
        self.shout.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out.send_multipart(['!', '!', 'register_agent', self.name])

        while True:
            try:
                self.commands.recv()  # catches the group address.
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s, self.commands.recv() to catch own address ~1888' % (self.name))
                break
            command = self.commands.recv()
            if command == "!":
                subcommand = self.commands.recv()
                if subcommand == 'die':
                    self.__signal_finished()
                    break
            try:
                self._methods[command]()
            except KeyError:
                if command not in self._methods:
                    raise SystemExit('The method - ' + command + ' - called in the agent_list is not declared (' + self.name)
                else:
                    raise
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s, Current command: %s ~1984' % (self.name, command))
                break

            if command[0] != '_':
                self.__reject_polled_but_not_accepted_offers()
                self.__signal_finished()
        #self.context.destroy()

The whole code is available at http://www.github.com/DavoudTaghawiNejad/abce


Answer:

Odds are it's not too many contexts, it's too many sockets. Looking through your repo, I see you're (correctly) using IPC as your transport; IPC uses a file descriptor as the "address" to pass data back and forth between different processes. If I'm reading correctly, you're opening up to 7 sockets per process, so that'll add up quickly. I'm betting that if you do some debugging in the middle of your code, you'll see that it doesn't fail when the last context is created, but when the last socket pushes the open file limit over the edge.

My understanding is that the typical user limit for open FDs is around 1000, so at around 100 agents you're pushing 700 open FDs just for your sockets; the remainder is probably just the usual descriptors every process keeps open. There should be no problem increasing your limit to 10,000, or higher depending on your situation. Otherwise you'll have to rewrite to use fewer sockets per process in order to support a higher process count.
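
For reference, a quick sketch of inspecting and raising the per-process descriptor limit from inside Python on a Unix-like system such as macOS, using the standard-library resource module (the 4096 below is an arbitrary illustration; the hard limit caps what an unprivileged process may request):

import resource

# current (soft, hard) limits on the number of open file descriptors
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("soft=%d hard=%d" % (soft, hard))

# raise the soft limit, staying within the hard limit
resource.setrlimit(resource.RLIMIT_NOFILE, (min(4096, hard), hard))

The same effect can be had from the shell with ulimit -n before starting the simulation.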

Question:

I'm trying to use ZeroMQ in Python (pyzmq) together with multiprocessing. As a minimal (not) working example, I have a server and a client class, both of which inherit from multiprocessing.Process. The client child process should send a message to the server child process, which should print the message:

#mpzmq_class.py

from multiprocessing import Process
import zmq


class Server(Process):
    def __init__(self):
        super(Server, self).__init__()
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.PULL)
        self.socket.connect("tcp://localhost:6068")

    def run(self):
        msg = self.socket.recv_string()
        print(msg)


class Client(Process):
    def __init__(self):
        super(Client, self).__init__()
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.PUSH)
        self.socket.bind("tcp://*:6068")

    def run(self):
        msg = "Hello World!"
        self.socket.send_string(msg)

if __name__ == "__main__":
    s = Server()
    c = Client()
    s.start()
    c.start()
    s.join()
    c.join()

Now if I run this, the server process seems to hang at the receive call msg = self.socket.recv_string(). In another (more complicated) case, it even hung at the socket.connect("...") statement.

If I rewrite the script to use functions instead of classes/objects, it runs just fine:

# mpzmq_function.py

from multiprocessing import Process
import zmq


def server():
    ctx = zmq.Context()
    socket = ctx.socket(zmq.PULL)
    socket.connect("tcp://localhost:6068")
    msg = socket.recv_string()
    print(msg)


def client():
    ctx = zmq.Context()
    socket = ctx.socket(zmq.PUSH)
    socket.bind("tcp://*:6068")
    msg = "Hello World!"
    socket.send_string(msg)

if __name__ == "__main__":
    s = Process(target=server)
    c = Process(target=client)
    s.start()
    c.start()
    s.join()
    c.join()

Output:

paul@AP-X:~$ python3 mpzmq_function.py 
Hello World!

Can anybody help me with this? I guess it's something I didn't understand concerning the usage of multiprocessing.

Thank you!


Answer:

I ran into the same issue. I guess the problem is that the run method has no access to the context object. Maybe it has something to do with the C implementation and the fact that processes do not share memory: __init__ runs in the parent process, while run executes in the child, so a context and sockets created in __init__ never arrive intact in the child. If you instantiate the context in the run method, it works.

Here is a working example:

#mpzmq_class.py

from multiprocessing import Process
import zmq


class Base(Process):
    """
    Inherits from Process and
    holds the zmq address.
    """
    def __init__(self, address):
        super().__init__()
        self.address = address


class Server(Base):
    def run(self):
        ctx = zmq.Context()
        socket = ctx.socket(zmq.PULL)
        socket.connect(self.address)
        msg = socket.recv_string()
        print(msg)


class Client(Base):
    def run(self):
        ctx = zmq.Context()
        socket = ctx.socket(zmq.PUSH)
        socket.bind(self.address)
        msg = "Hello World!"
        socket.send_string(msg)


if __name__ == "__main__":
    server_addr = "tcp://127.0.1:6068"
    client_addr = "tcp://*:6068"
    s = Server(server_addr)
    c = Client(client_addr)
    s.start()
    c.start()
    s.join()
    c.join()

I added a base class to demonstrate that you can still access normal Python objects from the run method. If you create the context object in the __init__ method, it won't work.

Question:

I have a process A that publishes a message constantly and processes B and C subscribe to the topic and get the latest message published by the publisher in process A.

So, I set zmq.CONFLATE to both publisher and subscriber. However, I found that one subscriber was not able to receive messages.

import time
from multiprocessing import Process

import zmq


def publisher(sleep_time=1.0, port="5556"):

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.bind("tcp://*:%s" % port)
    print ("Running publisher on port: ", port)

    while True:
        localtime = time.asctime( time.localtime(time.time()))
        string = "Message published time: {}".format(localtime)
        socket.send_string("{}".format(string))
        time.sleep(sleep_time)

def subscriber(name="sub", sleep_time=1, ports="5556"):

    print ("Subscriber Name: {}, Sleep Time: {}, Port: {}".format(name, sleep_time, ports))

    context = zmq.Context()
    print ("Connecting to publisher with ports %s" % ports)
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.CONFLATE, 1)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
    socket.connect ("tcp://localhost:%s" % ports)

    while True:

        message = socket.recv()
        localtime = time.asctime( time.localtime(time.time()))
        print ("\nSubscriber [{}]\n[RECV]: {} at [TIME]: {}".format(name, message, localtime))
        time.sleep(sleep_time)


if __name__ == "__main__":
    Process(target=publisher).start()
    Process(target=subscriber, args=("SUB1", 1.2, )).start()
    Process(target=subscriber, args=("SUB2", 1.1, )).start()

I tried removing the socket.setsockopt(zmq.CONFLATE, 1) line from the publisher, and that seemed to solve the problem. Both subscribers in processes B and C could then receive messages, and the messages appeared to be the latest ones.

I'm trying to find out why setting CONFLATE on the publisher caused this problem; I could not find any information about it. Does anyone know what causes this behavior?

Also, I want to know, in the situation of one publisher to multiple subscribers, what is the correct code setup, so that subscriber can always get the latest messages?


Answer:

It's most likely a timing issue: the ZMQ_CONFLATE socket option limits both the inbound and the outbound queue to a single message.

The way PUB/SUB works is that the subscriber sends a subscription message to the publisher when you set the ZMQ_SUBSCRIBE option. If you start both subscribers at the same time, then it's possible that one of the subscription messages arriving in the publisher's one-message queue gets discarded.

Try adding a sleep between starting each subscriber (see the sketch after the docs quote below).

From the ZeroMQ docs:

If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.

I am not saying this is the solution to your problem, but if that is the case, we may need to post a change to libzmq to make the conflate option more granular, so you can choose whether conflate should be applied to the inbound or the outbound queue.
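
As a sketch of one such setup - conflating only on the subscriber side, so the publisher's outbound queue cannot drop subscription traffic, and staggering the subscriber start-ups - something along these lines (the 0.5 s stagger is an arbitrary illustration, not a magic number):

import time
from multiprocessing import Process

import zmq


def publisher(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    # note: no CONFLATE on the PUB side
    socket.bind("tcp://*:%s" % port)
    while True:
        socket.send_string("Message published time: {}".format(time.asctime()))
        time.sleep(1.0)


def subscriber(name, port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.CONFLATE, 1)           # keep only the latest message
    socket.setsockopt_string(zmq.SUBSCRIBE, "")  # subscribe to everything
    socket.connect("tcp://localhost:%s" % port)
    while True:
        print("[{}] {}".format(name, socket.recv_string()))


if __name__ == "__main__":
    Process(target=publisher).start()
    Process(target=subscriber, args=("SUB1",)).start()
    time.sleep(0.5)  # stagger start-up, per the timing hypothesis above
    Process(target=subscriber, args=("SUB2",)).start()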

Question:

I have this code for my server:

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5000")

while True:
    message = socket.recv()
    socket.send(b"World")
    print "sent"

while True:
    print "done."

I have a separate client script that sends a message through zmq to this one whenever I send a message. On the server (this code), if I only have the first while True:, it prints "sent" every time I send a message, and if I only have the second while True:, it prints "done." continuously. But if I put both, it never prints "done." (or, if I switch their order and keep both, it never prints "sent" when I send a message).

As output I want it to continuously print "done.", and also print "sent" whenever I get a message. So something like this:

done.
done.
done.
done.
done.
sent
done.
lots more done....

Basically I want both loops to run continuously and completely independently of each other.

N.B. I have tried using multiprocessing (such as in the 3rd answer here How do I run two python loops concurrently?), but couldn't get that to work either. I tried it as below:

import time
import zmq
from multiprocessing import Process

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5000")
i = time.time()

def zmq_loop():
    while True:
        message = socket.recv()
        socket.send(b"World")
        print "sent"

def done_loop():
    global i
    while True:
        i2 = time.time()-i
        if i2%2 == 0:
            print "done."

if __name__ == "__main__":
    Process(target=zmq_loop).start()
    Process(target=done_loop).start()

Answer:

As was explained yesterday in this, [CONCURRENT]-processing is technically achievable in several different ways in Python, each at a way different cost.

Today, let's have a look at another approach - using a framework that was developed with the very same motivation - having the natural [CONCURRENT]-scheduling-already-in-DNA - originally intended for easily composing and smoothly operating complex GUI Man-Machine Interactions ( MMI ).

This framework may and will help you achieve a lot, due to the fact that it has evolved with a lot of care for exactly these scenarios, where more than one thing has to be monitored at once:

Welcome to Tkinter GUI framework, which we will use just for its smart concurrently operated event handlers.

I have many times been positively surprised by how easy it is to build a quite complex composition of Finite-State Automata ( FSA ) that smoothly cooperate together ( a coalition of FSAs ), using tools both for independent, isolated operations ( the inner logic of each FSA ) and for easily propagating signals / messages from one FSA towards the other(s). Yes, they can actually operate as 1-event-source-FSA : N-consumer-FSA(s).

There you can create ( with ZeroMQ, always in a non-blocking manner ) handlers:

  • one "sniffer" for a regular checking ( best by a timeout-controlled .poll() method to { NACK | POSACK } anything to read )

  • another "reader" for the actual reading from the ZeroMQ Socket() instance ( triggered by the POSACK-signal from the "sniffer", as was mentioned previously )

  • another "do-a-work-er" for any other task one may wish to operate

The Tkinter .mainloop() method is the global controller, which orchestrates the dirty job for you.

The high-level concept of Tkinter-mediated agent co-processing starts in main() as simply as:

def main():
    root = Tk()                      # INIT a Tk() instance
    root.lift()                      #      + make it visible
    app = myApplication( root )      # SET ( [HERE] are all your app gems )
    root.mainloop()                  # START the core event-handling orchestrator

Tkinter might look like a garage full of various GUI gadgets that have nothing to do with your problem, but don't panic.

Tkinter has incredibly well-crafted tools right for your needs:

  • using control variables that will be used as a means for storing, signalling and propagating changes of values among otherwise independent and explicitly un-coordinated actors ( ref. "sniffer", "reader", "worker" and any others ... )

  • tools for handling events - real, abstract and even virtual

  • tools for handling timed operations - in the form of an almost lightweight real-time system, using setups with a preferred timing of what shall happen next, the .mainloop() being tasked to bear in mind an explicitly specified timing .after( thisAmountOfMILLISECONDS, callThisFUNCTION ) or a liberal .after_idle( callAlwaysThatFUNCTION ) - a minimal self-rescheduling sketch follows right below this list.
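
A minimal sketch of that self-rescheduling .after() idiom ( the poll_once name and the 100 ms period are illustrative assumptions, nothing prescribed by Tkinter ):

import Tkinter as tk    # python27 ( python3: import tkinter as tk )

root = tk.Tk()

def poll_once():
    # < do one small, non-blocking piece of work here >
    root.after( 100, poll_once )    # re-schedule itself in ~100 ms

root.after( 100, poll_once )        # ignite the periodic polling
root.mainloop()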

Indeed, one does not need anything more in order to go and solve your task, using these already perfect tools.

So all the rest is in principle just a matter of your creativity in how to re-use these smart tools.


A small demo, how to make two ( 3 ! ) things happen "at the same time independently":

Let's set up the case where one wants to process ( here demonstrated by a printing ) several independent processes, all at the same time.

    >>> #-----------------------------------------------FAST MOCK-UP EXAMPLE
    >>> import Tkinter as tk                          # python27
    >>> root = tk.Tk()
    >>> root.protocol( "WM_DELETE_WINDOW", root.quit() )
    '3071841620Ldestroy'
    >>> #------VAR-------------------------------------IMPORTANT TOOL:
    >>> aStringVAR = tk.StringVar()
    >>> aStringVAR.set( "_init_" )

    >>> def aKeyPressEventHANDLER( anEvent ): # SIMPLE EventHANDLER,
            #                                 #        also ignites remote responsive processes
    ...     aTemplate = "[KEY]::{3: >10s}\n<s/n>::{0: >10d}\n(=@=)::{1: > 10d}\n^from::({5:})"
    ...     sString   = aTemplate.format( anEvent.serial,
    ...                                   anEvent.time,
    ...                                   anEvent.char,
    ...                                   anEvent.keysym,
    ...                                   anEvent.keysym_num,
    ...                               str(anEvent.widget )
    ...                               )
    ...     aStringVAR.set( sString )
    ...     print sString
    ... 
    >>> #----VAR_TRACER----------------------------------------[#1]
    >>> def aVAR_TRACER_A( p1_quasiNAME, p2_indexOrEmptyString, p3_accessMODE ):
    ...     print "aVAR_TRACER_A()-called::(on){0:} traced_event({1:})".format( str( p1_quasiNAME ), str( p3_accessMODE ) )
    ...     # ###############=[A]#######
    ...     # < do some task =[A] here >
    ...     # ###############=[A]#######
    ...     print "aVAR_TRACER_A()         [{0:}]".format(   str( root.globalgetvar( p1_quasiNAME ) ).replace( " ", "" ) )
    ...

    >>> #----VAR_TRACER----------------------------------------[#2]
    >>> def aVAR_TRACER_B( p1_quasiNAME, p2_indexOrEmptyString, p3_accessMODE ):
    ...     print "aVAR_TRACER_B()-called::(on){0:} traced_event({1:})".format( str( p1_quasiNAME ), str( p3_accessMODE ) )
    ...     # ###############=[B]#######
    ...     # < do some task =[B] here >
    ...     # ###############=[B]######
    ...     print "aVAR_TRACER_B()         [{0:}]".format(   str( root.globalgetvar( p1_quasiNAME ) ).replace( " ", "" ) )
    ... 

    >>> #-----VAR_A_tracer_ID------------------------------"w" EVENT SNIFFER
    >>> aTraceVAR_A_tracer_ID = aStringVAR.trace_variable( "w", aVAR_TRACER_A )

    >>> #-----VAR_B_tracer_ID------------------------------"w" EVENT SNIFFER
    >>> aTraceVAR_B_tracer_ID = aStringVAR.trace_variable( "w", aVAR_TRACER_B )

    >>> #-----------tracer_ID values for ev. theirs resp. de-activation:
    >>> aTraceVAR_A_tracer_ID
    '3071960124LaVAR_TRACER_A'
    >>> aTraceVAR_B_tracer_ID
    '3071961284LaVAR_TRACER_B'

    >>> #---.bind()-----------------------EventHANDLER with a system event <KeyPress>
    >>> root.bind( "<KeyPress>", aKeyPressEventHANDLER ) # <-since here LIVE (!)
    '3071841740LaKeyPressEventHANDLER'
    >>> #------------------------------------------------^^^ since here, it went live
    >>> #                                                 1: having put a mouse on tk-window,
    >>> #                                                 2: set-focus by click
    >>> #                                                 3: started keys:
    >>> #                                                    ( "a",
    >>> #                                                       6-on-<NumKeyPad>,
    >>> #                                                       *-on-<NumKeyPad>

    >>> # this happened "independently, at the same time" ( see time (=@=):: values )
    >>> 

    aVAR_TRACER_B()-called::(on)PY_VAR0 traced_event(w)
    aVAR_TRACER_B()         [[KEY]::a<s/n>::832(=@=)::88486992^from::(.)]
    aVAR_TRACER_A()-called::(on)PY_VAR0 traced_event(w)
    aVAR_TRACER_A()         [[KEY]::a<s/n>::832(=@=)::88486992^from::(.)]
    [KEY]::         a
    <s/n>::       832
    (=@=)::  88486992
    ^from::(.)
    aVAR_TRACER_B()-called::(on)PY_VAR0 traced_event(w)
    aVAR_TRACER_B()         [[KEY]::KP_6<s/n>::832(=@=)::88509107^from::(.)]
    aVAR_TRACER_A()-called::(on)PY_VAR0 traced_event(w)
    aVAR_TRACER_A()         [[KEY]::KP_6<s/n>::832(=@=)::88509107^from::(.)]
    [KEY]::      KP_6
    <s/n>::       832
    (=@=)::  88509107
    ^from::(.)
    aVAR_TRACER_B()-called::(on)PY_VAR0 traced_event(w)
    aVAR_TRACER_B()         [[KEY]::KP_Multiply<s/n>::832(=@=)::88541180^from::(.)]
    aVAR_TRACER_A()-called::(on)PY_VAR0 traced_event(w)
    aVAR_TRACER_A()         [[KEY]::KP_Multiply<s/n>::832(=@=)::88541180^from::(.)]
    [KEY]::KP_Multiply
    <s/n>::       832
    (=@=)::  88541180
    ^from::(.)
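
And for the original server task, a minimal sketch of wiring the ZeroMQ "sniffer" / "reader" / "do-a-work-er" trio into the .mainloop() along these very lines ( the 10 ms / 500 ms periods are illustrative assumptions; python27 syntax to match the demo above ):

import Tkinter as tk    # python27 ( python3: import tkinter as tk, print(...) )
import zmq

context = zmq.Context()
socket  = context.socket( zmq.REP )
socket.bind( "tcp://*:5000" )

poller = zmq.Poller()
poller.register( socket, zmq.POLLIN )

root = tk.Tk()

def zmq_sniffer_and_reader():
    if poller.poll( 0 ):                 # non-blocking { NACK | POSACK } check
        msg = socket.recv()              # POSACK -> guaranteed not to block here
        socket.send( b"World" )
        print "sent"
    root.after( 10, zmq_sniffer_and_reader )

def do_a_work():
    print "done."
    root.after( 500, do_a_work )

root.after(  10, zmq_sniffer_and_reader )
root.after( 500, do_a_work )
root.mainloop()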

Question:

I have a class like following.

class R(object):
    def __init__(self, initial=0, incr=2):
        self.initial = initial
        self.incr = incr
        self.value = initial

    def add(self):
        time.sleep(1)
        self.value += self.incr

    def mul(self):
        time.sleep(3)
        self.value *= self.incr

    def get(self):
        return self.value

I want to create a server that will do the following.

  • for each init request, it creates an R object in a separate process
  • for each call to the .add() or .mul() methods, the server forwards the call to that process. These calls are time consuming, so the server returns immediately; meanwhile the remote R process is still executing the .add() or .mul() method bodies.
  • if the server receives a call to the .get() method, it forwards it to the corresponding process and applies .get() there. This time it waits for the result, and once the result is there, it terminates the remote R object.

How can this be achieved?

What have I tried so far? I created the R object using multiprocessing.Process, then sent commands to it using a multiprocessing.Queue with put_nowait.

But it seems I have to control the flow myself. Is there any other framework or tool that does the same thing?

The server is written using zmq, but I am open to changing it to a different solution.


Answer:

Perfectly doable in ZeroMQ.


Why waste CPU / overheads by using inappropriate tools?

ZeroMQ allows you to adaptively spin up remote worker instances and load-balance / fail-safe-heal their actual performance and also their (un-)avoidable silent exits.

The logic is in your hands, the performance need not be cannibalised by any language- or environment-driven priors.

Yes, you have to do the flow control yourself, but this is your strength, not a weakness - isn't it?

If your needs are to gain more performance, you may go this way, using ZeroMQ / pyzmq or even the more lightweight & fat-free nanomsg tools.
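
As a minimal sketch of that hand-made flow control, assuming a reply-first-then-execute convention on the worker side ( the ipc:// endpoint and the command names are my illustrative choices, not anything prescribed by ZeroMQ; on Windows, swap the ipc:// endpoint for a tcp:// one ): the worker acknowledges .add() / .mul() immediately and only then runs the method body, so the caller returns at once, while .get() blocks for the value and then tears the remote object down.

import time
from multiprocessing import Process

import zmq


class R(object):
    def __init__(self, initial=0, incr=2):
        self.value = initial
        self.incr = incr

    def add(self):
        time.sleep(1)
        self.value += self.incr

    def mul(self):
        time.sleep(3)
        self.value *= self.incr

    def get(self):
        return self.value


def r_worker(address, initial=0, incr=2):
    # one remote R instance, driven by commands arriving over a REP socket
    r = R(initial, incr)
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REP)
    sock.bind(address)
    while True:
        cmd = sock.recv_string()
        if cmd == "get":
            sock.send_string(str(r.get()))  # the caller blocks for this reply
            break                           # ... and the remote object terminates
        sock.send_string("ack")             # reply first -> the caller returns at once
        getattr(r, cmd)()                   # ... while the body keeps executing here


if __name__ == "__main__":
    addr = "ipc:///tmp/r_worker_demo"       # illustrative endpoint
    Process(target=r_worker, args=(addr,)).start()

    ctx = zmq.Context()
    req = ctx.socket(zmq.REQ)
    req.connect(addr)
    for cmd in ("add", "add", "mul"):
        req.send_string(cmd)
        req.recv_string()                   # immediate "ack", not the result
    req.send_string("get")
    print(req.recv_string())                # waits for the result: (0+2+2)*2 = 8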