Hot questions about using ZeroMQ in a server

Question:

I've got a site that displays data from a game server. The game has different "domains" (which are actually just separate servers) that the users play on.

Right now, I've got 14 cron jobs running every 6 hours, staggered at different minutes within the hour. All 14 files that are run are pretty much the same, and each takes around 75 minutes (an hour and 15 minutes) to complete its run.

I had thought about just using 1 file run from cron and looping through each server, but this would cause that one file to run for 18 hours or so. My current VPS is set to only allow 1 vCPU, so I'm trying to accomplish things while staying within my allotted server load.

Seeing that the site needs to have updated data available every 6 hours, this isn't doable.

I started looking into message queues and passing some information to a background process that will perform the work in question. I started off trying to use resque and php-resque, but my background worker died as soon as it was started. So, I moved on to ZeroMQ, which seems to be more what I need, anyway.

I've set up ZMQ via Composer, and everything during the installation went fine. In my worker script (which will be called by cron every 6 hours), I've got:

$dataContext = new ZMQContext();
$dataDispatch = new ZMQSocket($dataContext, ZMQ::SOCKET_PUSH);
$dataDispatch->bind("tcp://*:50557");

$dataDispatch->send(0);

foreach($filesToUse as $filePath){
    $dataDispatch->send($filePath);
    sleep(1);
}

$filesToUse = array();
$blockDirs = array_filter(glob('mapBlocks/*'), 'is_dir');
foreach($blockDirs as $k => $blockDir){
    $files = glob($rootPath.$blockDir.'/*.json');
    $key = array_rand($files);
    $filesToUse[] = $files[$key];
}

$mapContext = new ZMQContext();
$mapDispatch = new ZMQSocket($mapContext, ZMQ::SOCKET_PUSH);
$mapDispatch->bind("tcp://*:50558");

$mapDispatch->send(0);

foreach($filesToUse as $blockPath){
    $mapDispatch->send($blockPath);
    sleep(1);
}

$filesToUse is an array of files submitted by users that contain information to be used in querying the game server. As you can see, I'm looping through the array and sending each file to the ZeroMQ listener file, which contains:

$startTime = time();

$context = new ZMQContext();

$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->connect("tcp://*:50557");

$sender = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
$sender->connect("tcp://*:50559");

while(true){
    $file = $receiver->recv();

    // -------------------------------------------------- do all work here
    // ... ~ 75:00 [min] DATA PROCESSING SECTION foreach .recv()-ed WORK-UNIT
    // ----------------------------------------------------------------------

    $endTime = time();
    $totalTime = $endTime - $startTime;
    $sender->send('Processing of domain '.listener::$domain.' completed on '.date('M-j-y', $endTime).' in '.$totalTime.' seconds.');
}

Then, in the final listener file:

$context = new ZMQContext();
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->bind("tcp://*:50559");

while(true){
    $log = fopen($rootPath.'logs/sink_'.date('F-jS-Y_h-i-A').'.txt', 'a');
    fwrite($log, $receiver->recv());
    fclose($log);
}

When the worker script is run from cron, I get no confirmation text in my log.

Q1) Is this the most efficient way to do what I'm trying to do? Q2) Am I trying to use or implement ZeroMQ incorrectly here?

And, as it would seem, using cron to call 14 files simultaneously is causing the load to far exceed the allotment. I know I could probably just set the jobs to run at different times throughout the day, but if at all possible, I would like to keep all updates on the same schedule.


UPDATE:

I have since gone ahead and upgraded my VPS to 2 CPU cores, so the load aspect of the question isn't really all that relevant anymore.

The code above has also been changed to the current setup.

I am, after the code-update, getting an email from cron now with the error:

Fatal error: Uncaught exception 'ZMQSocketException' with message 'Failed to bind the ZMQ: Address already in use'


Answer:

Running your scripts through cron or through ZeroMQ will make absolutely no difference in how much CPU you will need. The only difference between the two is that the cron job starts your script at intervals and the messaging queue will start your script based on some user action.

At the end of the day, you need more available threads to run your scripts. But before you go down that path, you may want to take a look at your scripts. Maybe there's a more efficient way of writing them so that they don't use as many resources? And have you looked at your CPU utilization rate? Most web hosting services have built-in metrics that you can pull up through their console. You might not be using as many resources as you think.

The fact that running one file that loops through all the servers takes far longer than the cumulative time of running the files separately suggests that your scripts aren't being parallelized properly. A single instance of your script is not using up all available resources, and thus you only see speed gains when you run multiple instances of your scripts.
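
To make the "multiple instances" point concrete: with the asker's PUSH dispatcher bound on port 50557, several identical workers can PULL from it, and ZeroMQ will round-robin the file paths between them. A minimal worker sketch (Python shown for brevity; the question's code is PHP, and the names here are illustrative):

import sys
import zmq

context = zmq.Context.instance()
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:50557")   # workers connect; only the dispatcher binds

while True:
    file_path = receiver.recv_string()
    # ... the ~75 min of per-file processing goes here ...
    print("processed %s" % file_path, file=sys.stderr)

Starting two such workers side by side roughly halves the wall-clock time, provided the cores exist to run them, which is exactly the resource question raised above.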

Question:

I have the following strange situation.

We have a process, call it Distributor, that receives tasks over ZeroMQ/TCP from Client and accumulates them in a queue. There is a Worker process, which talks with the Distributor over ZeroMQ/IPC. The Distributor forwards each incoming task to Worker and waits for an answer. As soon as the Worker answers, it sends it another task (if one was received in the meantime) and returns the answer to the Client (over a separate ZeroMQ/TCP connection). If a task is not processed within 10ms, it is dropped from the queue.

With 1 Worker, the system is capable of processing ~3,500 requests/sec. The client sends 10,000 requests/sec, so 6,500 requests are dropped.

But - when I'm running some unrelated process on the server, which takes 100% CPU (a busy wait loop, or whatever) - then, strangely, the system can suddenly process ~7,000 requests/sec. When the process is stopped, it returns back to 3,500. The server has 4 cores.

The same happens when running 2, 3 or 4 Workers (connected to the same Distributor), with slightly different numbers.

The Distributor is written in C++. The Worker is written in Python, and uses pyzmq binding. The worker process is a simple arithmetic process, and does not depend on any external I/O other than Distributor.

There is a theory that this has to do with ZeroMQ using threads on separate CPUs when the server is free, and the same CPU when it's busy. If this is the case, I would appreciate an idea how to configure thread/CPU affinity of ZeroMQ so that it works correctly (without running a busy loop in background).

Is there any ZeroMQ setting that might explain / fix this?

EDIT:

This doesn't happen with a Worker written in C++.


Answer:

This was indeed a CPU affinity problem. It turns out that when ZeroMQ is used in a setting where a worker processes an input and then waits for the next one, a lot of time is wasted on copying ZeroMQ data whenever a context switch moves the worker to another core.

Running the worker with

taskset -c 1 python worker.py

solves the problem.
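
If pinning from the command line is inconvenient, the same effect can be achieved from inside the worker itself; a one-line sketch assuming Linux and Python 3.3+ (os.sched_setaffinity is Linux-only):

import os

os.sched_setaffinity(0, {1})   # pid 0 means "this process"; restrict it to core 1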

Question:

I'm trying to create an HTTP server with ZMQ_STREAM socket.

When I do a simple POST request:

POST / HTTP/1.1
Host: localhost:5555
Cache-Control: no-cache
Postman-Token: 67004be5-56bc-c1a9-847a-7db3195c301d

Apples to Oranges!

Here is how I handle this with pyzmq:

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")

while True:
    # Get HTTP request
    parts = []
    id_, msg = socket.recv_multipart()  # [id, ''] or [id, http request]
    parts.append(id_)
    parts.append(msg)
    if not msg:
        # This is a new connection - this is just the identity frame (throw away id_)
        # The body will come next
        id_, msg = socket.recv_multipart() # [id, http request]
        parts.append(id_)
        parts.append(msg)

        end = socket.recv_multipart() # [id*, ''] <- some kind of junk? 
        parts.append(end)

    print("%s" % repr(parts))

So that parts list comes out to be:

['\x00\x80\x00\x00)', '', '\x00\x80\x00\x00)', 'POST / HTTP/1.1\r\nHost: localhost:5555\r\nConnection: keep-alive\r\nContent-Length: 18\r\nCache-Control: no-cache\r\nOrigin: chrome-extension://fhbjgbiflinjbdggehcddcbncdddomop\r\nContent-Type: text/plain;charset=UTF-8\r\nUser-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36\r\nPostman-Token: 9503fce9-8b1c-b39c-fb4d-3a7f21b509de\r\nAccept: */*\r\nAccept-Encoding: gzip, deflate\r\nAccept-Language: en-US,en;q=0.8,ru;q=0.6,uk;q=0.4\r\n\r\nApples to Oranges!', ['\x00\x80\x00\x00*', '']]

So I understand that:

  1. '\x00\x80\x00\x00)', '' is the identity of the connection, set initially by the ZMQ_STREAM socket. On subsequent requests it seems to be absent.
  2. \x00\x80\x00\x00) is the identity again; this is what we see on subsequent requests from the client on the ZMQ_STREAM socket.
  3. Then the actual HTTP request

But the last pair of magic numbers: ['\x00\x80\x00\x00*', '']

What the heck does that stand for?

References:

  1. http://api.zeromq.org/4-0:zmq-socket
  2. HTTP 1.1 Spec: http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html

Answer:

But the last pair of magic numbers: ['\x00\x80\x00\x00*', ''] What the heck does that stand for?

That's a new connection, with a new connection ID. The connection id is an integer counter; using the Python builtin ord you can see that ord(')') == 41 and ord('*') == 42, the next number in the sequence.

Writing an HTTP server with ZMQ_STREAM, you have to be careful because it's more complicated than just receiving one message after the connection is established. The issue is primarily that you aren't guaranteed that a request will be complete; the body could arrive in chunks in potentially several messages. You are going to have to read HTTP headers and handle receiving the body in pieces.

Here is an example that handles POST requests coming from curl:

from traceback import print_exc
import zmq
from tornado.httputil import HTTPHeaders

class BadRequest(Exception):
    pass

class ConnectionLost(Exception):
    pass

def parse_request(request):
    """Parse a request verp, path, and headers"""
    first_line, header_lines = request.split(b'\r\n', 1)
    verb, path, proto = first_line.decode('utf8').split()
    headers = HTTPHeaders.parse(header_lines.decode('utf8', 'replace'))
    return verb, path, headers


def recv_body(socket, headers, chunks, request_id):
    """Receive the body of a request"""
    if headers.get('expect', '').lower() == '100-continue':
        if 'Content-Length' not in headers:
            # Don't support chunked transfer: http://tools.ietf.org/html/rfc2616#section-3.6.1
            print("Only support specified-length requests")
            socket.send_multipart([
                request_id, b'HTTP/1.1 400 (Bad Request)\r\n\r\n',
                request_id, b'',
            ])
            msg = 1
            while msg != b'':
                # flush until new connection
                _, msg = socket.recv_multipart()
            raise BadRequest("Only support specified-length requests")

        socket.send_multipart([request_id, b'HTTP/1.1 100 (Continue)\r\n\r\n'], zmq.SNDMORE)

        content_length = int(headers['Content-Length'])
        print("Waiting to receive %ikB body" )
        while sum(len(chunk) for chunk in chunks) < content_length:
            id_, msg = socket.recv_multipart()
            if msg == b'':
                raise ConnectionLost("Disconnected")
            if id_ != request_id:
                raise ConnectionLost("Received data from wrong ID: %s != %s" % (id_, request_id))
            chunks.append(msg)
    return b''.join(chunks)


print(zmq.__version__, zmq.zmq_version())


socket = zmq.Context().socket(zmq.STREAM)
socket.bind("tcp://*:5555")


while True:
    # Get HTTP request
    request_id, msg = socket.recv_multipart()
    if msg == b'':
        continue
    chunks = []
    try:
        request, first_chunk = msg.split(b'\r\n\r\n', 1)
        if first_chunk:
            chunks.append(first_chunk)
        verb, path, headers = parse_request(request)
        print(verb, path)
        print("Headers:")
        for key, value in headers.items():
            print('  %s: %s' % (key, value))
        body = recv_body(socket, headers, chunks, request_id)
        print("Body: %r" % body)
    except BadRequest as e:
        print("Bad Request: %s" % e)
    except ConnectionLost as e:
        print("Connection Lost: %s" % e)
    except Exception:
        print("Failed to handle request", msg)
        print_exc()
        socket.send_multipart([
            request_id, b'HTTP/1.1 500 (Internal Server Error)\r\n\r\n',
            request_id, b''])
    else:
        socket.send_multipart([
            request_id, b'HTTP/1.1 200 (OK)\r\n\r\n',
            request_id, b''])

The relevant logic for this case is in the recv_body function, which reads the headers and keeps recv-ing chunks of the body until done.

Frankly, I don't think it makes a lot of sense to write an HTTP server in Python using ZMQ_STREAM. You can integrate zmq sockets with existing Python eventloops and re-use already established HTTP libraries, so you don't have to deal with re-inventing this particular wheel. For instance, pyzmq plays especially nicely with the tornado eventloop, and you can use zmq sockets and tornado http handlers together in the same application.
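
As an illustration of that last suggestion, here is a minimal sketch combining a zmq socket with a tornado HTTP handler on one event loop (assuming pyzmq with its tornado helpers installed; ports and names are illustrative):

import zmq
from zmq.eventloop.zmqstream import ZMQStream
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler

ctx = zmq.Context.instance()
pull = ctx.socket(zmq.PULL)
pull.bind("tcp://127.0.0.1:5556")

def on_zmq_message(frames):
    # invoked on the tornado loop whenever the PULL socket has data
    print("zmq frames:", frames)

ZMQStream(pull).on_recv(on_zmq_message)

class PingHandler(RequestHandler):
    def get(self):
        self.write("pong")          # ordinary HTTP, no ZMQ_STREAM involved

Application([(r"/ping", PingHandler)]).listen(8888)
IOLoop.current().start()            # one loop drives both the socket and HTTP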

Question:

Before asking this question, I did my best by reading several questions on SO (tagged Ratchet) dealing with similar issues, but to no avail. I even asked a question which received no attention, and I therefore deleted it to write another one (that hopefully is more clear).

My final goal is to build a one-to-one private chat application using Ratchet. Everything is working fine except that I can't send a message to a specific user.

Every logged in user connects to the websocket server while accessing secured area of website:

$(document).ready(function() { 

    var conn = new WebSocket('ws://localhost:8080');
        conn.onopen = function(e) {
            console.log("Connection established!");

            // Here I need to send the logged in user_id to websocket server
            // and get it in onOpen method so that I can index my array 
            // of connections with user_id instead of
            //$connection->ResourceId, I explain more below

        };

        conn.onmessage = function(e) {
            console.log(e.data);
        };

});

When a user writes a message in the chat box, the message is sent via AJAX to web server then pushed to Websocket using ZeroMQ. In the controller:

// Persistence of Message(message_id, sender_id, receiver_id, message_text)
                .....

                $context = new \ZMQContext();
                $socket = $context->getSocket(\ZMQ::SOCKET_PUSH, 'my pusher');
                $socket->connect("tcp://localhost:5555");

                $pushData = array(
                       'receiver_id' => $receiver_id,
                       'sender_id'  => $user->getId(),
                       'message'  => $message->getMessageText(),
                    );
                $socket->send(json_encode($pushData));

So in the end, my websocket server knows the id of the receiver from the JSON. But how will it know which connection belongs to that user? In other words, I need to store websocket connections in an array that is indexed by the user id.

<?php
namespace RealTime;

use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;

class Pusher implements WampServerInterface, MessageComponentInterface{

    private $clients;

    public function onOpen(ConnectionInterface $conn) {

        $this->clients[$conn->resourceId] = $conn;
        // I need here to get the user_id received from browser while opening connection
    }

    public function onMessageEntry($entry) {
        $entryData = json_decode($entry, true);

        //This is not what I need (It sends to all users in array)
        foreach ($this->clients as $key => $client) {

        $client->send($entryData['message']); 
        }
    }
    public function onMessage(ConnectionInterface $from, $msg) {
        echo $msg; 
    }
}

And the websocket server:

  <?php
        require dirname(__DIR__) . '/vendor/autoload.php';
        use RealTime\Pusher;

        $loop   = React\EventLoop\Factory::create();
        $pusher = new Pusher;

        $context = new React\ZMQ\Context($loop);
        $pull = $context->getSocket(ZMQ::SOCKET_PULL);
        $pull->bind('tcp://127.0.0.1:5555'); 
        $pull->on('message', array($pusher, 'onMessageEntry'));


        $webSock = new React\Socket\Server($loop);
        $webSock->listen(8080, '0.0.0.0'); 
        $webServer = new Ratchet\Server\IoServer(
            new Ratchet\Http\HttpServer(
                new Ratchet\WebSocket\WsServer(
                    new Ratchet\Wamp\WampServer(
                        $pusher
                    )
                )
            ),
            $webSock
        );
        $loop->run();

        ?>

Questions:

  1. How to send the logged in user_id from the client side while opening the connection? I need to have the value in the websocket server so that I can index my array of clients with it ($client[user_id]=$conn instead of $client[resourceId]=$conn). I tried the javascript function send but I don't know where to receive the sent data (even onMessage is not printing anything).

  2. Why is the onMessage method not executing even though MessageComponentInterface is implemented? (Is it because I have the onMessageEntry method plus the $pull->on('message', array($pusher, 'onMessageEntry')); line of code?)

Thank you.


Answer:

I was asked by "whiteletters in blankpapers" to contribute on this subject (sorry for being late).

Actually, on my last try I gave up on PHP WebSockets (it was so complicated to make this work) and started using Socket.IO with Node.js, which solved my entire problem and gave me a functional, simple chat system.

Question:

I have a DEALER socket on the client side, which is connected to a ROUTER socket on the server side.

I often see a heartbeating mechanism: the server regularly sends messages to the client so that the client knows whether it is correctly connected to the server, and can reconnect if it doesn't receive messages for some time.

For example the Paranoid Pirate pattern here : http://zguide.zeromq.org/page:chapter4

But after some tests: if the client loses the connection to the server for a moment and then finds it again, the client automatically reconnects to the server socket (it receives the messages that were sent...).

I wonder in which cases heartbeating is necessary?


Answer:

Heartbeating isn't necessary to keep the connection alive (there is a ZMQ_TCP_KEEPALIVE socket option for TCP sockets). Instead, heartbeating is required for both sides to know that the other side is still active. If either side does detect that the other is inactive, it can take alternative action.

Inactivity might be because a process has died, it's deadlocked, it's doing too much work between network activity, or because of network failure, etc. From the other side's point of view, all these scenarios are indistinguishable without more information.

In networking, making a design work is the easy part. The overwhelmingly hard part is dealing with failure. You have to consider as many failure modes as possible and deal with them in the protocols you design. Heartbeating is often a helpful part of those protocols. It is far more useful than trying to work out whether a socket is still up by use of monitor events, say.

Having said that, if your application doesn't need any particular level of reliability, perhaps you can just power cycle equipment when a failure happens. Then you probably don't need to worry about heartbeating. After all, there are plenty of patterns in the guide that don't use it. It's horses for courses.

Question:

I have a NodeJS API web server (let's call it WS1) that receives RESTful HTTP requests from clients, and to respond needs to first query another local server (let's call it WS2).

The flow is pretty much like this:

  1. WS1 receives an HTTP request from a client and parses it.
  2. WS1 sends a request to WS2 for some information.
  3. When WS1 receives the response from WS2, it finishes processing the original request and sends a response back to the client.

Until now all communication between WS1 and WS2 has been done through HTTP requests, since the two machines are on the same local network.

To speed things up, though, I'm considering switching to zmq instead. I've looked at the patterns they show in the docs, but still haven't figured out a concurrency problem.

WS1 can send many requests per second to WS2, and there's no guarantee that WS2 replies in the same order as it receives the requests, since some async operations can internally take longer than others.

So, using zmq with NodeJS, how do I make sure that when WS1 receives a message from WS2 it knows which original client request it belongs to? Is there a built-in mechanism to take care of it?

Thanks!


Answer:

0MQ is an interesting tool set that helps abstract socket communication. There are mechanisms (should you choose the correct socket types) that allow the server to respond to the right client, and it is all handled within the confines of 0mq.

The basic API types are:

  1. PUSH-PULL
  2. PUB-SUB
  3. REQUEST-REPLY

If you want to be able to have one machine respond to the originator, then I believe you want the REQ-REP API type.

Then you need to consider the multiplexing on each side to get the connectors correct. But keep it one-to-one for simplicity's sake at first.

Sample client (from http://zguide.zeromq.org/js:rrclient):

// Hello World client in Node.js
// Connects REQ socket to tcp://localhost:5559
// Sends "Hello" to server, expects "World" back

var zmq       = require('zmq')
  , requester = zmq.socket('req');

requester.connect('tcp://localhost:5559');
var replyNbr = 0;
requester.on('message', function(msg) {
  console.log('got reply', replyNbr, msg.toString());
  replyNbr += 1;
});

for (var i = 0; i < 10; ++i) {
  requester.send("Hello");
}

Sample server (from http://zguide.zeromq.org/js:rrserver). Note that both guide samples connect rather than bind; in the guide they talk to each other through the rrbroker example, which binds ports 5559 and 5560 between them:

// Hello World server in Node.js
// Connects REP socket to tcp://*:5560
// Expects "Hello" from client, replies with "World"

var zmq = require('zmq')
  , responder = zmq.socket('rep');

responder.connect('tcp://localhost:5560');
responder.on('message', function(msg) {

  console.log('received request:', msg.toString());
  setTimeout(function() {
    responder.send("World");
  }, 1000);
});

The routing of the reply back to the client is handled automatically by 0MQ. It is part of the message (although I don't remember whether you see the address buffer in these examples; it may be abstracted away). In the request envelope, the client address is the first frame, which is what allows 0MQ to reply to the correct client.

Once that is running you can then consider 1-to-many, many-to-1 and many-to-many topologies. All that really requires is changing the socket types to DEALER and ROUTER where appropriate.
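
For illustration, the identity-frame handling described above can be made explicit on the ROUTER side; a hedged sketch (pyzmq used here for brevity, though the question is Node.js):

import zmq

ctx = zmq.Context.instance()
router = ctx.socket(zmq.ROUTER)
router.bind("tcp://*:5559")

while True:
    # DEALER peers arrive as [identity, payload]; REQ peers add an empty delimiter frame
    identity, payload = router.recv_multipart()
    reply = b"reply to " + payload
    router.send_multipart([identity, reply])   # the identity frame routes the reply back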

Question:

I've been working on a distributed system project; my system is partially p2p. My issue is super simple: I don't know how people usually solve it, because I have no experience in this domain, and I am very new to all this.

I want to communicate over the internet between two clients, which both have ZMQ sockets.

On my local network, or on my own machine, they seem to work fine, but when communicating over the internet, I never get my message. I have narrowed this down to 2 possible reasons:

1 ) The NAT - it is not letting my message reach the client host. Does anyone know how to solve the issue of NAT within ZMQ? I have heard of TCP hole punching and such. How do web developers and other people who deal with this often manage it?

2 ) ZMQ sockets cannot communicate over the internet, even if the communication is strictly between the two ZMQ sockets and not BSD sockets etc. I am not sure about this one, though.

If anyone has expertise in this area I would be grateful; it would help me move forward. Thanks!


Answer:

2 ) ZMQ sockets cannot communicate over the internet, even if the communication is strictly between the two ZMQ sockets

Well, 2 ) is easy: ZeroMQ sockets obviously work over the Internet.

There is not much to add to this.

1 ) The NAT - it is not letting my message reach the client host,

The 1 ) will deserve a bit more attention :

Right, NAT could be in place when local LAN(s) routers are connected through a single public ( registered, coordinated ) IPv4 / IPv6 address.

Next, there could be another show-stopper in the game, the FireWall, put in one or more places (!) -- be it a local one ( O/S operated, which one can check if having administrator-grade login to the localhost ), or one integrated into any of the Gateway/Proxy/Policy enforcement devices.

In any case, a thorough design-review ought to take place with your local administrator(s), responsible for the localhost-O/S and network-infrastructure elements, and a network-wide responsible security-manager / engineer(s).

The "HOW ?" part :

This ( principal ) complexity is exactly why Game Studios try to spare users the headaches of solving these complexities and try to provide some escape strategy.

For a trivial case 1-to-1 : One may indeed use a set of rules for a port-forwarding setup ( if FireWall + Gateway engineering permits ) and your ZeroMQ connectivity may get directed onto a tcp://<public-IP>:<_a_port#_known_to_be_FWDed_to_a_target_HOST_> address.

For more, but still units : This scenario may seem easy for a single connection setup, yet if in need of units or tens of target hosts, there would be rather limited will from the gateway ( router ) / firewall admins to open more and more ports on the wild side of the security perimeter. Here another trick may help - using the standard ssh-tools, where connections can harness so-called local-port-forwarding and remote-port-forwarding, the interconnects enjoy a single port to pass the firewall + gateway, plus the content is encryption-protected. Sure, more administrative effort is needed on both sides, yet it is a known and smart way to go, if these efforts and a bit of increased latency ( encryption + decryption processing added ) do not spoil your in-game UX latency plans.
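
For illustration only ( host name and ports are placeholders ), such a tunnel can be opened with the standard ssh client:

ssh -L 5555:localhost:5555 user@gateway.example.com

after which a ZeroMQ peer may simply .connect() to tcp://localhost:5555 and reach port 5555 on the remote host through the encrypted tunnel.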

For more, above a few units : There is an option of re-using an ad-hoc, yet security-threatening, double-edged sword - a multi-player shared (!) VPN, which solves the global "visibility" issues ( most often with some central ( be it published or not ) service-provisioning mapping and authentication serving coordinator ). The localhost-side applications simply start to see another "local"-interface, uncoordinated with respect to its IPv4 / IPv6 address(es), yet this private shared-VPN seems to join all the players so as to look as if all of them share one common IP-network, having effectively bypassed all the security/firewalling efforts of common network practice --- which is at the same time its strongest risk in putting it in place ( not mentioning the risk of the principal SPOF in the remote, central, authorisation/mapping service-provider, be their set of motivations published or hidden ).

The gaming industry has panicked over all these issues ever since Multi-Player games started to be sold ( which to 2018/Q2 Gamers may look like forever ), and the industry has been trying to avoid exactly these complexity-related pains, as the dominant fraction of game-buying teenagers could not be expected to have acquired both the patient persistence ( to systematically craft the proper setup ) and the deep-enough knowledge ( so as to know all the system-wide details of where and what to set up or re-configure, so as to unlock a secure end-to-end online in-game visibility ).

For indeed many-to-many cases : A nice example started in the late 1990s / early 2000s, when IL-2 Sturmovik's Community of Pilots and Virtual Squadrons went operational 24/7/365 "since ever". They used a Community-sponsored HyperLobby, a Mediating Node, to have all these complexities solved once and forever for all interested Members. The HyperLobby lightweight client/server service-infrastructure did all the port-forwarding mapping setups and other server-side mediation dirty hacks, all invisible to the Pilot(s), and provided added means for administering many connected Multi-Player Game Theatres for IL-2, F/A-18, Su-27 Flanker, CFS, Medal Of Honor and more than I can remember today ( topping above small tens of thousands of connected Pilots IIRC in peak hours ). A great piece of sound design and distinguished effort, spanning decades, for a truly global online Community ( having an honor to serve in the pseudohistorical VFSQ, with members spanning 13 TimeZones - from Hawaii, Brasil, U.S., U.K., France, Germany, Italy, Greece to Turkey - flying with similarly minded friends from Japan, Australia, New Zealand and many other places round the globe -- be it East/West Front + Pacific Theater globally coordinated weekend events, missions reconstructed to the very historical details, or the Memorial Parade Flyovers on V-Day anniversaries ) -- ~Salute~ Jiri Fojtasek, well done indeed! -- the idea of which helps illustrate the way to go / follow ( as many younger game-portals indeed did follow this path ).

Question:

Consider the following minimal example of a request-reply setup:

SERVER:

// Minimal example request & reply (REQ-REP)
// SERVER

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

int main (void) {
    //  Socket to talk to clients
    void *context = zmq_ctx_new ();
    void *responder = zmq_socket (context, ZMQ_REP);
    int rc = zmq_bind (responder, "tcp://*:5555");
    assert (rc == 0);

    while (1) {
         char buffer [2];
         zmq_recv (responder, buffer, 2, 0);
         printf ("Received: %s\n",buffer);
    }
    return 0;
}

CLIENT:

// Minimal example request & reply (REQ-REP)
// CLIENT

#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>

int main (void) {
    void *context = zmq_ctx_new ();
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");
    zmq_send (requester, "0", 2, 0);
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

Results in the following output:

Received: 0
Received: 0
...

...and it keeps printing this line to standard output forever.

What I would have expected:

Received: 0

Once.

Obviously, I got the idea of request-reply totally wrong.


(20 hours later)

I don't know why. But a back-to-back zmq_send() resolves the problem. The following (server) code snippet works as expected:

int main (void) {
    //  Socket to talk to clients
    void *context = zmq_ctx_new ();
    void *responder = zmq_socket (context, ZMQ_REP);
    int rc = zmq_bind (responder, "tcp://*:5555");
    assert (rc == 0);

    while (1) {
         char buffer [2];
         zmq_recv (responder, buffer, 2, 0);
         zmq_send (responder, "ACK", 3, 0);
         printf ("Received: %s\n",buffer);
    }
    return 0;
}

Answer:

This makes sense: a REP socket receives and sends, and a REQ socket sends and receives. Since the server never sends a reply, the second zmq_recv() on the REP socket fails (the REP state machine requires a send between two receives); the unchecked return value hides this, the buffer keeps its old contents, and the loop prints the stale data forever.

You should try a PUSH socket to send and a PULL socket to receive, in your example.
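
A minimal sketch of that suggestion (pyzmq used for brevity, though the question's code is C); each half runs in its own process:

import zmq

ctx = zmq.Context.instance()

# receiving side - replaces the REP server; no reply is owed before the next recv
puller = ctx.socket(zmq.PULL)
puller.bind("tcp://*:5555")
while True:
    print("Received: %s" % puller.recv().decode())

# sending side - replaces the REQ client ( run in a second process ):
#   pusher = ctx.socket(zmq.PUSH)
#   pusher.connect("tcp://localhost:5555")
#   pusher.send(b"0")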

Question:

Let's say I have a very simple Client/Server model, using REQ/REP from ZeroMQ. See the python code below.

In the code below the client will wait forever, but I want the client to give up (let's say after 20 seconds) and move on with its life if it doesn't get a response. The server could be down, the router unplugged, the WiFi not working. I really don't, and shouldn't, care why.

Then at a later time, I'll have the client try again and it could be a completely different request.

But I fear I'll cross an old request, get things out of order, cause more problems.

Does anyone know how to do this gracefully? I've been racking my brain on a simple solution.

SIMPLE CLIENT CODE

#!/usr/bin/env python3

import zmq
from time import sleep

#                                          CREATE SOCKET - Client (USING zmq.REQ)
my_client_context = zmq.Context()
my_client_socket = my_client_context.socket(zmq.REQ)
my_client_socket.connect('tcp://127.0.0.1:5557')

#                                          [REQ]uest AND [REP]ly
to_server = b"Hi"
my_client_socket.send(to_server)
from_server = my_client_socket.recv()
print(from_server)

sleep(2)

#                                          REQuest AND REPort
to_server = b"blah"
my_client_socket.send(to_server)
from_server = my_client_socket.recv()
print(from_server)

SIMPLE SERVER CODE

#!/usr/bin/env python3

import zmq

#                                         CREATE SOCKET - Server (USING zmq.REP)
my_server_context = zmq.Context()
my_server_socket = my_server_context.socket(zmq.REP)
my_server_socket.bind('tcp://127.0.0.1:5557')

#                                         LISTEN ON SOCKET
while True:
    msg = my_server_socket.recv()
    if msg == b'Hi':
        to_client = b"Well hello to you"
        my_server_socket.send(to_client)
    else:
        to_client = b"Not sure what you want"
        my_server_socket.send(to_client)

Answer:

ZeroMQ supports .poll() for a non-blocking test before .recv()

One can use .poll()

.poll( timeout = None, flags = zmq.POLLIN ) # poll the socket for events

The default is to poll forever for incoming events. Timeout is in milliseconds, if specified.

Parameters:

timeout : int [default: None]

The timeout ( in milliseconds ) to wait for an event. If unspecified (or specified None), will wait forever for an event.

flags : bitfield (int) [default: POLLIN]

The event flags to poll for ( any combination of POLLIN | POLLOUT ). The default is to check for incoming events ( POLLIN ).

Returns:

events : bitfield (int)

The events that are ready and waiting. Will be 0 if no events were ready by the time timeout was reached.
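
Applied to the question's client, a minimal give-up-after-20-seconds sketch ( reusing my_client_socket from the client code above ) might be:

poller = zmq.Poller()
poller.register( my_client_socket, zmq.POLLIN )

my_client_socket.send( b"Hi" )
if poller.poll( timeout = 20000 ):                 # wait at most 20 000 [ms]
    print( my_client_socket.recv() )
else:
    my_client_socket.setsockopt( zmq.LINGER, 0 )   # do not linger on the stale request
    my_client_socket.close()                       # give up; a fresh REQ socket is needed to retry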

ZeroMQ supports non-blocking, asynchronous mode for .recv()

so one may build one's own non-blocking, soft-RT-tuned .recv() busy loop.

import time
import zmq
# assumes my_client_socket as created in the question's client code

COUNTER            = 0      # failed-attempts counter
aThresholdToGiveUp = 100    # give up after ~100 empty polls ( ~25 s at 250 ms/poll )
not_SIG_KILL_yet   = True

while not_SIG_KILL_yet:                                  # main-<LOOP>
    try:                                                 # outer-most <ExceptionWRAPPER> for KeyboardInterrupt
        try:                                             # ~250 msec sample-rate <loop>-task
            maybeRECV = my_client_socket.recv( zmq.NOBLOCK )
            COUNTER   = 0                                # handle the .recv()-ed data; reset COUNTER
        except zmq.ZMQError as e:
            if e.errno != zmq.EAGAIN:                    # EAGAIN == nothing arrived yet
                raise
            COUNTER += 1                                 # .INC failed-attempts COUNTER
            if COUNTER > aThresholdToGiveUp:
                not_SIG_KILL_yet = False                 # .IF > threshold, GIVE UP
                continue
            time.sleep( 0.25 )                           # GIVE CPU-a-NAP
    except KeyboardInterrupt:
        not_SIG_KILL_yet = False
    # <EoW>-------------------------------------------- # main-<LOOP>

ZeroMQ works on Archetype Patterns, not on "dumb" sockets

Thus being afraid of meeting an "old" [REP]-answer ( still hanging ( and it must hang there, mustn't it? ) on the SERVER side in the internal queue ) is correct, as the REQ/REP pattern does exactly that by definition.

The CLIENT side has the right to gracefully close the associated resources and send SERVER an indication of clearing the circus.

ZeroMQ supports very dynamic set-ups/tear-downs of the ground elements, and it is fair manners not to leave the communication counterparty ( the SERVER in this case ) in any doubt about what the communication peer intended to do.

Read details about:

my_client_socket.setsockopt( zmq.LINGER, 0 )  # do not wait for anything pending
my_client_socket.close()                      # stateful close
my_client_context.term()                      # graceful termination / release of resources ( no MEM leaks )

Question:

I'm working with Pupil Labs, a huge open-source project for eye/pupil tracking. The entire codebase is written in Python. The so-called Pupil Remote is based on ZeroMQ.

If I start running the Filter Messages script, everything is fine. For my purposes I need to "translate" Filter Messages into Java, because I created an Android app which should call a client that serves the same purpose as the python client.

Here's what I've done so far:

import android.annotation.SuppressLint;
import org.zeromq.ZMQ;
import java.nio.charset.Charset;
import static java.lang.Thread.sleep;

public class ZeroMQClient {
    @SuppressLint("NewApi")
    public static void requestGazeData() {

        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket subscriber = context.socket(ZMQ.SUB);

        System.out.println("Connecting to server...");

        subscriber.connect("tcp://xxx.x.x.x:50020");

        System.out.println("Connected");

        String gaze = "gaze";
        subscriber.subscribe(gaze.getBytes(Charset.forName("UTF-8")));

        while (true) {
            String msg = subscriber.recvStr();
            System.out.println(msg);
        }

        // note: close()/term() belong after the receive loop; closing the
        // socket inside the first iteration would stop any further receiving
        // subscriber.close();
        // context.term();
    }
}

Now, as you can guess from the fact that I'm asking: nothing happens, and I don't receive any data from the Pupil Labs server. I based my approach on this post, but unfortunately it didn't work out for me. The IP address and port are the same as on the server. It works neither locally nor remotely.

I'd be happy about any answer, since I'm stuck on this.


Answer:

Since my implementation itself was set up correctly, the actual issue turned out to be the firewall, which simply blocked the connection. By posting my solution I hope to help future visitors to this question.

Question:

Given 2 applications, where application A uses a publisher client to continuously stream data to application B, which has a sub server socket to accept that data: how can we configure the pub client socket in application A such that when B is unavailable (like when it's being redeployed or restarted) A buffers all the pending messages, and when B becomes available again the buffered messages go through and the socket catches up with the real-time stream?

In a nutshell, how do we make the PUB CLIENT socket buffer messages, with some limit, while the SUB SERVER is unavailable?

The default behaviour for the PUB client is to drop messages in the mute state, but it would be great if we could change that to a limit-sized buffer. Is that possible with zmq, or do I need to do it at the application level?

I've tried setting HWM and LINGER on my sockets, but if I'm not wrong they are only responsible for the slow-consumer case, where my publisher is connected to the subscriber but the subscriber is so slow that the publisher starts to buffer messages (the hwm will limit the number of those messages)...

I'm using jeromq, since I'm targeting the JVM platform.


Answer:

First of all, welcome to the world of Zen-of-Zero, where latency matters most

PROLOGUE :

ZeroMQ was designed by Pieter HINTJENS' team of ultimately experienced masters - Martin SUSTRIK to be named first. The design was professionally crafted so as to avoid any unnecessary latency. So, asking about having (limited) persistence? No, sir, not confirmed - the PUB/SUB Scalable Formal Communication Pattern Archetype will not have it built-in, right because of the added problems and decreased performance and scalability ( add-on latency, add-on processing, add-on memory-management ).

If one needs (limited) persistence ( for absent remote-SUB-side agent(s)' connections ), feel free to implement it on the app-side, or one may design and implement a new ZMTP-compliant such behaviour-pattern Archetype, extending the ZeroMQ framework, if such work gets into a stable and publicly accepted state, but do not ask for the high-performance, latency-shaved standard PUB/SUB, having polished its almost linear scalability ad astra, to get modified in this direction. It is definitely not the way to go.

Solution ?

App-side may easily implement your added logic, using dual-pointer circular buffers, working in a sort-of (app-side-managed)-Persistence-PROXY, yet in-front-of the PUB-sender.
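
A tiny sketch of that app-side idea ( Python/pyzmq for brevity, though the asker uses jeromq, which exposes the same calls; names and limits are illustrative ): an XPUB socket sees subscriptions arrive as readable messages, so the sender can buffer into a bounded deque while no SUB is present and flush the backlog once one joins:

import zmq
from collections import deque

ctx = zmq.Context.instance()
xpub = ctx.socket(zmq.XPUB)            # like PUB, but subscription frames can be recv()-ed
xpub.bind("tcp://*:5556")

backlog = deque(maxlen=1000)           # the "limit sized buffer" asked about
subscribers = 0

def publish(msg: bytes):
    if subscribers > 0:
        xpub.send(msg)
    else:
        backlog.append(msg)            # SUB-side absent: keep a bounded history

while True:
    if xpub.poll(10):                  # first byte: 1 = subscribe, 0 = unsubscribe
        subscribers += 1 if xpub.recv()[0] == 1 else -1
        while subscribers > 0 and backlog:
            xpub.send(backlog.popleft())   # let the late joiner catch up first
    publish(b"tick")                   # placeholder for the real data stream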

Your design may also succeed in squeezing some additional sauce from the ZeroMQ internals, in case your design enjoys using the recently made available built-in ZeroMQ socket-monitor component, to set up an additional control-layer and receive there a stream of events as seen from "inside" the PUB-side Context-instance, where some additional network- and connection-management-related events may bring more light into your (app-side-managed)-Persistence-PROXY.

Yet, be warned that

The zmq_socket_monitor() method supports only connection-oriented transports, that is, TCP, IPC, and TIPC.

so one may straight away forget about this in case any of the ultimately interesting transport-classes { inproc:// | norm:// | pgm:// | epgm:// | vmci:// } was planned to be used.


Heads up !

There are inaccurate, if not wrong, pieces of information from our Community honorable member smac89, who tried his best to address your additional interest expressed in the comment:

"...zmq optimizes publishing on topics? like if you keep publishing on some 100char long topic rapidly, is it actually sending the topic every time or it maps to some int and sends the int subsequently...?"

telling you:

"It will always publish the topic. When I use the pub-sub pattern, I usually publish the topic first and then the actual message, so in the subscriber I just read the first frame and ignore it and then read the actual message"

ZeroMQ does not work this way. There is no such thing as a "separate" <topic> followed by a <message-body>; rather the opposite.

The TOPIC and the mechanisation of topic-filtering works in a very different way.

1) you never know who .connect()-s: i.e. one can be almost sure that version 2.x up to version 4.2+ will handle the topic-filtering in a different manner ( the ZMTP:RFC defines initial capability-version handshaking, to let the Context-instance decide which version of topic-filtering will have to be used: ver 2.x used to move all messages to all peers, and let all the SUB-sides ( of ver 2.x+ ) be delivered the message ( and let the SUB-side Context-instance process the local topic-list filtering ), whereas ver 4.2+ are sure to perform the topic-list filtering on the PUB-side Context-instance ( CPU-usage grows, network-transport the opposite ), so your SUB-side will never be delivered a byte of "useless", read "not-subscribed-to", messages.

2) (you may, but) there is no need to separate a "topic" into a first frame of a thus-implied multi-frame message. Perhaps just the opposite ( it is rather an anti-pattern to do this in high-performance, low-latency distributed-system design ).

The topic-filtering process is defined and works byte-wise, from left to right, pattern-matching each of the topic-list member values against the delivered message payload.

Adding extra data and extra frame-management processing just and only increases the end-to-end latency and processing overhead. It is never a good idea to do this instead of proper distributed-system design work.


EPILOGUE :

There are no easy wins nor any low-hanging fruit in professional distributed-systems design, the less if low-latency or ultra-low-latency are the design targets.

On the other hand, be sure that ZeroMQ framework was made with this in mind and these efforts were crowned with stable, ultimately performant well-balanced set of tools for smart (by design), fast (in operation) and scalable (as hell may envy) signaling/messaging services people love to use right because of this design wisdom.

Wish you live happy with ZeroMQ as it is and feel free to add any additional set of features "in front" of the ZeroMQ layer, inside your application suite of choice.

Question:

I want to create a proxy server which routes incoming packets from REQ type sockets to one of the REP sockets on one of the computers in a cluster. I have been reading the guide and I think the proper structure is a combination of ROUTER and DEALER on the proxy server, where the ROUTER passes messages to the DEALER to be distributed. However, I cannot figure out how to create this connection scheme. Is this the correct architecture? If so, how do I bind a DEALER to multiple addresses? The flow I envision is like this: REQ->ROUTER|DEALER->[REP, REP, ...], where only one REP socket would handle a single request.


Answer:

NB: forget about packets -- think in terms of "Behaviour", that's the key

ZeroMQ is rather an abstract layer for certain communication-behavioral patterns, so while terms alike socket do sound similar to what one has read/used previously, the ZeroMQ-world is by far different from many points of view.

This very formalism allows ZeroMQ Formal-Communication-Patterns to grow in scale, to get assembled into higher-order patterns ( for load-balancing, for fault-tolerance, for performance-scaling ). Mastering this style of thinking, you forget about packets, thread-sync issues, I/O-polling, and focus your higher-abstraction-based design on Behaviour rather than on underlying details. This makes your design both free from re-inventing the wheel and very powerful, as you re-use highly professional tools right for your problem-domain tasks.

DEALER->[REP,REP,...] Segment

That said, your DEALER-node ( in fact a ZMQsocket-access-node, having the Behaviour called a "DEALER" to resemble its queue/buffering style, its round-robin dispatcher, its send-out-and-expect-answer-in model ) may .bind() to multiple localhost address:port-s, and these "service-points" may also operate over different TransportClass-es -- one working over tcp://, another over inproc://, if that makes sense for your Design Architecture -- ZeroMQ empowers you to use this, transparently abstracted from all the "awful & dangerous" lower-level gritty-nitties.
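
For illustration, the mid-node of the REQ->ROUTER|DEALER->[REP,REP,...] flow can be as small as this ( a pyzmq sketch; the built-in zmq.proxy() device runs the forwarding loop ):

import zmq

ctx = zmq.Context.instance()
frontend = ctx.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")      # REQ clients .connect() here
backend = ctx.socket(zmq.DEALER)
backend.bind("tcp://*:5560")       # REP workers .connect() here

zmq.proxy(frontend, backend)       # blocks forever; each request reaches exactly one REP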

ZeroMQ also allows to reverse .connect() / .bind()

In principle, where helpful, one may reverse the .bind() and .connect() from the DEALER to a known target address of the respective REP entity.

Question:

Multiple clients are connected to a single ZMQ_PUSH socket. When a client is powered off unexpectedly, the server does not get an alert and keeps sending messages to it. Despite using ZMQ_NOBLOCK and setting ZMQ_HWM to 5 (queue only 5 messages at max), my server doesn't get an error until the client is reconnected and all the messages in the queue are received at once.


Answer:

I recently ran into a similar problem when using ZMQ. We would cut power to interconnected systems, and the subscriber would be unable to reconnect automatically. It turns out that there has recently (in the past year or so) been implemented a heartbeat mechanism over ZMTP, the underlying protocol used by ZMQ sockets.

If you are using ZMQ version 4.2.0 or greater, look into setting the ZMQ_HEARTBEAT_IVL and ZMQ_HEARTBEAT_TIMEOUT socket options (http://api.zeromq.org/4-2:zmq-setsockopt). These will set the interval between heartbeats (ZMQ_HEARTBEAT_IVL) and how long to wait for the reply until closing the connection (ZMQ_HEARTBEAT_TIMEOUT).

EDIT: You must set these socket options before connecting.
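
A short sketch of those options being applied before the connect ( Python/pyzmq for brevity; the same option names exist across bindings, the values are illustrative, and libzmq >= 4.2 is assumed ):

import zmq

ctx = zmq.Context.instance()
push = ctx.socket(zmq.PUSH)
push.setsockopt(zmq.SNDHWM, 5)                  # queue at most 5 outgoing messages
push.setsockopt(zmq.HEARTBEAT_IVL, 2000)        # send a ZMTP PING every 2 s
push.setsockopt(zmq.HEARTBEAT_TIMEOUT, 5000)    # declare the peer dead after 5 s of silence
push.connect("tcp://client-host:5555")          # options set BEFORE connecting, as noted above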

Question:

I am trying to implement client server using ZeroMQ.

I am running a server in an infinite loop, bound to a socket and polling the socket infinitely.

When a client sends a request, the server receives it only the first time. Subsequent requests are not received by the server. Below is my code snippet:

Server :

        ZMQ.Socket socket = context.socket(ZMQ.REP);
        socket.bind ("tcp://*:5555");
        System.out.println("Server is in receive mode");
            while (!Thread.currentThread ().isInterrupted ()) {           
                Poller poller = new Poller(1);
                poller.register(socket, Poller.POLLIN);
                poller.poll();
                if (poller.pollin(0)) {
                    ZMsg zmqMessage = ZMsg.recvMsg(socket);                 
                    if (zmqMessage!=null) {
                        zmqMessage.getFirst().getData();
                    }
                }

Client :

    ZMQ.Socket socket = context.socket(ZMQ.REQ);
    socket.connect ("tcp://localhost:5555");
    ZMsg readyFrame = new ZMsg();
    readyFrame.add(new ZFrame("READY"));
    readyFrame.send(socket);

I tried polling out on the client side as below, but it did not work:

        Poller poller = new Poller(1);
        poller.register(socket, Poller.POLLOUT);
        poller.pollout(0);

Answer:

ZeroMQ is a wonderful piece of art from Pieter HINTJENS' and Martin SUSTRIK's team. Yes, there are some low-level, technology-related issues that still require some design effort; nevertheless, ZeroMQ is stable and very mature.

System thinking - the toys are working as distributed automata

Yes, the normal workflow of SEQ programming languages, which work serially ( or "just"-concurrently ), suddenly gets a new dimension - a distributed-automata dimension.

So the local workflow is dependent on externally operated parties.

Each of the ZeroMQ Formal Communication Patterns' primitive archetypes bears a human-familiar name:

one REQ-ests, a second REP-lies
one PUB-lishes, anyone SUB-scribes to just listen
one PUSH-es, the other PULL-s to receive
each party, bound together in a PAIR, may both speak and listen, whenever, as needed
etc. for BROKER, DEALER, XPUB, XSUB, et al

This is the by-design reason why your server-side REP-archetype behaviour will not receive any next message from any other party ( yes, there might be more clients connected to the same REP-transport-class node ) until it indeed REP-lies ( be it with an empty message or not ) to the REQ-side of the distributed automata.

The best next step

Well, the best next step one may ever take towards going professional in this direction is IMHO to get a bit more global view, which may sound complicated for the first few things one tries to code with ZeroMQ, but jump at least to page 265 of Code Connected, Volume 1 ( available as PDF: http://hintjens.wdfiles.com/local--files/main%3Afiles/cc1pe.pdf ), if it is not feasible to read step-by-step all the way there.

The fastest-ever learning curve would be to first get an un-exposed view of Fig. 60 ( Republishing Updates ) and Fig. 62 ( HA Clone Server pair, for a possible high-availability approach ), and only then go back to the roots, elements and details.

Overheads:

As a minor note, it would be fair and resource-wise to lower the processing overhead by creating the Poller "outside" the while(){}-loop, as there is no visible reason to re-instantiate such an element and re-register its services on each loop iteration:

Poller poller      = new Poller(1);                  // new INSTANCE
poller.register( socket, Poller.POLLIN );            // pay COSTS of SETUP
                                                     //     JUST ONCE, HERE
while ( !Thread.currentThread ().isInterrupted () ) {// inf LOOP

    poller.poll();                                   //     POLL ( blocks until events arrive )
    if ( poller.pollin( 0 ) ) {                      //     if ITEMS{ ... proc 'em }
    ZMsg zmqMessage  = ZMsg.recvMsg( socket );
    if ( zmqMessage != null )
         zmqMessage.getFirst().getData();
    }
}

Anyway: enjoy the worlds of distributed computing!

Question:

I am using NetMQ to send messages from many clients to a server which receives the messages and processes them. If I use the pub/sub pattern, I can (ab)use the subscriber socket and use it for the server, with the clients as the publishers (the opposite flow, isn't it?). However, I cannot send an acknowledgement message to the clients.

If I use the REQ/RESP pattern, I can use the response socket for the server and even send an acknowledgement message, which is received by the client. However, if the server misses the message from the client, I cannot resend the message (after waiting for a timespan), because I receive an exception:

Req.XSend - cannot send another request

-- even after disconnecting the client socket and closing it (it seems that the server socket keeps track of it).

And finally, based on the documentation, I'm not sure if the Dealer/Router is the right pattern to use. As I'm pretty sure that I'm stuck in a very common situation, is there any pattern to implement this scenario using NetMQ?


Answer:

You are stuck in a very common problem, typically called "Reliable Request-Reply" in 0MQ parlance.

There are several approaches described in the guide, the first is the "lazy pirate". In it you use a REQ/RESP socket pair as described (though REQ/ROUTER would work just as well, and probably better for more than one client). The solution is to dispose of the socket if your ACK times out and create a new one.

The guide does note that swapping in a DEALER for the REQ would solve the problem, but that it adds having to keep track of the envelope yourself. Unless you need async send/receive on the client side for some other reason, I would follow their advice and stick with REQ/ROUTER.
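
A compact sketch of the lazy-pirate client side ( pyzmq shown for brevity; NetMQ has equivalent calls ): on timeout, the stale REQ socket is discarded, which resets its strict send/recv state machine, and a fresh one retries:

import zmq

ctx = zmq.Context.instance()

def request(payload: bytes, retries: int = 3, timeout_ms: int = 2500):
    for _ in range(retries):
        sock = ctx.socket(zmq.REQ)
        sock.connect("tcp://localhost:5555")    # illustrative endpoint
        sock.send(payload)
        if sock.poll(timeout_ms, zmq.POLLIN):   # ACK arrived in time
            reply = sock.recv()
            sock.close()
            return reply
        sock.setsockopt(zmq.LINGER, 0)          # drop the unanswered request
        sock.close()                            # new socket = fresh REQ state machine
    return None                                 # server presumed down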

A sample implementation of this pattern can be found on GitHub: https://github.com/NetMQ/Samples/tree/master/src/Pirate%20Pattern

Question:

I'm trying to create an interface in javascript to talk to and control robots (written in C) using zeromq. I'm new to zeromq and have had great difficulty finding a library for using zeromq in javascript, while good libraries exist in numerous other languages. The two small libraries I have found use a workaround via flash or web sockets, which makes it a lot more complicated. Both are 'in beta' and abandoned.

The reason we're using html/javascript is that it can be used on many platforms, but it seems like javascript is not made for this kind of job? I feel that I'm doing something wrong, since I find so little information about this. I'm told to use CZMQ and Zyre, but those can't be used with javascript as far as I'm aware? Any help or redirect towards a guide would be appreciated; I'm way in over my head and am making little progress.


Answer:

If I understand your question, you've written an HTML file that you open locally in your browser, and you want your browser to communicate directly to your robot using ZMQ.

The browser only has two mechanisms with which to connect to an external device or service: HTTP (normal links, forms, AJAX) and web sockets. ZMQ would have to live on top of web sockets. There may be a way to get that working, but it's probably not your ideal setup. Typically you would instead set up a separate web server that serves up your web page. Then you use websockets, ajax, or normal http calls to connect and send info to your webserver, and you use your language of choice to connect to your bot through ZMQ.

If you use that method, then you can indeed use javascript with ZMQ on the server side, using node.js. You can also use any other server side language that you choose to do the same thing, most of them have ZMQ bindings.

Your other option, as you've seen, is to use HTTP or web sockets to connect directly to your bot. Either one would work fine, depending on the specifics of your bot, but it wouldn't be my first choice.

Question:

I have 3 programs written in Python which need to be connected. 2 programs, X and Y, gather some information, which they send to program Z. Program Z analyzes the data and sends decisions back to programs X and Y. The number of programs similar to X and Y will grow in the future. Initially I used a named pipe to allow communication from X and Y to Z. But as you can see, I need a bidirectional relation. My boss told me to use ZeroMQ. I have just found a pattern for my use case, which is called Asynchronous Client/Server. Please see the code from the ZMQ book (http://zguide.zeromq.org/py:all) below.

The problem is that my boss does not want to use any threads, forks etc. I moved the client and server tasks to separate programs, but I am not sure what to do with the ServerWorker class. Can it somehow be used without threads? Also, I am wondering how to establish the optimal number of workers.

import zmq
import sys
import threading
import time
from random import randint, random

__author__ = "Felipe Cruz <felipecruz@loogica.net>"
__license__ = "MIT/X11"

def tprint(msg):
    """like print, but won't get newlines confused with multiple threads"""
    sys.stdout.write(msg + '\n')
    sys.stdout.flush()

class ClientTask(threading.Thread):
    """ClientTask"""
    def __init__(self, id):
        self.id = id
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        socket = context.socket(zmq.DEALER)
        identity = u'worker-%d' % self.id
        socket.identity = identity.encode('ascii')
        socket.connect('tcp://localhost:5570')
        print('Client %s started' % (identity))
        poll = zmq.Poller()
        poll.register(socket, zmq.POLLIN)
        reqs = 0
        while True:
            reqs = reqs + 1
            print('Req #%d sent..' % (reqs))
            socket.send_string(u'request #%d' % (reqs))
            for i in range(5):
                sockets = dict(poll.poll(1000))
                if socket in sockets:
                    msg = socket.recv()
                    tprint('Client %s received: %s' % (identity, msg))

        socket.close()
        context.term()

class ServerTask(threading.Thread):
    """ServerTask"""
    def __init__(self):
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        frontend = context.socket(zmq.ROUTER)
        frontend.bind('tcp://*:5570')

        backend = context.socket(zmq.DEALER)
        backend.bind('inproc://backend')

        workers = []
        for i in range(5):
            worker = ServerWorker(context)
            worker.start()
            workers.append(worker)

        poll = zmq.Poller()
        poll.register(frontend, zmq.POLLIN)
        poll.register(backend,  zmq.POLLIN)

        while True:
            sockets = dict(poll.poll())
            if frontend in sockets:
                ident, msg = frontend.recv_multipart()
                tprint('Server received %s id %s' % (msg, ident))
                backend.send_multipart([ident, msg])
            if backend in sockets:
                ident, msg = backend.recv_multipart()
                tprint('Sending to frontend %s id %s' % (msg, ident))
                frontend.send_multipart([ident, msg])

        frontend.close()
        backend.close()
        context.term()

class ServerWorker(threading.Thread):
    """ServerWorker"""
    def __init__(self, context):
        threading.Thread.__init__ (self)
        self.context = context

    def run(self):
        worker = self.context.socket(zmq.DEALER)
        worker.connect('inproc://backend')
        tprint('Worker started')
        while True:
            ident, msg = worker.recv_multipart()
            tprint('Worker received %s from %s' % (msg, ident))
            replies = randint(0,4)
            for i in range(replies):
                time.sleep(1. / (randint(1,10)))
                worker.send_multipart([ident, msg])

        worker.close()

def main():
    """main function"""
    server = ServerTask()
    server.start()
    for i in range(3):
        client = ClientTask(i)
        client.start()

    server.join()

if __name__ == "__main__":
    main()

Answer:

So, you grabbed the code from here: Asynchronous Client/Server Pattern

Pay close attention to the images that show you the model this code is targeted at. In particular, look at "Figure 38 - Detail of Asynchronous Server". The ServerTask class spins up 5 ServerWorker "Worker" nodes. In the code, those nodes are threads, but you could make them completely separate programs. In that case, your server program (probably) wouldn't be responsible for spinning them up; they'd spin up separately and just communicate to your server that they are ready to receive work.

You'll see this often in ZMQ examples: a multi-node topology mimicked with threads in a single executable. It's just to make the whole thing easy to read; it's not always intended to be used that way.

For your particular case, it could make sense to have the workers be threads or to break them out into separate programs... but if it's a business requirement from your boss, then just break them out into separate programs.
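
For illustration only, a standalone worker process might look like the sketch below. It assumes the server re-binds its backend from inproc://backend to an assumed tcp://*:5571, since inproc endpoints cannot be reached from another process:

import zmq

context = zmq.Context()
worker = context.socket(zmq.DEALER)
# inproc:// only works inside one process, so a standalone worker
# must reach the server's backend over tcp:// (assumed port)
worker.connect("tcp://localhost:5571")

while True:
    ident, msg = worker.recv_multipart()
    # ... do the real work for this request here ...
    worker.send_multipart([ident, msg])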

Of course, to answer your second question, there's no way to know how many workers would be optimal without understanding the workload they'll be performing and how quickly they'll need to respond. Your goal is to have the workers complete work faster than new work is received. There's a fair chance, in many cases, that this can be accomplished with a single worker. If so, you can have your server itself be the worker, and just skip the entire "worker tier" of the architecture. You should start there, for the sake of simplicity, and just do some load testing to see if it actually copes with your workload effectively. If not, get a sense of how long it takes to complete a task, and how quickly tasks are coming in. Let's say a worker can complete a task in 15 seconds; that's 4 tasks a minute. If tasks are coming in at 5 per minute, you need 2 workers, and you'll have a little headroom to grow. If things are wildly variable, then you'll have to make a decision about resources vs. reliability.
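
As a back-of-the-envelope sketch, that sizing arithmetic looks like this in Python (the rates are the illustrative numbers from the paragraph above, not measurements):

import math

task_time_s = 15                   # seconds one worker needs per task
service_rate = 60 / task_time_s    # tasks per minute one worker completes: 4
arrival_rate = 5                   # tasks arriving per minute
workers = math.ceil(arrival_rate / service_rate)
print(workers)                     # 2, with a little headroom to grow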

Before you get too much farther down the trail, make sure you read Chapter 4, Reliable Request/Reply Patterns; it will provide some insight into handling exceptions and might give you a better pattern to follow.

Question:

Is there any way to have an asynchronous client and server in ZeroMQ, using the same TCP-port and many sockets?

I already tried the ROUTER/ROUTER pattern, but with no luck.

The plan is to have an asymmetric connection with send and receive patterns among processors, so that each Processor-entity will be a Client and also a Server at the same time.


Answer:

Is there any way to have an asynchronous client and server in ZeroMQ, using the same TCP-port and many sockets ?

Yes, there is.

As a preventive step, in other words before getting into trouble, it is best to review the main conceptual differences in [ ZeroMQ hierarchy in less than a five seconds ] or other posts and discussions here.

The Yes above means one-.bind()-many-.connect()-s: a composition that still uses just one <transport-class>://<a-class-specific-address>, which for a tcp:// transport-class on IPv4 means one tcp://A.B.C.D:port# occupied for this whole 1:MANY-sockets-composition.

For obvious reasons, more complex compositions, like many-.bind()-s-many-.connect()-s, are also possible where feasible, given that both the ZeroMQ infrastructure topology options and the socket-"in-band" message-routing features are set up and used for smart decisions on the actual message-flow mechanics.
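
As a minimal pyzmq sketch of the one-.bind()-many-.connect()-s case ( the port number and identity are illustrative, and both peers live in one process here only for brevity ):

import zmq

context = zmq.Context()

# one socket owns the one TCP port ...
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5560")

# ... and any number of peers .connect() to it; each can send and receive
peer = context.socket(zmq.DEALER)
peer.identity = b"processor-1"
peer.connect("tcp://localhost:5560")

peer.send(b"hello")                        # DEALER -> ROUTER
ident, msg = router.recv_multipart()       # ROUTER sees [identity, payload]
router.send_multipart([ident, b"world"])   # reply is addressed by identity
print(peer.recv())                         # b'world'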

Question:

I am testing the examples of the ZeroMQ framework, an asynchronous messaging library for distributed applications that enables interoperability between programming languages.

My goal is to send a message from a client application in C++ and have it received by a server application in Python.

To achieve this objective, I am using the following samples:

The client application in C++ is named hwclient.cpp and is based on this code:

#include <zmq.hpp>
#include <string>
#include <iostream>

int main ()
{
    //  Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REQ);

    std::cout << "Connecting to hello world server…" << std::endl;
    socket.connect ("tcp://localhost:5555");

    //  Do 10 requests, waiting each time for a response
    for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
        zmq::message_t request (5);
        memcpy (request.data (), "Boti", 5);
        std::cout << "Sending " << request_nbr << "…" << std::endl;
        socket.send (request);

        //  Get the reply.
        zmq::message_t reply;
        socket.recv (&reply);

        // Print the reply, memory address ...
        std::cout << 'Mostrando reply' << &reply << std::endl;

        std::cout << "Received from server " << request_nbr << std::endl; reply;
    }
    return 0;
}

The Python server is named hwserver.py and is based on this code:

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: %s" % message)

    print('hi')

    #  Do some 'work'
    time.sleep(1)

    #  Send reply back to client
    socket.send(b"World")
    print('Response sent')

I build the C++ client with g++ hwclient.cpp -o client.out -lzmq and execute the binary to send messages from the client like this:

bgarcial@elpug : ~/CLionProjects/ZeroMQ
[0] % ./client.out
Connecting to hello world server…
Sending 0…
17018666170x7ffcbea254d0
Received from server 0
Sending 1…
17018666170x7ffcbea254d0
Received from server 1
Sending 2…
17018666170x7ffcbea254d0
Received from server 2

bgarcial@elpug : ~/CLionProjects/ZeroMQ
[0] % 

On the server side, the Boti string has arrived!

[127] % python HelloWorldServer/zeromq_server.py
Received request: b'Boti\x00'
hi
Response sent
Received request: b'Boti\x00'
hi
Response sent
Received request: b'Boti\x00'
hi
Response sent

But I think that the socket.send(b"World") section in the server is not executed, because the World string never arrives at my client ...

Is it possible that I am not receiving the reply in my C++ client in a suitable way?

When I test the python server with a python client, the reply from the server arrives at the client successfully ...

Why can't I see the reply from the server in my C++ client application?


Answer:

A) The assumption about a not-delivered message is not correct. Why?

The core logic ( actually the core, multiparty distributed-logic ) of the REQ/REP Scalable Formal Communication Pattern archetype is hard-wired so that only if the REP-entity receives a REQ-message ( and never otherwise, except with some recent API v4.+ delicate tweaking, to be utmost precise ) can the REP-entity ( and in your code it will ) .send() a message back to the REQ-side.

Symmetrically, after the REQ-entity has fired its first message at the REP-side, it can never send another "next" message unless it has, prior to that, .recv()-ed the REP-side response.
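
This lockstep is enforced by the socket's own state machine; a minimal pyzmq sketch ( the endpoint is illustrative ):

import zmq

context = zmq.Context()
req = context.socket(zmq.REQ)
req.connect("tcp://localhost:5555")   # illustrative endpoint

req.send(b"first")
req.send(b"second")   # raises zmq.error.ZMQError (EFSM):
                      # "Operation cannot be accomplished in current state"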

This symmetrical distributed-logic means that if your server-side code reports a positive acknowledgement of sending more than just one message, that on its own is positive proof that the opposite side has been delivered each and every response from the server. ( For cases where some messages get lost, you may kindly read my other posts on the risks of REQ/REP mutual dead-locks. )

So this has indeed been shown to be an incorrect assumption, Q.E.D.


B) The assumption about ill-delivered message content is not correct. Why?

ZeroMQ either delivers a completely correct message, or nothing at all.

This means that in case a message has been both sent and delivered ( for the core distributed-logic reasoning, kindly ref. above ), the only possible alternative is that the message payload is correct, as no other option exists in the ZeroMQ framework internals.

Proof: change this:

 // Print the reply, memory address ...
    std::cout << 'Mostrando reply' << &reply << std::endl;

into this:

 // Print the reply
    std::cout << "Mostrando reply was delivered" << std::endl;

and you will straight away see where the problem is: it is not in the ZeroMQ tools, but in the way your code tried to present the reply on std::cout. The single-quoted 'Mostrando reply' is a multi-character character literal, which prints as an integer ( the 1701866617 in your output ), and &reply prints the memory address of the message object ( the 0x7ffc... ) rather than its payload.


C) Why can't I see the reply from the server?

Because you do not try to show it.

That simple.

In case you are still in doubt, you may try to send it back to the server again and print it / match it there, to see both that it was delivered at all ( ref. A) above ) and that it was delivered correctly ( ref. B) above ).

Q.E.D.
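
If it helps, here is a minimal sketch of such a verification on the Python server side, assuming the client echoes each received reply back as its next request:

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    message = socket.recv()
    # if the client echoed the previous reply, this line proves both
    # delivery ( ref. A) above ) and payload correctness ( ref. B) above )
    print("Received:", message, "| matches b'World':", message == b"World")
    socket.send(b"World")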