Hot questions for Using ZeroMQ in multiprocessing


Question:

ZeroMQ is used for receiving input parameters:

import zmq

context = zmq.Context()
PORT = 5556  # assumption: the actual port is not shown in the question

def server():
    rep = context.socket(zmq.REP)
    rep.bind('tcp://*:{}'.format(PORT))

    while True:
        data = rep.recv_json()
        result = calculate(data)
        rep.send_json(result)

The calculation is done in calculate(); when it finishes, the result is sent back to the client through ZeroMQ.

Based on my tests, it currently uses only one core of the machine; now I want to use the other cores as well. I've read some docs about multiprocessing and multithreading, but they mainly focus on fixed inputs, which is not my case.

Any help would be appreciated.


Answer:

Here's how you can use multiprocessing to run multiple worker processes that handle concurrent clients:

import time

import zmq
from multiprocessing import Process

def calculate(data):
    time.sleep(5)  # simulate 5 seconds of work (assumed; the sample output below shows 5-second batches)
    return {"output": data['ok']}

def handle_request(url):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect(url)
    while True:
        data = socket.recv_json()
        print("received {}".format(data))
        out = calculate(data)
        socket.send_json(out)

def server():
    context = zmq.Context()
    # Set up socket for clients to connect to.
    clients = context.socket(zmq.ROUTER)
    clients.bind('tcp://*:{}'.format(5556))

    # Set up ipc socket for workers to connect to
    url_worker = 'ipc:///tmp/workers'
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)

    # Start 4 worker processes    
    for _ in range(4):
        p = Process(target=handle_request, args=(url_worker,))
        p.start()

    # Forward requests from clients to the workers via a queue device.
    # (zmq.device is deprecated in newer pyzmq; zmq.proxy(clients, workers)
    # is the modern equivalent.)
    zmq.device(zmq.QUEUE, clients, workers)


if __name__ == "__main__":
    server()

Now, if you point a sample client at it:

import zmq
from threading import Thread

def send_req(request):
    context = zmq.Context()

    print("Connecting to hello world server...")
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5556")
    print("Sending request %s ..." % request)

    socket.send_json({"ok" : "Hello"})

    message = socket.recv().decode()  # decode so the reply prints cleanly on Python 3
    print("Received reply %s [ %s ]" % (request, message))

#  Do 10 requests in parallel
for request in range(10):
    Thread(target=send_req, args=(request,)).start()

You get the following output:

Connecting to hello world server...
Sending request 0 ...
Connecting to hello world server...
Sending request 1 ...
Connecting to hello world server...
Sending request 2 ...
Connecting to hello world server...
Sending request 3 ...
Connecting to hello world server...
Sending request 4 ...
Connecting to hello world server...
Sending request 5 ...
Connecting to hello world server...
Sending request 6 ...
Connecting to hello world server...
Sending request 7 ...
Connecting to hello world server...
Sending request 8 ...
Connecting to hello world server...
Sending request 9 ...
<5 second delay>
Received reply 0 [ {"output":"Hello"} ]
Received reply 1 [ {"output":"Hello"} ]
Received reply 3 [ {"output":"Hello"} ]
Received reply 2 [ {"output":"Hello"} ]
<5 second delay>
Received reply 4 [ {"output":"Hello"} ]
Received reply 5 [ {"output":"Hello"} ]
Received reply 6 [ {"output":"Hello"} ]
Received reply 7 [ {"output":"Hello"} ]
<5 second delay>
Received reply 8 [ {"output":"Hello"} ]
Received reply 9 [ {"output":"Hello"} ]

With four workers and a 5-second calculate(), the ten requests complete in batches of four, which is why the replies arrive in groups separated by 5-second delays.

Question:

It was a task from the book "Node.js 8 the Right Way".

Here is my solution:

'use strict';
const zmq = require('zeromq');
const cluster = require('cluster');

const push = zmq.socket('push');
const pull = zmq.socket('pull');

const cores_num = require('os').cpus().length;
let workers_num = 0;

push.bind('tcp://127.0.0.1:9998');
pull.bind('tcp://127.0.0.1:9999');
// push.bind('ipc://push.ipc');
// pull.bind('ipc://pull.ipc');

if (cluster.isMaster) {
  for (let j = 0; j < cores_num; j++) {
    cluster.fork();
  }

  cluster.on('online', (worker) => {
    console.log(`Worker with pid ${worker.process.pid} is created!`);
  });

  pull.on('message', (data) => {
    const response = JSON.parse(data.toString());

    if (response.type === 'ready') {
      if (workers_num >= 0 && workers_num < 3) {
        workers_num++;

        if (workers_num == 3) {
          console.log('Ready!');

          for (let i = 0; i < 10; i++) {
            push.send(JSON.stringify({
              type: 'job',
              data: `This message has id ${i}`
            }));
          }
        }
      }
    } else if (response.type === 'result') {
      console.log(data.toString());
    }
  });
} else {
  const worker_push = zmq.socket('push');
  const worker_pull = zmq.socket('pull');

  worker_pull.connect('tcp://127.0.0.1:9998');
  worker_push.connect('tcp://127.0.0.1:9999');
  // worker_pull.connect('ipc://push.ipc');
  // worker_push.connect('ipc://pull.ipc');

  worker_push.send(JSON.stringify({
    type: 'ready'
  }));

  worker_pull.on('message', data => {
    const request = JSON.parse(data);

    if (request.type === 'job') {
      console.log(`Process ${process.pid} got message ${request.data}`);
      worker_push.send(JSON.stringify({
        type: 'result',
        data: `This message is a response from process ${process.pid}`,
        time: Date.now()
      }));
    }
  });
}

As you can see, it works only when the PUSH/PULL sockets and the workers communicate via TCP, but I want to know why it doesn't work via IPC.

Update ( ref: condition 4 below, "pathname must be writeable" ): I hope you can help me find the problem.


Answer:

A few things:

Your IPC path is incorrect:

You have ipc://push.ipc (two slashes), but you really need ipc:///push.ipc: the protocol prefix is ipc://, and then you need the file path /push.ipc.

File permissions:

Does your process have permission to write to the root directory? Unless you are running as root I would think not.

I would change the path to something like /tmp/push.ipc which in most systems is writable by all users.

In which case your url should be:

ipc:///tmp/push.ipc

Forking

I do not use Node at all, but based on my knowledge of forking in other languages, I think the whole program is run again in the new process/thread.

In that case, isn't each worker trying to bind() again, since the socket creation/bind code is outside of the if (cluster.isMaster) block?

I think it should look like this:

if (cluster.isMaster) {

  const push = zmq.socket('push');
  const pull = zmq.socket('pull');
  push.bind('ipc:///tmp/push.ipc');
  pull.bind('ipc:///tmp/pull.ipc');
  ....
}

Question:

I have created two unrelated daemon processes in C on Linux (Ubuntu). These processes sleep, wake up only when data is received, perform the action implemented in the signal handler, and sleep again.

I have implemented this communication using POSIX signals and a message queue: before sending the message, I send SIGUSR1 and then send the data, and the SIGUSR1 handler performs the required action.

I would like to implement the same kind of communication using ZeroMQ, and I have been reading its guide to find out whether ZeroMQ has any event-handling or notification mechanism that tells another process when data has arrived ( for C ).

I have referred the following link too:

Does ZeroMQ have a notification/callback event/message for when data arrives?

But I am still doubtful.

Does ZeroMQ notify or trigger another process when new data has been sent? ( I do not want to wait or poll() until the data arrives; instead, my daemon process would be sleeping, and when the data arrives, it would execute its handler and then sleep again. )

It would be great if someone could help or offer suggestions on this.


Answer:

No.

In its as-is state, ZeroMQ implements neither a trigger nor a callback.

Why?

Because the core messaging mindset is to use queues and not to disturb the flow of the processes: a process itself decides when it is convenient to ask the queue whether any messages are ready and waiting ( either the smart way, via .poll(), or the dumb way, via a straight non-blocking .recv( ZMQ_NOBLOCK ) call ), and only then does it .recv() them from the ZeroMQ-side thread(s) and resources, and process them within its own process footprint and workflow.
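
A minimal pyzmq sketch ( my illustration, not part of the original answer ) of those two "ask the queue" styles; the endpoint is a placeholder:

import zmq

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://127.0.0.1:5557")  # hypothetical endpoint

# The "smart" way: block for up to 100 ms waiting for a message to be ready.
if receiver.poll(timeout=100):
    message = receiver.recv()

# The "dumb" way: a straight non-blocking call that raises zmq.Again
# immediately when nothing is waiting.
try:
    message = receiver.recv(zmq.NOBLOCK)
except zmq.Again:
    pass  # no message ready right now

receiver.close()
context.term()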


But POSIX SIGNAL mechanics change The Rules of the Game:

Given these facts about ZeroMQ per se, SIGUSR1 can deliver an independent ( context-of-use aligned ) "out-of-band" signal to the receiving process, telling it that the sender process has just marshaled some communication to be sent and delivered down the road. The receiving process can then wake up and use the ordinary tools and methods for .recv()-ing data from the ZeroMQ delivery path, in spite of the fact that ZeroMQ mechanics do not provide callback methods of their own.

So, in this sense, the O/S POSIX signalling layer ( fully independent of the distributed ZeroMQ messaging / signalling infrastructure ) can provide the sought-after means of inter-process coordination, working even among sleeping / daemon instances of the process network.
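
A hedged sketch ( my addition, in Python rather than C, for brevity ) of that combination: the daemon sleeps in signal.pause(), and the SIGUSR1 handler drains whatever ZeroMQ has queued in the meantime; the endpoint name is a placeholder:

import os
import signal
import zmq

context = zmq.Context()
receiver = context.socket(zmq.PULL)
receiver.bind("ipc:///tmp/daemon.pipe")  # hypothetical endpoint

def on_sigusr1(signum, frame):
    # Drain everything that is ready; ZeroMQ queued it while we slept.
    while True:
        try:
            message = receiver.recv(zmq.NOBLOCK)
        except zmq.Again:
            break
        print("woken up, got:", message)

signal.signal(signal.SIGUSR1, on_sigusr1)
print("daemon pid:", os.getpid())

while True:
    signal.pause()  # sleep until any signal arrives

The sender performs its regular ZeroMQ send and then raises the signal, e.g. os.kill(daemon_pid, signal.SIGUSR1).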

Question:

I am designing a distributed system with 30-plus applications that each have 4 threads for communication, so 120 threads in total across the 30 applications. We use ZeroMQ, which uses Linux sockets.

Is this number of threads going to put serious strain on the network?

Does multiprocessing/multiplexing have any benefit in taking strain off the network?

Links to resources would be great.

Thanks for your help.


Answer:

I can't speak to ZeroMQ specifics, but generally network congestion has nothing to do with the number of threads using the network. Networks are far slower than code executing on a CPU, so if you want to saturate a computer's outbound bandwidth, you will usually need only one thread. I have seen this with 100 Mbps Ethernet, and it is probably true for 1 Gbps as well. I can't say for 10 Gbps+ setups, as I have never worked with them.

Multithreading vs. multiplexing usually addresses a different issue. You should prefer in-thread multiplexing when the number of threads you need depends on the number of clients you serve. You say you already have a fixed number of threads per machine (4), so your setup is already pretty much fine.
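
For illustration ( my sketch, assuming pyzmq; the endpoints are placeholders ), in-thread multiplexing with ZeroMQ means one thread servicing several sockets through a zmq.Poller:

import zmq

context = zmq.Context()

work = context.socket(zmq.PULL)
work.bind("tcp://*:5559")
control = context.socket(zmq.SUB)
control.connect("tcp://localhost:5560")
control.setsockopt(zmq.SUBSCRIBE, b"")

poller = zmq.Poller()
poller.register(work, zmq.POLLIN)
poller.register(control, zmq.POLLIN)

while True:
    events = dict(poller.poll(timeout=1000))  # one thread, many sockets
    if work in events:
        print("work:", work.recv())
    if control in events:
        print("control:", control.recv())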

So, go with whatever fixed number of threads you like. If that were enough to bring the network down, creating fewer threads wouldn't help. If TCP cannot adapt and your network's quality of service becomes unacceptable, you'll need to limit your apps in some other way.

Question:

I'm reading this code: http://zguide.zeromq.org/py:mtserver . But when I tried to replace threading.Thread with multiprocessing.Process, I got this error:

Assertion failed: ok (mailbox.cpp:84)

The code is:

import multiprocessing
import time
import threading
import zmq

def worker_routine(worker_url, context=None):
    """Worker routine"""
    context = context or zmq.Context.instance()
    # Socket to talk to dispatcher
    socket = context.socket(zmq.REP)

    socket.connect(worker_url)

    while True:

        string  = socket.recv()

        print("Received request: [ %s ]" % (string))

        # do some 'work'
        time.sleep(1)

        #send reply back to client
        socket.send(b"World")

def main():
    """Server routine"""

    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets
    context = zmq.Context.instance()

    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)

    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)

    # Launch pool of worker threads
    for i in range(5):
        process = multiprocessing.Process(target=worker_routine, args=(url_worker,))
        process.start()

    zmq.device(zmq.QUEUE, clients, workers)

    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    context.term()

if __name__ == "__main__":
    main()

Answer:

The limitations of each transport are detailed in the API documentation.

inproc is for intra-process communication (i.e. between threads that share a context). You should try ipc, which supports inter-process communication, or even just tcp.
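
A minimal sketch of the fix ( my adaptation of the question's code, not part of the original answer ): switch the worker transport to ipc, and let each forked worker build its own Context instead of sharing the parent's singleton:

import multiprocessing
import time
import zmq

def worker_routine(worker_url):
    """Worker routine, run in a child process."""
    context = zmq.Context()  # a fresh context in the child process
    socket = context.socket(zmq.REP)
    socket.connect(worker_url)
    while True:
        string = socket.recv()
        print("Received request: [ %s ]" % (string))
        time.sleep(1)  # do some 'work'
        socket.send(b"World")

def main():
    url_worker = "ipc:///tmp/workers"  # ipc crosses process boundaries
    url_client = "tcp://*:5555"

    context = zmq.Context.instance()
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)

    for _ in range(5):
        multiprocessing.Process(target=worker_routine, args=(url_worker,)).start()

    zmq.device(zmq.QUEUE, clients, workers)

if __name__ == "__main__":
    main()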

Question:

I'm trying to communicate with a subprocess which I start using multiprocessing.Process via a ZeroMQ socket. I know that there exist solutions to communicate with subprocesses within the multiprocessing module but I want to ultimately communicate with a function from a shared library written in C++. Without further ado, here is the self-contained code:

import time
import zmq
import multiprocessing

def perform(nseconds, endpoint):
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.connect(endpoint)

    for i in range(5):
        time.sleep(nseconds)
        publisher.send_string("{}".format(i))
    publisher.send_string(">>END")

if __name__ == "__main__":
    multiprocessing.freeze_support()
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind("tcp://*:*")
    socket.setsockopt_string(zmq.SUBSCRIBE, u"")
    endpoint = socket.getsockopt_string(zmq.LAST_ENDPOINT)

    print("Binding via {}".format(endpoint))

    t = multiprocessing.Process(target=perform, args=(1,endpoint))
    t.start()

    string = ""
    while not ">>END" in string:
        string = socket.recv_string()
        print(string)

    t.join()

This code runs perfectly fine on GNU/Linux with the expected output:

Binding via tcp://0.0.0.0:34149
0
1
2
3
4
>>END

But running it on Windows, with Python from Anaconda and pyzmq version 16.0.2 installed via conda install pyzmq, it crashes with the following error:

Binding via tcp://0.0.0.0:52019
Assertion failed: Can't assign requested address (bundled\zeromq\src\tcp_connecter.cpp:341)

How do I fix this? Or am I doing it wrong? And if I am doing it wrong, why is it platform dependent?


Answer:

Remove the wildcard in socket.bind( "tcp://*:*" )

First, this is very platform-specific; one ought never to rely on how the wildcard expansion will be handled by unknown ecosystem details in production. Be explicit.

Next, this runs contrary to concise, state-of-the-art resource-management practice in distributed-systems design; there could hardly be a worse idea, architecturally, than to .bind() straight onto all ports on all addresses available on localhost. Just imagine what this causes inside the Context()-instance, which has to keep that whole herd of endpoints and their associated resource pools ready and sniffing for a potentially incoming connection request ( just in case one appears ). No.

Never do this.
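
A hedged sketch of the explicit alternative ( my suggestion; the original answer does not spell out the replacement ): keep the ephemeral-port wildcard if you need an OS-chosen port, but bind to an explicit loopback address, so that zmq.LAST_ENDPOINT returns an address the child process can actually connect to:

socket.bind("tcp://127.0.0.1:*")  # explicit interface, OS-chosen port
endpoint = socket.getsockopt_string(zmq.LAST_ENDPOINT)
print("Binding via {}".format(endpoint))  # e.g. tcp://127.0.0.1:34149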


Performance sins and other aspects - ref. my criticism of Amdahl's Law vs. costs

Well, having seen your publications, I will not expand on this much, but I have to add that the distributed-computing realm requires a lot of care spent on details -- never expect a few SLOCs to perform well for HPC-grade computations.

The worst sins come from a good-willed attempt to improve some process. If a piece of code spins off a process ( which is exactly what multiprocessing is designed to do ), few people associate with that one SLOC the actual costs that have to be paid before the spawned code gets its first chance to start processing: a replica of the complete Python execution environment is created ( so as to escape the original local GIL ), so your code pays in both [TIME] and [SPACE] for a huge memory-to-memory transfer, and then your proposed code instantiates yet another "remote"-process Context()-engine. While this may look smart, your code has to pay all of those costs again, only to spend a few SLOCs on the actual processing. Next, always set .setsockopt( zmq.LINGER, 0 ), so as not to leave your resources blocking a graceful termination indefinitely. While this seems a "nice-to-have" preventive step, it is rather a "must-do" lifesaver, best applied before hunting "what went wrong this time?" on a big and expensive computing infrastructure...
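
A tiny sketch of the LINGER advice ( my addition; the endpoint is a placeholder ): set it right after creating the socket, so that context.term() never blocks forever on undelivered messages:

import zmq

context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.setsockopt(zmq.LINGER, 0)  # drop pending messages on close
publisher.connect("tcp://127.0.0.1:5556")  # hypothetical endpoint

publisher.send_string("last words")  # may still be in flight...
publisher.close()
context.term()  # ...yet term() returns promptly with LINGER=0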

Much other performance tweaking deserves to be done before shelling out multiprocessing.Process( ... ), which goes far beyond the scope of this post -- but it is definitely worth mastering before running inefficient code many times over.

Question:

I have a ROUTER whose purpose is to accumulate image data from multiple DEALER clients and perform OCR on the complete image. I found that the most efficient way of handling the OCR is through Python's multiprocessing library; the accumulated image bytes are put into a Queue for processing in a separate Process. However, I need to ensure that when a client experiences a timeout, the Process is properly terminated and doesn't linger pointlessly and hog resources.

In my current solution I insert each newly-connected client into a dict, where the value is my ClientHandler class that holds all of the image data and spawns a Thread that sets a boolean named timeout to True once 5 seconds have elapsed. Should a new message be received within that 5-second window, bump() is called and the timer is reset; otherwise I clean up before the thread terminates, and the reference is deleted from the dict in the main loop:

import threading
import time
import zmq

class ClientHandler(threading.Thread):
    def __init__(self, socket):
        self.elapsed = time.time()
        self.timeout = False

        self.socket = socket

        super(ClientHandler, self).__init__()

    def run(self):
        while time.time() - self.elapsed < 5.0:
            pass

        self.timeout = True

        # CLIENT TIMED OUT
        # HANDLE TERMINATION AND CLEAN UP HERE

    def bump(self):
        self.elapsed = time.time()

    def handle(self, id, header, data):
        # HANDLE CLIENT DATA HERE
        # ACCUMULATE IMAGE BYTES, ETC

        self.socket.send_multipart([id, b"0"])  # message frames must be bytes

def server_task():
    clients = dict()

    context = zmq.Context.instance()
    server = context.socket(zmq.ROUTER)

    server.setsockopt(zmq.RCVTIMEO, 0)

    server.bind("tcp://127.0.0.1:7777")

    while True:
        try:
            id, header, data = server.recv_multipart()

            client = clients.get(id)

            if client is None:
                client = clients[id] = ClientHandler(server)

                client.start()

            client.bump()
            client.handle(id, header, data)
        except zmq.Again:
            # Iterate over a copy of the keys; deleting from the dict while
            # iterating over the live view raises a RuntimeError in Python 3.
            for id in list(clients.keys()):
                if clients[id].timeout:
                    del clients[id]

    context.term()

if __name__ == "__main__":
    server_task()

But this entire method just doesn't feel right. Am I going about this improperly? If so, I would greatly appreciate if someone could point me in the right direction.


Answer:

Figured it out myself, hoping it may be of assistance to others.

I instead have a ROUTER on an assigned port that distributes unique ports to the clients; each client thereafter connects to the newly-bound socket on its unique port. When a client disconnects, the port is recycled for reassignment.

import zmq
from multiprocessing import Process, Queue, Value

def server_task():
    context = zmq.Context.instance()

    server = context.socket(zmq.ROUTER)

    server.bind("tcp://127.0.0.1:7777")

    timeout_queue = Queue()
    port_list = [ 1 ]

    proc_list = [ ]

    while True:
        try:
            id = server.recv_multipart()[0]

            # Get an unused port from the list
            # Ports from clients that have timed out are recycled here

            while not timeout_queue.empty():
                port_list.append(timeout_queue.get())

            port = port_list.pop()

            if len(port_list) == 0:
                port_list.append(port + 1)

            # Spawn a new worker task, binding the port to a socket

            proc_running = Value("b", True)

            proc_list.append(proc_running)

            Process(target=worker_task, args=(proc_running, port, timeout_queue)).start()

            # Send the new port to the client

            server.send_multipart([id, str(7777 + port).encode()])  # frames must be bytes

        except KeyboardInterrupt:
            break

    # Safely allow our worker processes to terminate
    for proc_running in proc_list:
        proc_running.value = False

    context.term()

def worker_task(proc_running, port, timeout_queue):
    context = zmq.Context.instance()

    worker = context.socket(zmq.ROUTER)

    worker.setsockopt(zmq.RCVTIMEO, 5000)
    worker.bind("tcp://127.0.0.1:%d" % (7777 + port, ))

    while proc_running.value:
        try:
            id, data = worker.recv_multipart()

            worker.send_multipart([id, data])
        except zmq.Again:
            timeout_queue.put(port)

            context.term()

            break

    print("Client on port %d disconnected" % (7777 + port, ))