Hot questions for Using ZeroMQ in messaging

Question:

I am seeking Python 2.7 alternatives to ZeroMQ that are released under the BSD or MIT license. I am looking for something that supports request-reply and pub-sub messaging patterns. I can serialize the data myself if necessary. I found Twisted from Twisted Matrix Labs but it appears to require a blocking event loop, i.e. reactor.run(). I need a library that will run in the background and let my application check messages upon certain events. Are there any other alternatives?


Answer:

Give nanomsg, ZeroMQ's younger sister, a try: same father, same beauty.
  • Yes, it is licensed under the MIT/X11 license.
  • Yes, REQ/REP: lets you build clusters of stateless services to process user requests.
  • Yes, PUB/SUB: distributes messages to large sets of interested subscribers.
  • Has several Python bindings available

https://github.com/tonysimpson/nanomsg-python (recommended)

https://github.com/sdiehl/pynanomsg

https://github.com/djc/nnpy


Differences between nanomsg and ZeroMQ

(State as of 2014/11, v0.5-beta; courtesy of nanomsg.org, quoted from the original documentation.)

Licensing

The nanomsg library is MIT-licensed. This means that, unlike with ZeroMQ, you can modify the source code and re-release it under a different license, as a proprietary product, etc. More reasoning about the licensing can be found here.

POSIX Compliance

The ZeroMQ API, while modeled on the BSD socket API, doesn't match it fully. nanomsg aims for full POSIX compliance.

Sockets are represented as ints, not void pointers. Contexts, as known in ZeroMQ, don't exist in nanomsg. This means a simpler API (sockets can be created in a single step) as well as the possibility of using the library for communication between different modules in a single process (think of plugins implemented in different languages speaking to each other). More discussion can be found here. The sending and receiving functions (nn_send, nn_sendmsg, nn_recv and nn_recvmsg) fully match POSIX syntax and semantics.

Implementation Language

The library is implemented in C instead of C++.

From the user's point of view, it means that there's no dependency on a C++ runtime (libstdc++ or similar), which may be handy in constrained and embedded environments. From the nanomsg developers' point of view, it makes life easier. The number of memory allocations is drastically reduced, as intrusive containers are used instead of C++ STL containers. This also means less memory fragmentation, fewer cache misses, etc. More discussion on the C vs. C++ topic can be found here and here.

Pluggable Transports and Protocols

In ZeroMQ there was no formal API for plugging in new transports (think WebSockets, DCCP, SCTP) and new protocols (counterparts to REQ/REP, PUB/SUB, etc.). As a consequence, no new transports were added after 2008, and no new protocols were implemented either. The formal internal transport and protocol APIs (see transport.h and protocol.h) are meant to mitigate the problem and serve as a base for creating and experimenting with new transports and protocols.

Please be aware that the two APIs are still new and may experience some tweaking in the future to make them usable in a wide variety of scenarios.

nanomsg implements a new SURVEY protocol. The idea is to send a message ("survey") to multiple peers and wait for responses from all of them. For more details check the article here. Also look here. In financial services it is quite common to use "deliver messages from anyone to everyone else" kind of messaging. To address this use case, there's a new BUS protocol implemented in nanomsg. Check the details here.

Threading Model

One of the big architectural blunders I made in ZeroMQ is its threading model. Each individual object is managed exclusively by a single thread. That works well for async objects handled by worker threads; however, it becomes a problem for objects managed by user threads. The thread may be used to do unrelated work for an arbitrary time span, e.g. an hour, and during that time the object managed by it is completely stuck. Some unfortunate consequences are: inability to implement request resending in the REQ/REP protocol, PUB/SUB subscriptions not being applied while the application is doing other work, and similar. In nanomsg the objects are not tightly bound to particular threads, and thus these problems don't exist.

REQ sockets in ZeroMQ cannot really be used in real-world environments, as they get stuck if a message is lost due to a service failure or similar. Users have to use XREQ instead and implement request re-trying themselves. With nanomsg, the re-try functionality is built into the REQ socket. In nanomsg, both REQ and REP support cancelling the ongoing processing: simply send a new request without waiting for a reply (in the case of a REQ socket) or grab a new request without replying to the previous one (in the case of a REP socket).

In ZeroMQ, due to its threading model, the connect-first-then-bind-second scenario doesn't work for the inproc transport. It is fixed in nanomsg. For similar reasons, auto-reconnect doesn't work for the inproc transport in ZeroMQ. This problem is fixed in nanomsg as well. Finally, nanomsg attempts to make nanomsg sockets thread-safe. While using a single socket from multiple threads in parallel is still discouraged, the way in which ZeroMQ sockets failed randomly in such circumstances proved to be painful and hard to debug.
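Since a ZeroMQ user has to write request re-trying by hand, here is a minimal, transport-agnostic sketch of that client-side loop in Python 3. `request_with_retry`, `LossyService` and their methods are hypothetical names invented for illustration; with a real ZeroMQ REQ socket you would additionally have to close and recreate the socket after a timeout, because it stays stuck waiting for the lost reply.

```python
from typing import Callable, Optional


def request_with_retry(
    send: Callable[[bytes], None],
    recv_with_timeout: Callable[[float], Optional[bytes]],
    request: bytes,
    retries: int = 3,
    timeout: float = 1.0,
) -> Optional[bytes]:
    """Send `request`, resending it up to `retries` times until a reply arrives.

    `send` and `recv_with_timeout` are hypothetical hooks standing in for a real
    transport; `recv_with_timeout` returns None if nothing arrives in time.
    """
    for _attempt in range(retries):
        send(request)
        reply = recv_with_timeout(timeout)
        if reply is not None:
            return reply
        # No reply: assume the request or its reply was lost, and resend.
    return None  # give up after `retries` attempts


class LossyService:
    """Fake service that silently drops the first `drop` requests it receives."""

    def __init__(self, drop: int) -> None:
        self.drop = drop
        self.sent = 0
        self.pending: Optional[bytes] = None

    def send(self, msg: bytes) -> None:
        self.sent += 1
        if self.sent > self.drop:
            self.pending = b"reply to " + msg

    def recv_with_timeout(self, timeout: float) -> Optional[bytes]:
        # A real transport would block for up to `timeout` seconds here.
        reply, self.pending = self.pending, None
        return reply


service = LossyService(drop=2)
print(request_with_retry(service.send, service.recv_with_timeout, b"job-42"))
# → b'reply to job-42' (the third attempt gets through)
```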

State Machines

Internal interactions inside the nanomsg library are modeled as a set of state machines. The goal is to avoid the incomprehensible shutdown mechanism as seen in ZeroMQ and thus make the development of the library easier.

For more discussion see here and here.

IOCP Support

One of the long-standing problems in ZeroMQ was that internally it uses the BSD socket API even on the Windows platform, where it is a second-class citizen. Using IOCP instead, as appropriate, would require a major rewrite of the codebase and thus, in spite of multiple attempts, was never implemented. IOCP is supposed to have better performance characteristics and, even more importantly, it allows the use of additional transport mechanisms, such as named pipes, which are not accessible via the BSD socket API. For these reasons nanomsg uses IOCP internally on Windows platforms.

Level-triggered Polling

One of the aspects of ZeroMQ that proved really confusing for users was the ability to integrate ZeroMQ sockets into an external event loop by using the ZMQ_FD file descriptor. The main source of confusion was that the descriptor is edge-triggered, i.e. it signals only when there were no messages before and a new one arrived. nanomsg uses level-triggered file descriptors instead, which simply signal when there's a message available, irrespective of whether it was available in the past.
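To make the edge/level distinction concrete, the toy model below (plain Python 3, no ZeroMQ involved; `ToySocket` and its methods are hypothetical) shows how an edge-triggered notification can leave a message stranded when the handler reads only one message per wake-up, while a level-triggered check keeps reporting it. It deliberately simplifies the real ZMQ_FD semantics.

```python
from collections import deque
from typing import Deque


class ToySocket:
    """Inbound queue that can be watched edge-triggered or level-triggered."""

    def __init__(self) -> None:
        self.inbox: Deque[str] = deque()
        self._edge_pending = False

    def deliver(self, msg: str) -> None:
        was_empty = not self.inbox
        self.inbox.append(msg)
        if was_empty:
            # Edge-triggered: fire only on the empty -> non-empty transition.
            self._edge_pending = True

    def edge_signal(self) -> bool:
        """Report, and consume, a pending edge notification."""
        fired, self._edge_pending = self._edge_pending, False
        return fired

    def level_signal(self) -> bool:
        """Level-triggered: readable whenever any message is queued."""
        return bool(self.inbox)

    def recv(self) -> str:
        return self.inbox.popleft()


s = ToySocket()
s.deliver("a")
s.deliver("b")           # queue was not empty, so no new edge
print(s.edge_signal())   # True: the first arrival raised the edge
s.recv()                 # the event handler reads just one message
print(s.edge_signal())   # False: "b" is still queued, but no new edge fires
print(s.level_signal())  # True: level-triggered keeps reporting "readable"
```

With edge-triggered notification the application must drain the queue completely on every wake-up; the level-triggered style tolerates handlers that read one message at a time.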

Routing Priorities

nanomsg implements priorities for outbound traffic. You may decide that messages are to be routed to a particular destination in preference, and fall back to an alternative destination only if the primary one is not available.
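A minimal sketch of priority-based routing with fallback, assuming lower numbers mean higher priority (nanomsg exposes endpoint priorities through the NN_SNDPRIO socket option, where 1 is the highest). `pick_destination` is a hypothetical helper; ties simply go to the first listed peer here, whereas a real implementation would load-balance among equal-priority peers.

```python
from typing import Dict, List, Optional


def pick_destination(priorities: Dict[str, int], available: List[str]) -> Optional[str]:
    """Return the reachable peer with the best (numerically lowest) priority."""
    candidates = [peer for peer in available if peer in priorities]
    if not candidates:
        return None  # nobody to route to: the caller must queue or block
    # min() keeps the first of several equal-priority peers.
    return min(candidates, key=lambda peer: priorities[peer])


priorities = {"primary": 1, "backup": 8}
print(pick_destination(priorities, ["backup", "primary"]))  # primary
print(pick_destination(priorities, ["backup"]))             # backup: primary is down
```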

For more discussion see here.

TCP Transport Enhancements

There's a minor enhancement to TCP transport. When connecting, you can optionally specify the local interface to use for the connection, like this:

nn_connect (s, "tcp://eth0;192.168.0.111:5555");

Asynchronous DNS

DNS queries (e.g. converting hostnames to IP addresses) are done in asynchronous manner. In ZeroMQ such queries were done synchronously, which meant that when DNS was unavailable, the whole library, including the sockets that haven't used DNS, just hung.

Zero-Copy

While ZeroMQ offers a "zero-copy" API, it's not true zero-copy. Rather it's "zero-copy till the message gets to the kernel boundary". From that point on data is copied as with standard TCP. nanomsg, on the other hand, aims at supporting true zero-copy mechanisms such as RDMA (CPU bypass, direct memory-to-memory copying) and shmem (transfer of data between processes on the same box by using shared memory). The API entry points for zero-copy messaging are nn_allocmsg and nn_freemsg functions in combination with NN_MSG option passed to send/recv functions.

Efficient Subscription Matching

In ZeroMQ, simple tries are used to store and match PUB/SUB subscriptions. The subscription mechanism was intended for up to 10,000 subscriptions, where a simple trie works well. However, there are users who use as many as 150,000,000 subscriptions. In such cases there's a need for a more efficient data structure. Thus, nanomsg uses a memory-efficient version of a Patricia trie instead of a simple trie.

For more details check this article.
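For comparison, here is what a simple (uncompressed) trie for PUB/SUB prefix matching looks like. It is a Python 3 sketch of the idea, not ZeroMQ's or nanomsg's actual code, and `SubscriptionTrie` is a hypothetical name. As in ZeroMQ, a message matches if any subscribed topic is a prefix of it, and the empty topic matches everything. A Patricia trie would additionally collapse chains of single-child nodes into one node, which is where its memory savings come from.

```python
from typing import Dict


class SubscriptionTrie:
    """Simple (uncompressed) byte trie for PUB/SUB prefix subscriptions."""

    def __init__(self) -> None:
        self.children: Dict[int, "SubscriptionTrie"] = {}
        self.refcount = 0  # how many subscriptions end exactly at this node

    def subscribe(self, topic: bytes) -> None:
        node = self
        for byte in topic:  # iterating over bytes yields ints
            node = node.children.setdefault(byte, SubscriptionTrie())
        node.refcount += 1

    def matches(self, message: bytes) -> bool:
        """True if some subscribed topic is a prefix of `message`."""
        node = self
        if node.refcount:  # empty topic: subscribed to everything
            return True
        for byte in message:
            child = node.children.get(byte)
            if child is None:
                return False
            node = child
            if node.refcount:
                return True
        return False


subs = SubscriptionTrie()
subs.subscribe(b"weather.")
subs.subscribe(b"stock.AAPL")
print(subs.matches(b"weather.london 12C"))  # True
print(subs.matches(b"stock.MSFT 300"))      # False
```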

Unified Buffer Model

ZeroMQ has a strange double-buffering behaviour. Both the outgoing and the incoming data are stored in a message queue and in TCP's tx/rx buffers. This means, for example, that if you want to limit the amount of outgoing data, you have to set both the ZMQ_SNDBUF and ZMQ_SNDHWM socket options. Given that there's no semantic difference between the two, nanomsg uses only TCP's (or equivalent's) buffers to store the data.

Scalability Protocols

Finally, on a philosophical level, nanomsg aims at implementing different "scalability protocols" rather than being a generic networking library. Specifically:

  • Different protocols are fully separated; you cannot connect a REQ socket to a SUB socket or similar.
  • Each protocol embodies a distributed algorithm with well-defined prerequisites (e.g. "the service has to be stateless" in the case of REQ/REP) and guarantees (if the REQ socket stays alive, the request will ultimately be processed).
  • Partial failure is handled by the protocol, not by the user. In fact, it is transparent to the user.
  • The specifications of the protocols are in the /rfc subdirectory. The goal is to standardise the protocols via the IETF.
  • There's no generic UDP-like socket (ZMQ_ROUTER); you should use L4 protocols for that kind of functionality.

Question:

Reading through the ZeroMQ documentation, I got a little lost when I discovered these three socket combinations. They are:

  • DEALER to ROUTER
  • DEALER to DEALER
  • ROUTER to ROUTER

I understand that DEALER and ROUTER are replacements for the synchronous REQ/REP communication, so they become asynchronous and multiple nodes can connect. What I don't understand is how a DEALER can replace both REQ and REP in DEALER to DEALER (and, likewise, how a ROUTER can sit on both ends in ROUTER to ROUTER).

I was looking for a pattern that allows an arbitrary number of clients to submit jobs to an arbitrary number of workers, which process them (with load balancing) and return responses, including intermediate results, to the client (asynchronously, sending multiple messages back). The client may also need to be able to terminate the work prematurely. I find the documentation a little light in this respect (I'm not an expert by any stretch and may have missed the relevant section).

I'm happy to work out the details myself, but every time I think I've found a suitable pattern, I discover another one that may be equally suitable (for instance, these 3 patterns are equally suitable in my opinion: http://zguide.zeromq.org/page:all#ROUTER-Broker-and-REQ-Workers, http://zguide.zeromq.org/page:all#ROUTER-Broker-and-DEALER-Workers, http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker).

Any advice on the structure (which socket for which component to communicate) is appreciated.

Update

This is what I came up with so far:

import multiprocessing
import zmq
import time

router_url_b = 'tcp://*:5560'
router_url = 'tcp://localhost:5560'

dealer_url_b = 'tcp://*:5561'
dealer_url = 'tcp://localhost:5561'


def broker():
    context = zmq.Context()
    router = context.socket(zmq.ROUTER)
    router.bind(router_url_b)

    dealer = context.socket(zmq.DEALER)
    dealer.bind(dealer_url_b)

    poll = zmq.Poller()
    poll.register(router, zmq.POLLIN)
    poll.register(dealer, zmq.POLLIN)

    while True:
        poll_result = dict(poll.poll())
        if router in poll_result:
            ident, msg = router.recv_multipart()
            print 'router: ident=%s, msg=%s' % (ident, msg)
            # print 'router received "%s" and ident %s' % (msg, ident)
            dealer.send_multipart([ident, msg])
            # dealer.send(msg)
        if dealer in poll_result:
            ident, msg = dealer.recv_multipart()
            print 'dealer: ident=%s, msg=%s' % (ident, msg)
            router.send_multipart([ident, msg])


def client(client_id):
    context = zmq.Context()
    req = context.socket(zmq.DEALER)
    # setting identity doesn't seem to make a difference
    req.setsockopt(zmq.IDENTITY, b"%s" % client_id)
    req.connect(router_url)

    req.send('work %d' % client_id)
    while True:
        msg = req.recv()
        print 'client %d received response: %s' % (client_id, msg)


def worker(worker_id):
    context = zmq.Context()
    # to allow asynchronous sending of responses.
    rep = context.socket(zmq.ROUTER)
    # not sure if this is required...
    # rep.setsockopt(zmq.IDENTITY, b"%s" % (10+worker_id))
    rep.connect(dealer_url)

    while True:
        msg = rep.recv_multipart()
        ident, msg = msg[:-1], msg[-1]
        print 'worker %d received: "%s", ident="%s"' % (worker_id, msg, ident)
        # do some work...
        time.sleep(10)
        rep.send_multipart(ident + ['result A from worker %d (%s)' % (worker_id, msg)])
        # do more work...
        time.sleep(10)
        rep.send_multipart(ident + ['result B from worker %d (%s)' % (worker_id, msg)])
    print 'finished worker', worker_id


def main():

    print 'creating workers'
    for i in xrange(2):
        p = multiprocessing.Process(target=worker, args=(i, ))
        p.daemon = True
        p.start()

    print 'creating clients'
    for i in xrange(5):
        p = multiprocessing.Process(target=client, args=(i, ))
        p.daemon = True
        p.start()

    broker()


if __name__ == '__main__':
    main()

It seems to be working quite well. The only thing that's missing is communication from the client to the worker once the worker has started processing the work. I guess the best idea is to create a new control channel of some sort (pub/sub) to terminate the worker if required.

A few questions remain:

  • Does this model have any obvious weaknesses?
  • What is the IDENTITY useful for? It doesn't seem to matter if I set those values (neither in client nor in worker).
  • The first messages the workers receive are:
      worker 1 received: "work 3", ident="['\x00\x80\x00A\xa7', '3']"
      worker 0 received: "work 4", ident="['\x00\x80\x00A\xa7', '4']"
    Why is the first ident item the same for both workers? The way I understand the router to work is that it assigns random identities, which it keeps track of. How does this work (it does seem to work in a small-scale example)?

Answer:

In addition to my update, I found that the worker can connect to the backend of the server using a DEALER. The pattern and explanation can be found here.

The client uses a DEALER socket; the server receives requests as a ROUTER on the frontend (async + many clients) and proxies them to the workers (backend) using a DEALER socket (async); the workers listen to the server's backend on a DEALER socket (async, no routing necessary, although ROUTER also worked).

If the workers were strictly synchronous, we’d use REP, but since we want to send multiple replies we need an async socket. We do not want to route replies; they always go to the single server thread that sent us the request.

One further modification is replacing the hand-written dispatching of router/dealer messages (the while True loop in broker()) with zmq.proxy(router, dealer).

Update

Apparently, this pattern uses ZMQ's standard round-robin routing. Custom task assignment can be achieved with the ROUTER to ROUTER pattern. In this case a client starts by sending a request, and workers start by sending a ready message. The broker manages a list of ready workers and, if none is available, stops polling for new client messages (thus relying on ZMQ's internal message buffers).
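The ROUTER-to-ROUTER variant can be sketched at the frame level without any sockets. This Python 3 sketch assumes, as in the code above, that clients are DEALER sockets (so there is no empty delimiter frame), and that each worker sends [b"READY"] when it starts and again after finishing a job; the class and method names are hypothetical. Because a ROUTER prepends the sender's identity frame to every message it receives and routes outgoing messages by their first frame, the broker only has to move identity frames around. Treating READY as a separate message (rather than letting every reply count as READY, as the guide's load-balancing broker does with REQ workers) lets a worker send several intermediate results for one job.

```python
from collections import deque
from typing import Deque, List, Optional

Frames = List[bytes]


class LoadBalancingBroker:
    """Frame-level sketch of a ROUTER-to-ROUTER load-balancing broker."""

    def __init__(self) -> None:
        # Identities of workers that have announced they are idle.
        self.ready_workers: Deque[bytes] = deque()

    def poll_frontend(self) -> bool:
        # Read client requests only while some worker is idle; otherwise the
        # requests stay buffered in ZeroMQ's own queues.
        return bool(self.ready_workers)

    def on_backend(self, frames: Frames) -> Optional[Frames]:
        """Handle [worker_id, b"READY"] or [worker_id, client_id, reply]."""
        worker_id, rest = frames[0], frames[1:]
        if rest == [b"READY"]:
            self.ready_workers.append(worker_id)
            return None  # nothing to forward
        # Strip the worker identity; the frontend ROUTER routes on client_id.
        return rest

    def on_frontend(self, frames: Frames) -> Frames:
        """Handle [client_id, request] by handing it to the longest-idle worker."""
        worker_id = self.ready_workers.popleft()
        # The backend ROUTER routes on worker_id; the worker keeps client_id
        # so that it can address its (possibly several) replies.
        return [worker_id] + frames


broker = LoadBalancingBroker()
broker.on_backend([b"worker-1", b"READY"])
print(broker.on_frontend([b"client-3", b"work 3"]))  # [b'worker-1', b'client-3', b'work 3']
print(broker.poll_frontend())                        # False: no idle worker left
```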

Question:

I'm trying to build a notification messaging system. I'm using the SimpleWsServer.php server example. I want to push a notification to the user's browser when a task has completed on the server. This needs to be done using PHP, and I can't find a tutorial where this is shown. All the tutorials seem to show tavendo/AutobahnJS scripts sending and receiving, while the PHP server runs as a manager.

Is it possible to send a message using a php script to the subscribers?


Answer:

Astro,

This is actually pretty straightforward and can be accomplished a couple of different ways. We designed the Thruway Client to mimic the AutobahnJS client, so most of the simple examples will translate directly.

I'm assuming that you want to publish from a website (not a long running php script).

In your PHP website, you'll want to do something like this:

<?php
require __DIR__ . '/vendor/autoload.php'; // assumes Thruway was installed with Composer

$connection = new \Thruway\Connection(
    [
        "realm"   => 'com.example.astro',
        "url"     => 'ws://demo.thruway.ws:9090', //You can use this demo server or replace it with your router's IP
    ]
);

$connection->on('open', function (\Thruway\ClientSession $session) use ($connection) {

    //publish an event
    $session->publish('com.example.hello', ['Hello, world from PHP!!!'], [], ["acknowledge" => true])->then(
        function () use ($connection) {
            $connection->close(); //You must close the connection or this will hang
            echo "Publish Acknowledged!\n";
        },
        function ($error) {
            // publish failed
            echo "Publish Error {$error}\n";
        }
    );
});

$connection->open();

And the javascript client (using AutobahnJS) will look like this:

var connection = new autobahn.Connection({
    url: 'ws://demo.thruway.ws:9090',  //You can use this demo server or replace it with your router's IP
    realm: 'com.example.astro'
});

connection.onopen = function (session) {

    //subscribe to a topic
    function onevent(args) {
        console.log("Someone published this to 'com.example.hello': ", args);    
    }

    session.subscribe('com.example.hello', onevent).then(
        function (subscription) {
            console.log("subscription info", subscription);
        },
        function (error) {
            console.log("subscription error", error);
        }
    );
};

connection.open();

I've also created a plunker for the javascript side and a runnable for the PHP side.

Question:

Zero-copy messaging is something that can be implemented in ZeroMQ, but is it possible to use it with the zmqpp C++ bindings? There is almost no documentation, and I was not able to find anything in the examples...


Answer:

I would switch to cppzmq.

It's a more active project and maintained by some of the core libzmq people.

It's header-only and has support for zero-copy.

Question:

Is it possible to implement a brokerless network with queues using ZeroMQ (with JeroMQ Java porting)?

In my network all peers are both publishers and receivers (SUB/PUB pattern), so that when a peer sends a message all other peers get the message.

The problem is messages are not reliable and can get lost (for example for connectivity issues) and not recovered anymore.

I'd like to implement a queue where peers can retrieve messages they have not received.

I'm looking at this guide (even though it's for Python), and it seems I should implement the XREP/XREQ pattern, but it seems this is only possible by implementing a queue server. Is that true?


Answer:

Q: Is this possible only by implementing a queue server?
A: No.

Maybe I did not get your point of view exactly, but having spent a few years inside ZeroMQ-based distributed systems, I can address a few misses in the concept.

First: yes, the Zen-of-Zero provides ZERO warranty of message delivery. This may seem surprising, but there are many reasons for working this way and no other. There is a warranty of consistency: a message is either delivered as-is or not at all. This means that if the message has made it through the socket, the receiving side may be sure that the sender dispatched this very content, and no error-checking need be put in place, as ZeroMQ has already spent all its effort on delivering a 1:1 bit-by-bit copy of the original.

Next: ZeroMQ is designed as a broker-less, asynchronous, lightweight signalling / messaging tool. Broker-less means that zero effort is spent on any sort of tool-based persistence, so there is no broker-side storage of any (semi-)persistent replica(s) of the messages, be it those delivered or those not delivered for whatever technical reason (yet those delivered are, as expressed above, guaranteed to be OK and an exact copy of the original).

Implication: inserting a zmq.device( zmq.QUEUE, f, b ) will have zero effect on this, as the device has all the properties reported above and so lives under the same set of paradigms.


Solution?

If one needs both the content warranty and the all-messages-delivered warranty: the former has been included in the ZeroMQ tools since inception; the latter has to be added on top of the standard tools, as an extended supra-pattern that re-uses the delivery-agnostic standard tools.

This way one can get what you have sketched above, without wasting a single CPU clock in all the other use cases, where delivery-agnostic, just "best-effort" transports are okay.
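One possible supra-pattern for the brokerless PUB/SUB case in the question is to number messages per publisher, keep a replay history, and let subscribers detect gaps and ask for the missing numbers over a separate request channel. The Python 3 sketch below shows only the bookkeeping (no sockets); `Publisher`, `Subscriber`, `publish`, `receive` and `replay` are hypothetical names. A peer would keep one `Subscriber` state per publishing peer, the history would need a size cap in real code, and detecting the loss of the very last message would additionally need heartbeats carrying the latest sequence number.

```python
from typing import Dict, List, Tuple


class Publisher:
    """Numbers each message and remembers it so that it can be replayed."""

    def __init__(self) -> None:
        self.next_seq = 0
        self.history: Dict[int, bytes] = {}  # unbounded here; cap it in real code

    def publish(self, payload: bytes) -> Tuple[int, bytes]:
        seq = self.next_seq
        self.next_seq += 1
        self.history[seq] = payload
        return seq, payload  # what would go out on the PUB socket

    def replay(self, missing: List[int]) -> List[Tuple[int, bytes]]:
        # Answer a recovery request, e.g. one arriving on a separate REQ/REP channel.
        return [(seq, self.history[seq]) for seq in missing if seq in self.history]


class Subscriber:
    """Delivers one publisher's messages in order and reports gaps."""

    def __init__(self) -> None:
        self.expected = 0                   # next sequence number to deliver
        self.buffer: Dict[int, bytes] = {}  # messages that arrived out of order
        self.delivered: List[bytes] = []

    def receive(self, seq: int, payload: bytes) -> List[int]:
        """Accept one message and return the sequence numbers still missing."""
        if seq >= self.expected:  # ignore duplicates of already-delivered ones
            self.buffer[seq] = payload
        while self.expected in self.buffer:
            self.delivered.append(self.buffer.pop(self.expected))
            self.expected += 1
        highest = max(self.buffer, default=self.expected - 1)
        return [s for s in range(self.expected, highest + 1) if s not in self.buffer]


pub, sub = Publisher(), Subscriber()
sent = [pub.publish(p) for p in (b"a", b"b", b"c", b"d")]
for seq, payload in (sent[0], sent[2], sent[3]):  # message 1 is "lost"
    missing = sub.receive(seq, payload)
print(missing)                                    # [1]
for seq, payload in pub.replay(missing):
    sub.receive(seq, payload)
print(sub.delivered)                              # [b'a', b'b', b'c', b'd']
```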

Question:

I have trouble establishing an asynchronous point-to-point channel using ZeroMQ.

My approach to building point-to-point channels was to create one ZMQ_PAIR socket for each other peer in the network. Because a ZMQ_PAIR socket ensures an exclusive connection between two peers, each peer needs as many sockets as there are other peers. My first attempt is shown in the following diagram, which represents the pairing connections between peers.

But the problem with the above approach is that each pairing socket needs a distinct bind address. For example, if four peers are in the network, then each peer needs at least three (TCP) addresses to bind for the other peers, which is unrealistic and inefficient. (I assume that each peer has exactly one unique address, e.g. tcp://*:5555.)

It seems that there is no way other than using different patterns that involve some set of message brokers, such as XREQ/XREP. (I intentionally avoid a broker-based approach, because my application will exchange messages heavily between peers, which would often result in a performance bottleneck at the broker processes.)

But I wonder whether anybody uses ZMQ_PAIR sockets to build point-to-point channels efficiently? Or is there a way to avoid needing distinct host IP addresses for multiple ZMQ_PAIR sockets to bind?


Answer:

Q: How to effectively establish ... well,

Given the above narrative, the question of "How to effectively ..." (where the metric of what, and how, actually measures the desired effectiveness may get some further clarification later) turns into another question: "Can we re-factor the ZeroMQ Signalling / Messaging infrastructure so as to work without using as many IP-address:port# pairs as a tcp://-transport-class based topology would actually need?"

Given an explicitly expressed limit of no more than just one IP:PORT# per host/node (which thus becomes the architecture's very, if not the most, expensive resource), one will have to overcome a lot of trouble on such a way forward.

It is fair to note that any such attempt will come at an extra cost to be paid. There will not be any magic wand to "bypass" such a principal limit as expressed above. So get ready to indeed pay the costs.

It reminds me of a project in TELCO, where a distributed system was operated in a similar manner, with a similar original motivation. Each node had an ssh/sshd service set up, where local-port forwarding made it possible to expose just one publicly accessible IP:PORT# access point. All the rest was implemented "inside" a mesh of all the topological links going through ssh tunnels, not just because of the encryption service, but precisely because of the comfort of being able to maintain all the local-port forwarding towards specific remote ports as a means to set up and operate such exclusive peer-to-peer links between all the service nodes, while having just a single public access IP:PORT# per node.

If no other approach seems feasible, the above-mentioned "stone-age" ssh/sshd port forwarding, with ZeroMQ running against such local ports only, may save you. PUB/SUB is ruled out either because in older ZeroMQ/API versions topic filtering is processed only on the SUB side, so all the traffic actually flows to each terminal node (which neither the security nor the network department will like to support), or, in newer ZeroMQ/API versions where the topic filter is processed on the sender's side, because of concentrated workloads and immense resource needs on the PUB side. As for addressing, dynamic network peer (re-)discovery, maintenance, resource planning, fault resilience, ...: no easy shortcut seems to be anywhere near, ready to just grab and (re-)use.

Anyway - Good Luck on the hunt!

Question:

I am trying to send a message from Python code to C# via ZeroMQ. I am using the following data structure in Python:

message = msgpack.packb(
    (
        {"message_id": "1001", "type": "GET", "namespace": "DocumentManager"},
        "MdiActiveDocument",
        ["parameter1", "parameter2"]
    )
)
message = msgpack.packb(message)
alive_socket.send(message)

Trying to unpack it with C#, using this code:

        var message = new byte[500];
        int result = this.Client.Receive(message);
        var serializer = 
        MessagePackSerializer.Get<Tuple<Dictionary<string,string>, String, List<String>>>();
        var reply = serializer.UnpackSingleObject(message);

It results in this error:

Additional information: Unpacker is not in the array header.
The stream may not be array.

I have tried simplifying the data structure but I still cannot get it right. Perhaps my usage of MsgPack is flawed. Thanks in advance for any help.


Answer:

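Update: the real cause was much, much dumber than the explanation I originally posted (now deleted): I called packb() twice in the original code, so the C# side received a single MessagePack raw value (the already-packed bytes) instead of an array.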
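The failure mode can be reproduced with the standard library alone, using json as a stand-in for msgpack (an analogy, not the same wire format): encoding an already-encoded payload a second time produces a string whose content happens to be the encoded array, so a decoder that expects an array at the top level finds a string instead, much like the C# unpacker reported.

```python
import json

payload = (
    {"message_id": "1001", "type": "GET", "namespace": "DocumentManager"},
    "MdiActiveDocument",
    ["parameter1", "parameter2"],
)

once = json.dumps(payload)  # encodes a JSON array
twice = json.dumps(once)    # encodes a JSON *string* containing the array's text

decoded_once = json.loads(once)
decoded_twice = json.loads(twice)
print(type(decoded_once).__name__)   # list: what the receiving side expected
print(type(decoded_twice).__name__)  # str: the outer layer is just a string
```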

Question:

I found that people don't recommend sending large messages with ZeroMQ. But it is a real headache for me to split the data (it is rather intertwined). Why is this not recommended; is there some specific reason? Can it be overcome?


Answer:

Why is this not recommended?

Resources ...

Even the best Zero-Copy implementation has to have spare resources to store the payloads in several principally independent, separate locations:

|<fatMessageNo1>|
|...............|__________________________________________________________ RAM
|...............|<fatMessageNo1>|
|...............|...............|__________________Context().Queue[peerNo1] RAM
|...............|...............|<fatMessageNo1>|
|...............|...............|...............|________O/S.Buffers[L3/L2] RAM

Can it be overcome?

Sure: do not send mastodon-sized, GB+ messages. You may use any kind of off-RAM representation of the data and send just a lightweight reference that allows the remote peer to access such an immense beast.
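A minimal Python 3 sketch of that "send a reference, not the payload" idea (often called the claim-check pattern), assuming both peers can reach the same storage location, for example a shared filesystem; `store_blob` and `load_blob` are hypothetical helpers. Only the small JSON reference would travel over the ZeroMQ socket, and the SHA-256 digest lets the receiver check that it read the complete blob.

```python
import hashlib
import json
import os
import tempfile


def store_blob(data: bytes, directory: str) -> bytes:
    """Write a large payload to shared storage and return a small JSON reference."""
    digest = hashlib.sha256(data).hexdigest()
    path = os.path.join(directory, digest)
    with open(path, "wb") as f:
        f.write(data)
    ref = {"path": path, "size": len(data), "sha256": digest}
    return json.dumps(ref).encode()  # this small message is what you would send


def load_blob(reference: bytes) -> bytes:
    """Resolve a reference on the receiving side and verify its integrity."""
    ref = json.loads(reference)
    with open(ref["path"], "rb") as f:
        data = f.read()
    if len(data) != ref["size"] or hashlib.sha256(data).hexdigest() != ref["sha256"]:
        raise ValueError("blob is incomplete or corrupted")
    return data


# Usage: the sender stores the blob and ships only the reference.
with tempfile.TemporaryDirectory() as shared_dir:
    blob = os.urandom(5_000_000)  # stand-in for a large, interrelated dataset
    reference = store_blob(blob, shared_dir)
    restored = load_blob(reference)  # what the receiving peer would do
    print(len(reference), "byte reference for", len(blob), "bytes of data")
```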


Many new questions were added via comment (which is considered unfair on StackOverflow):

I was more concerned about something like transmission failure: what will ZeroMQ do (will it try to retransmit automatically, will it be transparent to me, etc.)? RAM is not so crucial: servers can have more than enough of it, and the service that we write is not intended to have a huge number of clients at the same time. The data that I'm talking about is very interrelated (we have molecule/atom info and bonds between them), so it is impossible to send a chunk of it and use it; we need it all. - Paul

You may already be aware that ZeroMQ works under a Zen-of-Zero, where zero-warranty also has its place.

So, a ZeroMQ-dispatched message will either be delivered "through" error-free, or not delivered at all. This is a great pain-saver, as your code will only ever receive fully-protected content, atomically, so no tortured trash will ever reach your target post-processing. Higher-level soft-protocol handshaking allows one to remain in control, enabling mitigation of non-delivered cases from higher levels of abstraction. So if your design appetite and deployment conditions permit, one can harness brute force and send whatever-[TB]-BLOBs, at one's own risk of blocking both local and infrastructure resources, if others permit and don't mind (... but never on my advice :o) )

Error-recovery self-healing (from lost connections and similar real-life issues) is handled if configuration, resources and timeouts permit, so a lot of trouble with L1/L2/L3 ISO-OSI layer issues is efficiently hidden from application programmers.

Question:

Could you please advise a ZeroMQ socket architecture for the following scenario:

1) there is a server listening on a port

2) there are several clients connecting to the server simultaneously

3) the server accepts all connections from clients and provides a bi-directional queue for each client, meaning both parties (client N or the server) can send or consume messages, i.e. either party can be the INITIATOR of the communication, and the other party should have a callback to process the message.

Should we create an additional ZeroMQ socket for each accepted connection, for pushing messages from the server? Could you please advise which ZeroMQ socket type to google for such an architecture?


Answer:

Q : …create additional ZeroMQ socket on each accepted connection for pushing messages from server?

The best trivial composition-based design - both scaling-wise and security-wise

The native ZeroMQ primitives (the smart primitive Scalable Formal Communication Pattern Archetypes) are like LEGO building blocks for us: we compose their further use on the Messaging/Signalling Plane of the intended target use in our application domain.

Q : Could you please advise which ZeroMQ socket type to google for such architecture?

Not really, given that no detailed list of requirements was available for such advice. A pair of PUSH/PULL-s need not suffice on its own; a temporarily conducted (episodic) REQ/REP may help for the client (re-)discovery phases, as might other co-existent, persistent or episodic Archetypes for composing any additional System/Service-Planes.

Question:

I am working to put a distributed computing framework in place, and am investigating using 0mq as the underpinning communication layer. Jobs, which can take up to two hours to run, can be started on any worker in a cluster (the assumption being that all workers in a cluster have access to the same resources and are capable of running the same jobs). The manager is responsible for monitoring the system state and triggering the jobs. The worker machines are also multi-core, but I am ignoring this aspect for now.

The problem I am trying to solve is distributing job messages to workers, with:

  • messages sit in queue until delivered
  • only one worker receives a message
  • a worker only receives a message IF it is capable of performing work (for example there isn't a job in process).

Ignoring the multiple cores and assuming a cluster of 5 workers: if the manager puts 7 jobs into the queue, five of them should be running, one on each worker, with two remaining in the queue. When one job ends, the worker should receive the next job in the queue.

I have run experiments and researched:

  • PUB/SUB: the first problem is that the message will be received by all workers
  • PUSH/PULL: there is the first-joiner problem, but more importantly, I cannot pause the queue while a job is in progress (it keeps taking messages)
  • REQ/REP: This is the fall back, but it requires me to code everything up myself.

The questions are:

  • Are there known guidelines for this type of system that I should be following?
  • Is there a zmq socket type OR combination that will help me achieve my goal? I like the DEALER concept, but it still appears to round-robin messages regardless of the state of the recipient.

Answer:

You need to devise a flow control protocol, in addition to sending work messages to the workers.

A protocol I like has the workers sending Ready messages to the manager. The manager puts the ready workers into a list, and replies to the Ready messages one at a time until there is no more work. This is load balancing, a commonly used pattern, and there is a Load Balancing example in Chapter 3 of the zmq guide. I think ROUTER/DEALER sockets work best.
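At the wire level, the Ready exchange is just a matter of reading and writing message envelopes on the manager's ROUTER socket. The sketch below assumes the workers use REQ sockets, as in the guide's Load Balancing example, so each worker message arrives as [worker_identity, b"", body]. Frames are modelled as plain lists of bytes (the shape pyzmq's recv_multipart()/send_multipart() use), and the READY body and helper names are hypothetical choices for this illustration, not part of any ZeroMQ API.

```python
# Hypothetical body a worker sends to say it is idle (chosen for this sketch).
READY = b"READY"


def parse_worker_message(frames):
    """Split a message received on the manager's ROUTER into (worker_id, body).

    Assumes a REQ worker: the ROUTER prepends the worker's identity frame and
    the REQ socket adds an empty delimiter frame before the body.
    """
    worker_id, delimiter, body = frames
    if delimiter != b"":
        raise ValueError("expected an empty delimiter frame from a REQ worker")
    return worker_id, body


def job_message(worker_id, job):
    """Frames the manager sends on its ROUTER to answer a worker's Ready with a job."""
    # The ROUTER uses the first frame to pick the peer; the REQ worker strips b"".
    return [worker_id, b"", job]


# A worker announces it is ready; the manager answers that Ready with a job.
worker_id, body = parse_worker_message([b"worker-1", b"", READY])
if body == READY:
    print(job_message(worker_id, b"job-42"))  # [b'worker-1', b'', b'job-42']
```

Because a REQ worker cannot send again until it has received a reply, the manager simply withholds the reply until a job is available. That is what keeps a busy worker from being handed more work. In this pattern, the message a worker sends when it finishes a job can double as its next Ready.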

The beauty of load balancing is its sheer simplicity. The manager just has to maintain a list of jobs and another list of ready workers, and reply to the workers' Ready messages with work. It doesn't even matter what kind of list it is: FIFO, LIFO, or random.
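The manager's bookkeeping can be sketched without any sockets: on_ready stands for "a Ready message arrived from this worker", and each returned (worker, job) pair stands for a reply the manager would send. The class and method names are hypothetical, and FIFO deques are just one of the list disciplines mentioned above. The usage replays the 5-workers, 7-jobs scenario from the question.

```python
from collections import deque


class LoadBalancer:
    """Bookkeeping for the Ready-based load-balancing pattern (a sketch)."""

    def __init__(self):
        self.jobs = deque()           # jobs waiting for a worker
        self.ready_workers = deque()  # workers that sent Ready and have no job yet

    def submit(self, job):
        """Queue a job; return the (worker, job) replies that can go out now."""
        self.jobs.append(job)
        return self._dispatch()

    def on_ready(self, worker):
        """Record a Ready message; return the (worker, job) replies that can go out now."""
        self.ready_workers.append(worker)
        return self._dispatch()

    def _dispatch(self):
        # Pair queued jobs with idle workers; each Ready is answered at most once,
        # so a worker that is busy never receives a second job.
        sent = []
        while self.jobs and self.ready_workers:
            sent.append((self.ready_workers.popleft(), self.jobs.popleft()))
        return sent


# The scenario from the question: 5 idle workers, then 7 jobs.
lb = LoadBalancer()
for w in range(1, 6):
    lb.on_ready("w%d" % w)
dispatched = []
for j in range(1, 8):
    dispatched += lb.submit("job%d" % j)
print(len(dispatched), list(lb.jobs))  # 5 ['job6', 'job7']

# When w3 finishes and sends Ready again, it gets the next queued job.
print(lb.on_ready("w3"))  # [('w3', 'job6')]
```

Five jobs go out immediately, two stay queued, and a queued job is released only when some worker reports Ready again, which matches the three requirements listed in the question.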

The fact that the workers are running on multicore systems shouldn't need special treatment. It's simply the case that workers should try to avoid starving each other of any resources, whether that's disk, memory, or CPU. Of course, that's another layer, independent of the load balancing pattern.

Question:

I am trying to connect several hosts in a ring topology with ZeroMQ. After trying for a long time, I hope to get some help now.

Basically, I have a client C that sends a message M to itself (as server S) through several hosts H1...Hn. It is ring communication.

C -> H2 -> H1 -> S    # where C and S are on the same host.

IP addresses:
C, S:  192.168.100.165
H2:    192.168.100.102
H1:    192.168.100.101

H1 and H2 need to take the message, do something with it, and forward it to the next host.

I tried to follow the extended request-reply pattern example of the official ZeroMQ doc.

This is my client C:

import time
import zmq

main_context = zmq.Context()
client_socket = main_context.socket(zmq.REQ)
client_socket.connect("tcp://192.168.100.102:5556")
while 1:
    client_socket.send(b"hello!")
    ug_response = client_socket.recv()
    time.sleep(3)

Here is my server S (it runs on the same host, even in the same program, just in another thread):

import zmq

ug_server_context = zmq.Context()
server_socket = ug_server_context.socket(zmq.REP)
server_socket.bind("tcp://*:5558")
while 1:
    updated_ug_message = server_socket.recv()
    server_socket.send(b"ug received.")

And these are my "brokers" H1 and H2, which use ROUTER and DEALER:

import zmq

context = zmq.Context()
# Prepare broker sockets
frontend = context.socket(zmq.ROUTER)
backend = context.socket(zmq.DEALER)
frontend.bind("tcp://*:5556")    # for H1: 5557
backend.bind("tcp://*:5557")     # for H1: 5558

# Initialize broker poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

# Switch messages between sockets
while 1:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        backend.send_multipart(message)

    if socks.get(backend) == zmq.POLLIN:
        message = backend.recv_multipart()
        frontend.send_multipart(message)

The thing is, it doesn't work. My questions are:

1. How does H2 know that it has to send message to H1?

2. Am I going towards the right direction?


Answer:

OK, in the meantime I figured it out on my own. I had to change:

backend.bind("tcp://*:5557")

to:

backend.connect("tcp://192.168.100.101:5557")

Now my communication ring works very well.
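The same rule applies to every hop: each broker binds its ROUTER frontend on its own port and connects its DEALER backend to the next hop's frontend. That is also the answer to question 1: H2 "knows" to send to H1 only because its DEALER is connected to H1 and to nothing else. H1 needs the analogous change, connecting its backend to S. The helper below is a hypothetical, socket-free sketch that derives these endpoints from the addresses and ports given in the question; in each broker the strings would be passed to frontend.bind(...) and backend.connect(...).

```python
def ring_plan(brokers, server):
    """Return the bind/connect endpoints for each broker in a forwarding chain.

    brokers: ordered list of (name, ip, frontend_port), first hop first.
    server:  (name, ip, port) of the REP socket that closes the ring.
    """
    plan = {}
    next_hops = brokers[1:] + [server]
    for (name, _ip, port), (_next_name, next_ip, next_port) in zip(brokers, next_hops):
        plan[name] = {
            # Each broker accepts upstream traffic on its own port...
            "frontend_bind": "tcp://*:%d" % port,
            # ...and forwards downstream by connecting to the next hop.
            "backend_connect": "tcp://%s:%d" % (next_ip, next_port),
        }
    return plan


# Addresses and ports taken from the question; C connects to H2's frontend.
plan = ring_plan(
    brokers=[("H2", "192.168.100.102", 5556), ("H1", "192.168.100.101", 5557)],
    server=("S", "192.168.100.165", 5558),
)
for name, endpoints in plan.items():
    print(name, endpoints["frontend_bind"], "->", endpoints["backend_connect"])
# H2 tcp://*:5556 -> tcp://192.168.100.101:5557
# H1 tcp://*:5557 -> tcp://192.168.100.165:5558
```

Replies travel back along the same path, since each ROUTER/DEALER pair preserves the envelope that lets the REP and REQ ends match replies to requests.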

Question:

I'm trying to create an RPC program that lets hosts located on different networks communicate, and chose the Router-Dealer configuration of NetMQ described here: http://netmq.readthedocs.io/en/latest/router-dealer/#router-dealer

But the problem is that the router always selects a random dealer when routing a message to the backend. The code I used:

using (var frontend = new RouterSocket(string.Format("@tcp://{0}:{1}", "127.0.0.1", "5556"))) //"@tcp://10.0.2.218:5559"
using (var backend = new DealerSocket(string.Format("@tcp://{0}:{1}", "127.0.0.1", "5557"))) //"@tcp://10.0.2.218:5560"
{
    // Handler for messages coming in to the frontend
    frontend.ReceiveReady += (s, e) =>
    {
        Console.WriteLine("message arrived on frontEnd");
        NetMQMessage msg = e.Socket.ReceiveMultipartMessage();
        string clientAddress = msg[0].ConvertToString();
        Console.WriteLine("Sending to :" + clientAddress);
        //TODO: Make routing here
        backend.SendMultipartMessage(msg); // Relay this message to the backend
    };

    // Handler for messages coming in to the backend
    backend.ReceiveReady += (s, e) =>
    {
        Console.WriteLine("message arrived on backend");
        var msg = e.Socket.ReceiveMultipartMessage();

        frontend.SendMultipartMessage(msg); // Relay this message to the frontend
    };

    using (var poller = new NetMQPoller { backend, frontend })
    {
        // Listen out for events on both sockets and raise events when messages come in
        poller.Run();
    }
}

Code for Client:

using (var client = new RequestSocket(">tcp://" + "127.0.0.1" + ":5556"))
{
    var messageBytes = UTF8Encoding.UTF8.GetBytes("Hello");
    var messageToServer = new NetMQMessage();
    //messageToServer.AppendEmptyFrame();
    messageToServer.Append("Server2");
    messageToServer.Append(messageBytes);
    WriteToConsoleVoid("======================================");
    WriteToConsoleVoid(" OUTGOING MESSAGE TO SERVER ");
    WriteToConsoleVoid("======================================");
    //PrintFrames("Client Sending", messageToServer);
    client.SendMultipartMessage(messageToServer);

    NetMQMessage serverMessage = client.ReceiveMultipartMessage();
    WriteToConsoleVoid("======================================");
    WriteToConsoleVoid(" INCOMING MESSAGE FROM SERVER");
    WriteToConsoleVoid("======================================");
    //PrintFrames("Server receiving", clientMessage);
    byte[] rpcByteArray = null;
    if (serverMessage.FrameCount == 3)
    {
        var clientAddress = serverMessage[0];
        rpcByteArray = serverMessage[2].ToByteArray();
    }

    WriteToConsoleVoid("======================================");
    Console.ReadLine();
}

Code for Dealer:

using (var server = new ResponseSocket())
{
    server.Options.Identity = UTF8Encoding.UTF8.GetBytes(confItem.ResponseServerID);
    Console.WriteLine("Server ID:" + confItem.ResponseServerID);
    server.Connect(string.Format("tcp://{0}:{1}", "127.0.0.1", "5557"));
    using (var poller = new NetMQPoller { server })
    {
        server.ReceiveReady += (s, a) =>
        {
            byte[] response = null;
            NetMQMessage serverMessage = null;
            try
            {
                serverMessage = a.Socket.ReceiveMultipartMessage();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception on ReceiveMultipartMessage : " + ex.ToString());
                return; // 'continue' is not allowed inside a lambda; skip this message instead
            }
            byte[] eaBody = null;
            string clientAddress = "";
            if (serverMessage.FrameCount == 2)
            {
                clientAddress = serverMessage[0].ConvertToString();
                Console.WriteLine("ClientAddress:" + clientAddress);
                eaBody = serverMessage[1].ToByteArray();
                Console.WriteLine("Received message from remote computer: {0} bytes , CurrentID : {1}", eaBody.Length, confItem.ResponseServerID);
            }
            else
            {
                Console.WriteLine("Received message from remote computer: CurrentID : {0}", confItem.ResponseServerID);
            }
        };
        poller.Run();
    }
}

Is it possible to choose a specific backend on frontend.ReceiveReady? Thanks!


Answer:

Your backend should be a ROUTER as well. You need the workers to register, or you need to know all the available workers and their identities. When sending on the backend, push the target worker's identity as the first frame of the message.
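With a ROUTER on both sides, the broker's job reduces to reordering envelope frames. The sketch below takes the "know all the workers" route, because REP workers such as the asker's ResponseSocket must receive before they can send, so they cannot announce themselves first (registering workers, as in Majordomo, use DEALER sockets instead). It models frames as lists of bytes and assumes, as in the question, that the client puts the target worker's identity ("Server2") as the first body frame and that each worker sets its identity before connecting. The class and method names are hypothetical; in NetMQ the same reordering would be applied to the frames of a NetMQMessage.

```python
class RoutingBroker:
    """Frame rewriting for a ROUTER frontend + ROUTER backend broker (a sketch)."""

    def __init__(self, known_workers):
        # Identities the workers set on their sockets before connecting.
        self.known_workers = set(known_workers)

    def frontend_to_backend(self, frames):
        """[client_id, b"", target, body...] -> [target, client_id, b"", body...]"""
        client_id, delimiter, target = frames[0], frames[1], frames[2]
        if delimiter != b"":
            raise ValueError("expected a REQ-style empty delimiter frame")
        if target not in self.known_workers:
            # A ZeroMQ ROUTER silently drops messages addressed to unknown
            # identities by default, so refuse here instead of losing the request.
            raise KeyError(target)
        # The backend ROUTER consumes `target` to pick the worker; the REP worker
        # keeps [client_id, b""] as the reply envelope and sees only the body.
        return [target, client_id, b""] + frames[3:]

    def backend_to_frontend(self, frames):
        """[worker_id, client_id, b"", reply...] -> [client_id, b"", reply...]"""
        # Drop the worker's identity; the frontend ROUTER routes on client_id.
        return frames[1:]


broker = RoutingBroker([b"Server1", b"Server2"])
# What the frontend ROUTER receives when a REQ client sends [b"Server2", b"Hello"]:
request = [b"client-A", b"", b"Server2", b"Hello"]
print(broker.frontend_to_backend(request))  # [b'Server2', b'client-A', b'', b'Hello']
# Worker "Server2" replies b"World"; the backend ROUTER receives:
reply = [b"Server2", b"client-A", b"", b"World"]
print(broker.backend_to_frontend(reply))  # [b'client-A', b'', b'World']
```

The frontend handler would call frontend_to_backend before sending on the backend, and the backend handler would call backend_to_frontend before sending on the frontend, replacing the plain relays in the question's broker.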

Take a look at the Majordomo example in the zeromq guide:

http://zguide.zeromq.org/page:all#toc72
http://zguide.zeromq.org/page:all#toc98