Hot questions on using ZeroMQ in a proxy

Question:

I implemented the Last Value Caching (LVC) example of ZMQ (http://zguide.zeromq.org/php:chapter5#Last-Value-Caching), but can't get a 2nd subscriber to register at the backend.

The first time a subscriber comes on board, the event[0] == b'\x01' condition is met and the cached value is sent, but the second subscriber (same topic) doesn't even register (if backend in events: is never true). Everything else works fine. Data gets passed from the publisher to the subscribers (all).

What could be the reason for this? Is the way the backend is connected correct? Is this pattern only supposed to work with the first subscriber?

Update

When I subscribe the 2nd subscriber to another topic, I get the right behaviour (i.e. \x01 when subscribing). This really seems to work for the first subscriber only. Is this a bug in ZeroMQ?

Update 2

Here's a minimal working example that shows that the LVC pattern is not working (at least not the way it's implemented here).

# subscriber.py
import zmq

def main():
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://127.0.0.1:5558")

    # Subscribe to every single topic from publisher
    print 'subscribing (sub side)'
    sub.setsockopt(zmq.SUBSCRIBE, b"my-topic")

    poller = zmq.Poller()
    poller.register(sub, zmq.POLLIN)
    while True:
        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if sub in events:
            msg = sub.recv_multipart()
            topic, current = msg
            print 'received %s on topic %s' % (current, topic)

if __name__ == '__main__':
    main() 

And here's the broker (as in the example, but with a bit more verbosity and an integrated publisher).

# broker.py
# from http://zguide.zeromq.org/py:lvcache
import zmq
import threading
import time


class Publisher(threading.Thread):
    def __init__(self):
        super(Publisher, self).__init__()

    def run(self):
        time.sleep(10)
        ctx = zmq.Context.instance()
        pub = ctx.socket(zmq.PUB)
        pub.connect("tcp://127.0.0.1:5557")

        cnt = 0
        while True:
            msg = 'hello %d' % cnt
            print 'publisher is publishing %s' % msg
            pub.send_multipart(['my-topic', msg])
            cnt += 1
            time.sleep(5)


def main():
    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.SUB)
    frontend.bind("tcp://*:5557")
    backend = ctx.socket(zmq.XPUB)
    backend.bind("tcp://*:5558")

    # Subscribe to every single topic from publisher
    frontend.setsockopt(zmq.SUBSCRIBE, b"")

    # Store last instance of each topic in a cache
    cache = {}

    # We route topic updates from frontend to backend, and
    # we handle subscriptions by sending whatever we cached,
    # if anything:
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)


    # launch a publisher
    p = Publisher()
    p.daemon = True
    p.start()

    while True:

        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if frontend in events:
            msg = frontend.recv_multipart()
            topic, current = msg
            cache[topic] = current
            backend.send_multipart(msg)

        ### this is where it fails for the 2nd subscriber. 
        ### There's never even an event from the backend 
        ### in events when the 2nd subscriber is subscribing.

        # When we get a new subscription we pull data from the cache:
        if backend in events:
            print 'message from subscriber'
            event = backend.recv()
            # Event is one byte 0=unsub or 1=sub, followed by topic
            if event[0] == b'\x01':
                topic = event[1:]
                print ' => subscribe to %s' % topic
                if topic in cache:
                    print ("Sending cached topic %s" % topic)
                    backend.send_multipart([ topic, cache[topic] ])
            elif event[0] == b'\x00':
                topic = event[1:]
                print ' => unsubscribe from %s' % topic

if __name__ == '__main__':
    main()

Running this code (1 x broker.py, 2 x subscriber.py) shows that the first subscriber registers at the broker as expected (\x01 and cache lookup), but the 2nd subscriber does not get registered the same way. Interestingly, the 2nd subscriber is hooked up to the pub/sub channel, as after a while (10 sec) both subscribers receive data from the publisher.

This is very strange. Perhaps some of my libraries are outdated. Here's what I got:

Python 2.7.9 (v2.7.9:648dcafa7e5f, Dec 10 2014, 10:10:46) 
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import zmq
>>> zmq.__version__
'14.1.1'

$ brew info zeromq
zeromq: stable 4.0.5 (bottled), HEAD
High-performance, asynchronous messaging library
http://www.zeromq.org/
/usr/local/Cellar/zeromq/4.0.5_2 (64 files, 2.8M) *
  Poured from bottle
From: https://github.com/Homebrew/homebrew/blob/master/Library/Formula/zeromq.rb
==> Dependencies
Build: pkg-config ✔
Optional: libpgm ✘, libsodium ✘

Update 3

This behaviour can also be observed in zeromq 4.1.2 and pyzmq-14.7.0 (with or without libpgm and libsodium installed).

Update 4

Another observation suggests that the first subscriber is somehow handled differently: The first subscriber is the only one unsubscribing in the expected way from the XPUB socket (backend) by preceding its subscription topic with \x00. The other subscribers (I tried more than 2) stayed mute on the backend channel (although receiving messages).

Update 5

I hope I'm not going down a rabbit hole, but I've looked into the czmq bindings and ran my Python example in C. The results are the same, so I guess it's not a problem with the bindings, but with libzmq.

I also verified that the 2nd subscriber is sending a subscribe message and indeed I can see this on the wire:

First subscribe:

0000  02 00 00 00 45 00 00 3f  98 be 40 00 40 06 00 00   ....E..? ..@.@...
0010  7f 00 00 01 7f 00 00 01  fa e5 15 b6 34 f0 51 c3   ........ ....4.Q.
0020  05 e4 8b 77 80 18 31 d4  fe 33 00 00 01 01 08 0a   ...w..1. .3......
0030  2a aa d1 d2 2a aa cd e9  00 09 01 6d 79 2d 74 6f   *...*... ...my-to
0040  70 69 63                                           pic              

Second subscribe message, with the differences from the one above marked and explained. The same data is sent in the subscription frame.

                               identification
                               v
0000  02 00 00 00 45 00 00 3f  ed be 40 00 40 06 00 00   ....E..? ..@.@...
                             src port      sequence number
                                  v        v  v  v  v
0010  7f 00 00 01 7f 00 00 01  fa e6 15 b6 17 da 02 e7   ........ ........

Acknowledgement number   window scaling factor
      v  v  v  v           v
0020  71 4b 33 e6 80 18 31 d5  fe 33 00 00 01 01 08 0a   qK3...1. .3......

timestamp value  timestamp echo reply
            v           v  v   |<-------- data -------
0030  2a aa f8 2c 2a aa f4 45  00 09 01 6d 79 2d 74 6f   *..,*..E ...my-to

      ------>|
0040  70 69 63                                           pic              

Answer:

I found the solution to this problem, and even though I read the docs front to back and back to front, I had not seen it. The key is XPUB_VERBOSE. Add this line after the backend initialisation and everything works fine:

backend.setsockopt(zmq.XPUB_VERBOSE, True)
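
In the broker from Update 2, that amounts to the following (a sketch of just the backend setup; the rest of the broker is unchanged):

backend = ctx.socket(zmq.XPUB)
backend.bind("tcp://*:5558")
# Without XPUB_VERBOSE, the XPUB socket filters out duplicate subscriptions,
# so the broker never sees the \x01 event for a 2nd subscriber on the same topic.
backend.setsockopt(zmq.XPUB_VERBOSE, 1)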

Here's an extract from the official documentation:

ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets. Sets the XPUB socket behavior on new subscriptions and unsubscriptions. A value of 0 is the default and passes only new subscription messages upstream. A value of 1 passes all subscription messages upstream.

  • Option value type: int
  • Option value unit: 0, 1
  • Default value: 0
  • Applicable socket types: ZMQ_XPUB

Pieter Hintjens has some more information on this in his blog. This is the relevant section:

A few months ago we added a neat little option (ZMQ_XPUB_VERBOSE) to XPUB sockets that disables its filtering of duplicate subscriptions. This now works for any number of subscribers. We use this as follows:

void *publisher = zsocket_new (ctx, ZMQ_XPUB);
zsocket_set_xpub_verbose (publisher, 1);
zsocket_bind (publisher, "tcp://*:6001");

The LVC pattern description should be updated to reflect this setting, as this pattern won't work otherwise.

Question:

In the ZMQ proxy we have two types of sockets, DEALER and ROUTER. I've also tried to use the capture socket, but it didn't do what I was looking for.

I'm looking for a way to log what message my proxy server receives.


Answer:

Q : a way to log what message my proxy server receives.

The simplest way is to use the logging that the API (v4+) directly supports via a man-in-the-middle "capture" socket:

// [ROUTER]--------------------------------------+++++++
//                                               |||||||
// [DEALER]---------------*vvvvvvvv             *vvvvvvv
int zmq_proxy (const void *frontend, const void *backend, const void *capture);
// [?]---------------------------------------------------------------*^^^^^^^

Where the capture socket ought to be one of { ZMQ_PUB | ZMQ_DEALER | ZMQ_PUSH | ZMQ_PAIR }.

If the capture socket is not NULL, the proxy shall send all messages, received on both frontend and backend, to the capture socket.

If this ZeroMQ API-granted capability does not meet your expectations, feel free to express them in as much detail as needed. You can then either implement an "external" filter on the capture-socket payload (based on message content or socket_monitor()), or design a brand new, user-defined logging proxy, in which your required features are implemented in your application-specific code, reusing the clean and plain ZeroMQ API for all the DEALER-inbound/outbound-ROUTER message passing and the log-filtering/processing logic.

I cannot think of any other way to solve the task.
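
For illustration, here is a minimal pyzmq sketch of that capture-socket logging (the ROUTER/DEALER roles match the question, but the ports, the PUSH/PULL pair and the inproc address are assumptions made for this example, not part of any official recipe):

import threading
import zmq

ctx = zmq.Context.instance()

frontend = ctx.socket(zmq.ROUTER)
frontend.bind("tcp://*:5570")

backend = ctx.socket(zmq.DEALER)
backend.bind("tcp://*:5571")

capture = ctx.socket(zmq.PUSH)          # any of { PUB | DEALER | PUSH | PAIR } will do
capture.bind("inproc://capture")

def logger():
    sink = ctx.socket(zmq.PULL)
    sink.connect("inproc://capture")
    while True:
        # every message the proxy passes in either direction also lands here
        frames = sink.recv_multipart()
        print("proxy saw:", frames)

threading.Thread(target=logger, daemon=True).start()

zmq.proxy(frontend, backend, capture)   # blocks forever, mirroring all traffic to capture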

Question:

I am currently learning how to use the ZeroMQ library, which a friend advised me to use for a personal project.

After reading the documentation and planning how to use the library for my project, I began testing with the code given by the documentation. The test I used was this one. Unfortunately it doesn't work. I made some minor modifications to test it. (I'm including the exact code of my test; it's a lot, sorry, but without all of it I don't think it makes sense and it would be impossible to help me.)

I changed almost nothing from the test given by the documentation; I just added some output and deleted the poll in the client (I thought the problem came from there because it was blocking the infinite loop even though there was a timeout).

    #include <vector>
    #include <thread>
    #include <memory>
    #include <functional>


    #include <zmq.h>
    #include <zmq.hpp>
    #include <zhelper.hpp>

    //  This is our client task class.
    //  It connects to the server, and then sends a request once per second
    //  It collects responses as they arrive, and it prints them out. We will
    //  run several client tasks in parallel, each with a different random ID.
    //  Attention! -- this random work well only on linux.

    class client_task {
    public:
        client_task()
            : ctx_(1),
              client_socket_(ctx_, ZMQ_DEALER)
        {}

        void start() {
            // generate random identity
            char identity[10] = {};
            sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000));
            printf("-> %s\n", identity);
            client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
            client_socket_.connect("tcp://localhost:5570");

            zmq_pollitem_t items;
            items.socket = &client_socket_;
            items.fd = 0;
            items.events = ZMQ_POLLIN;
            items.revents = 0;

            int request_nbr = 0;
            try {
                while (true) {

                    for (int i = 0; i < 100; ++i) {

                        // 10 milliseconds
                        sleep(1);
                        std::cout << "ici" << std::endl;
                        if (items.revents & ZMQ_POLLIN) {
                            printf("\n%s ", identity);
                            s_dump(client_socket_);
                        }

                        char request_string[16] = {};
                        sprintf(request_string, "request #%d", ++request_nbr);
                        client_socket_.send(request_string, strlen(request_string));

                    }
                }

            }
            catch (std::exception &e)
            {}
        }

    private:
        zmq::context_t ctx_;
        zmq::socket_t client_socket_;
    };

    //  Each worker task works on one request at a time and sends a random number
    //  of replies back, with random delays between replies:

    class server_worker {
    public:
        server_worker(zmq::context_t &ctx, int sock_type)
            : ctx_(ctx),
              worker_(ctx_, sock_type)
        {}

        void work() {
                worker_.connect("inproc://backend");

            try {
                while (true) {
                    zmq::message_t identity;
                    zmq::message_t msg;
                    zmq::message_t copied_id;
                    zmq::message_t copied_msg;
                    worker_.recv(&identity);
                    worker_.recv(&msg);
                    std::cout << "I never arrive here" << std::endl;

                    int replies = within(5);
                    for (int reply = 0; reply < replies; ++reply) {
                        std::cout << "LA" << std::endl;
                        s_sleep(within(1000) + 1);
                        copied_id.copy(&identity);
                        copied_msg.copy(&msg);
                        worker_.send(copied_id, ZMQ_SNDMORE);
                        worker_.send(copied_msg);
                    }
                }
            }
            catch (std::exception &e) {}
        }

    private:
        zmq::context_t &ctx_;
        zmq::socket_t worker_;
    };

    //  This is our server task.
    //  It uses the multithreaded server model to deal requests out to a pool
    //  of workers and route replies back to clients. One worker can handle
    //  one request at a time but one client can talk to multiple workers at
    //  once.

    class server_task {
    public:
        server_task()
            : ctx_(1),
              frontend_(ctx_, ZMQ_ROUTER),
              backend_(ctx_, ZMQ_DEALER)
        {}

        void run() {
            frontend_.bind("tcp://*:5570");
            backend_.bind("inproc://backend");

            server_worker * worker = new server_worker(ctx_, ZMQ_DEALER);
            std::thread worker_thread(std::bind(&server_worker::work, worker));
            worker_thread.detach();

            try {
                zmq::proxy(&frontend_, &backend_, NULL);
            }
            catch (std::exception &e) {}

        }

    private:
        zmq::context_t ctx_;
        zmq::socket_t frontend_;
        zmq::socket_t backend_;
    };

    //  The main thread simply starts several clients and a server, and then
    //  waits for the server to finish.

    int main (void)
    {
        client_task ct1;
        client_task ct2;
        client_task ct3;
        server_task st;

        std::thread t1(std::bind(&client_task::start, &ct1));
        std::thread t2(std::bind(&client_task::start, &ct2));
        std::thread t3(std::bind(&client_task::start, &ct3));
        std::thread t4(std::bind(&server_task::run, &st));

        t1.detach();
        t2.detach();
        t3.detach();
        t4.detach();
        std::cout << "ok" << std::endl;
        getchar();
        std::cout << "ok" << std::endl;
        return 0;
    }

The output I get from this code is the following :

-> CC66-C879
-> 3292-E961
-> C4AA-55D1
ok
ici
ici
ici
... (infinite ici)

I really don't understand why it doesn't work. The poll in the client throws an exception: Socket operation on non-socket. The major problem for me is that this is a test coming from the official documentation and I can't make it work. What is wrong with my use of the socket?

Thanks for your help


Answer:

I found out the problem.

There are problems in the official documentation: some obvious mistakes (like the initialisation of the zmq_pollitem_t array) and another one that kept my test from working.

For zmq::poll or zmq::proxy, you need to cast the socket object to void*; you must not pass a pointer to the socket. See: ZMQ poll not working

After those modifications it worked. I wrote another post to explain why here.

Here is the corrected code, without my additional testing output:

    //  Asynchronous client-to-server (DEALER to ROUTER)
    //
    //  While this example runs in a single process, that is to make
    //  it easier to start and stop the example. Each task has its own
    //  context and conceptually acts as a separate process.

    #include <vector>
    #include <thread>
    #include <memory>
    #include <functional>


    #include <zmq.h>
    #include <zmq.hpp>
    #include <zhelper.hpp>

    //  This is our client task class.
    //  It connects to the server, and then sends a request once per second
    //  It collects responses as they arrive, and it prints them out. We will
    //  run several client tasks in parallel, each with a different random ID.
    //  Attention! -- this random work well only on linux.

    class client_task {
    public:
        client_task()
            : ctx_(1),
              client_socket_(ctx_, ZMQ_DEALER)
        {}

        void start() {
            // generate random identity
            char identity[10] = {};
            sprintf(identity, "%04X-%04X", within(0x10000), within(0x10000));
            printf("-> %s\n", identity);
            client_socket_.setsockopt(ZMQ_IDENTITY, identity, strlen(identity));
            client_socket_.connect("tcp://localhost:5555");

            zmq_pollitem_t items[1];                
            items[0].socket = static_cast<void *> (client_socket_);
            items[0].fd = 0;
            items[0].events = ZMQ_POLLIN;
            items[0].revents = 0;
            int request_nbr = 0;
            try {
                while (true) {
                    for (int i = 0 ; i < 100; ++i) {

                    zmq::poll(items, 1, 10);
                    if (items[0].revents & ZMQ_POLLIN) {
                            printf("\n%s =>", identity);
                            s_dump(client_socket_);
                        }
                    }

                    char request_string[16] = {};
                    sprintf(request_string, "request #%d", ++request_nbr);
                    client_socket_.send(request_string, strlen(request_string));

                }

            }
            catch (std::exception &e)
            {
                std::cout << "exception :  "  << zmq_errno() << " "<< e.what() << std::endl;
                if (zmq_errno() == EINTR)
                    std::cout << "lol"<< std::endl;
            }
        }

    private:
        zmq::context_t ctx_;
        zmq::socket_t client_socket_;
    };

    //  Each worker task works on one request at a time and sends a random number
    //  of replies back, with random delays between replies:

    class server_worker {
    public:
        server_worker(zmq::context_t &ctx, int sock_type)
            : ctx_(ctx),
              worker_(ctx_, sock_type)
        {}

        void work() {
                worker_.connect("inproc://backend");

            try {
                while (true) {
                    zmq::message_t identity;
                    zmq::message_t msg;
                    zmq::message_t copied_id;
                    zmq::message_t copied_msg;
                    worker_.recv(&identity);
                    worker_.recv(&msg);

                    int replies = within(5);
                    for (int reply = 0; reply < replies; ++reply) {
                        s_sleep(within(1000) + 1);
                        copied_id.copy(&identity);
                        copied_msg.copy(&msg);
                        worker_.send(copied_id, ZMQ_SNDMORE);
                        worker_.send(copied_msg);
                    }
                }
            }
            catch (std::exception &e)
            {
                std::cout << "Error in worker : " << e.what() << std::endl;
            }
        }

    private:
        zmq::context_t &ctx_;
        zmq::socket_t worker_;
    };

    //  This is our server task.
    //  It uses the multithreaded server model to deal requests out to a pool
    //  of workers and route replies back to clients. One worker can handle
    //  one request at a time but one client can talk to multiple workers at
    //  once.

    class server_task {
    public:
        server_task()
            : ctx_(1),
              frontend_(ctx_, ZMQ_ROUTER),
              backend_(ctx_, ZMQ_DEALER)
        {}

        void run() {
            frontend_.bind("tcp://*:5555");
            backend_.bind("inproc://backend");

            server_worker * worker = new server_worker(ctx_, ZMQ_DEALER);
            std::thread worker_thread(std::bind(&server_worker::work, worker));
            worker_thread.detach();

            try {
                zmq::proxy(static_cast<void *>(frontend_), static_cast<void *> (backend_), NULL);
            }
            catch (std::exception &e)
            {
                std::cout << "Error in Server : " << e.what() << std::endl;
            }

        }

    private:
        zmq::context_t ctx_;
        zmq::socket_t frontend_;
        zmq::socket_t backend_;
    };

    //  The main thread simply starts several clients and a server, and then
    //  waits for the server to finish.

    int main (void)
    {
        client_task ct1;
        client_task ct2;
        client_task ct3;
        server_task st;

        std::thread t4(std::bind(&server_task::run, &st));
        t4.detach();
        std::thread t1(std::bind(&client_task::start, &ct1));
        std::thread t2(std::bind(&client_task::start, &ct2));
        std::thread t3(std::bind(&client_task::start, &ct3));

        t1.detach();
        t2.detach();
        t3.detach();

        getchar();
        return 0;
    }

Question:

I'm having a weird issue with the proxy in pyzmq. Here's the code of that proxy:

import zmq
context = zmq.Context.instance()

frontend_socket = context.socket(zmq.XSUB)
frontend_socket.bind("tcp://0.0.0.0:%s" % sub_port)

backend_socket = context.socket(zmq.XPUB)
backend_socket.bind("tcp://0.0.0.0:%s" % pub_port)

zmq.proxy(frontend_socket, backend_socket)

I'm using that proxy to send messages between ~50 processes that run on 6 different machines. The total amount of topics is around 1,000, but since multiple processes can listen on the same topics, the total amount of subscriptions is around 10,000.

Normally this works very well: messages go through the proxy correctly as long as a process publishes them and at least one other process is subscribed to the topic. It works whether the publisher or the subscriber was started first.

But at some point, when we start a new process (let's call it X), things start behaving strangely. Everything that was already connected keeps working, but the new processes that we connect can only get messages through if the publisher is connected before the subscriber. X can be any one of the processes that normally work, and it can be on any machine, and the result is the same. When we get into this state, killing X makes everything work again, and starting it again makes it fail. If we stop other processes and then start X, it works well (so it's not related to X's code in particular).

Could we be reaching some limit of ZMQ? I've read of setups that seem to have far more processes, subscriptions, etc. than ours. It could be some option that we should set on the proxy; here are the ones we've tried so far, without success:

  • Changing RCVHWM on frontend_socket
  • Changing SNDHWM on backend_socket
  • Setting XPUB_VERBOSE on backend_socket
  • Setting XPUB_VERBOSER on backend_socket

Here is sample code of how we publish messages to the proxy:

topic = "test"
message = {"test": "test"}

context = zmq.Context.instance()
socket = context.socket(zmq.PUB)
socket.connect("tcp://1.2.3.4:1234")
while True:
    time.sleep(1)
    socket.send_multipart([topic.encode(), json.dumps(message).encode()])

Here is sample code of how we subscribe to messages from the proxy:

topic = "test"
context = zmq.Context.instance()
socket = context.socket(zmq.SUB)
socket.connect("tcp://1.2.3.4:5678")
socket.subscribe(topic)

while True:
    multi_part = socket.recv_multipart()
    [topic, message] = multi_part
    print(topic.decode(), message.decode())

Has anyone ever seen a similar issue? Is there something we can do to avoid the proxy getting in this state?

Thanks!


Answer:

Make all the publishers (proxy and publishing processes) XPUB (plus the verbose/verboser socket option), then read from the publisher sockets in a poll loop. The first byte of each subscription message tells you whether it is a subscribe or unsubscribe, followed by the subject/topic. If you log all of this information with timestamps it should tell you which component is at fault (it could be any of the three) and help with a fix.

The format of the subscription messages that arrive on the publisher (XPUB) will be

  • Subscription [0x01][topic]
  • Unsubscription [0x00][topic]
Code needed

I usually work in C++, but this is the general idea in Python.

proxy

You need to create a capture socket (this acts like a network tap). You connect a ZMQ_PAIR socket to the proxy (capture) over inproc and then read the contents at the other end of the socket. As you are using XPUB/XSUB you will see the subscription messages.

zmq.proxy(frontend, backend, capture)

Read the docs/examples for the Python proxy.
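
For illustration, a minimal (untested) sketch of such a tap; the ports follow the publisher/subscriber samples in the question, while the inproc address and the first-byte check are assumptions made for this example:

import threading
import zmq

ctx = zmq.Context.instance()

frontend = ctx.socket(zmq.XSUB)
frontend.bind("tcp://0.0.0.0:1234")     # publishers connect here
backend = ctx.socket(zmq.XPUB)
backend.bind("tcp://0.0.0.0:5678")      # subscribers connect here

capture = ctx.socket(zmq.PAIR)
capture.bind("inproc://capture")

def tap():
    sink = ctx.socket(zmq.PAIR)
    sink.connect("inproc://capture")
    while True:
        frames = sink.recv_multipart()
        first = frames[0]
        # heuristic: single-frame messages starting with \x01/\x00 are (un)subscriptions
        if len(frames) == 1 and first[:1] in (b"\x00", b"\x01"):
            kind = "SUB" if first[:1] == b"\x01" else "UNSUB"
            print(kind, first[1:])
        else:
            print("data:", frames)

threading.Thread(target=tap, daemon=True).start()
zmq.proxy(frontend, backend, capture)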

publisher

In this case you need to read from the publishing socket in the same thread as you are sending on it. That's the reason I said a poll loop might be best.

This code is not tested at all.

topic = "test"
message = {"test": "test"}

context = zmq.Context.instance()
socket = context.socket(zmq.XPUB)
socket.connect("tcp://1.2.3.4:1234")

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
timeout = 1000  #ms

while True:
  socks = dict(poller.poll(timeout))
  if not socks : # 1
    socket.send_multipart([topic.encode(), json.dumps(message).encode()])
  if socket in socks:
    sub_msg = socket.recv()  
    # print out the message here.

Question:

I'm trying to implement Publish Subscribe using a Proxy with ZeroMQ and PHP as it is described in the guide in Figure 13. The setup is the same as described here: how to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(jzmq) 3.xx

subscriber.php

<?php
$context = new ZMQContext();
$sub = new ZMQSocket($context, ZMQ::SOCKET_SUB);
$sub->connect("tcp://127.0.0.1:5000");
$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, 'Hello');
$msg = $sub->recv();
echo "got $msg";

publisher.php

<?php

$context = new ZMQContext();
$pub = new ZMQSocket($context, ZMQ::SOCKET_PUB);
$pub->connect("tcp://127.0.0.1:6000");

while (1) {
    echo "publishing";
    $pub->send("Hello World");
    sleep(1);
}

proxy.php

<?php
$context = new ZMQContext();
$frontend = new ZMQSocket($context, ZMQ::SOCKET_XSUB);
$frontend->bind("tcp://127.0.0.1:6000");
$backend = new ZMQSocket($context, ZMQ::SOCKET_XPUB);
$backend->bind("tcp://127.0.0.1:5000");
$device = new ZMQDevice($frontend, $backend);
$device->run();

If I start all three PHP scripts (first the proxy, then the publisher, then the subscriber), no messages arrive at the subscriber.

In order to see if any messages arrive at the proxy at all, I tried to receive the messages manually on the proxy:

while (true) {
    if ($frontend->recv(ZMQ::MODE_DONTWAIT)) {
        echo "received message on the XSUB frontend";
    }
    if ($backend->recv(ZMQ::MODE_DONTWAIT)) {
        echo "received message on the XPUB backend";
    }
}

There are several related questions on Stack Overflow:

  • How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)? - I am using the proxy as described in the answer, but I still don't receive any messages. Therefore this question does not apply
  • how to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(jzmq) 3.xx The person who answered said the problem was that messages from the publisher were not forwarded to the subscriber. However, I don't receive any messages from the publisher at all.

What am I missing?


Answer:

[PROXY] needs to have the topic-filter set too:

$frontend->send( chr(1) . "" ); /* XSUBSCRIBE to { ANY == "" } topic incoming */

( For ZeroMQ API-Ref.: >>> ZeroMQ API documentation )

Question:

I'm trying to understand how zmq::proxy works, but I'm encountering problems: I'd like to have messages routed to the right worker, but it seems like the identity and the envelopes are ignored. In the example I would like to route messages from client1 to worker2, and messages from client2 to worker1, but it seems the messages are served on a "first available worker" basis. Am I doing something wrong, or did I misunderstand how the identity works?

#include <atomic>
#include <cassert>
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>

#include <zmq.hpp>
#include <zmq_addon.hpp>

using namespace zmq;
std::atomic_bool running;
context_t context(4);
std::mutex mtx;

void client_func(std::string name, std::string target, std::string message)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));

    socket_t request_socket(context, socket_type::req);
    request_socket.connect("inproc://router");
    request_socket.setsockopt( ZMQ_IDENTITY, name.c_str(), name.size());

    while(running)
    {   
        multipart_t msg;
        msg.addstr(target);
        msg.addstr("");
        msg.addstr(message);

        std::cout << name << "sent a message: " << message << std::endl;
        msg.send(request_socket);
        multipart_t reply;
        if(reply.recv(request_socket))
        {
            std::unique_lock<std::mutex>(mtx);
            std::cout << name << " received a reply!" << std::endl;

            for(size_t i = 0 ; i < reply.size() ; i++)
            {
                std::string theData(static_cast<char*>(reply[i].data()),reply[i].size());
                std::cout << "Part " << i << ": " << theData << std::endl;
            }

        }

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    request_socket.close();
}


void worker_func(std::string name, std::string answer)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));

    socket_t response_socket(context, socket_type::rep);
    response_socket.connect("inproc://dealer");
    response_socket.setsockopt( ZMQ_IDENTITY, "resp", 4);

    while(running)
    {
        multipart_t request;

        if(request.recv(response_socket))
        {
            std::unique_lock<std::mutex>(mtx);

            std::cout << name << " received a request:" << std::endl;
            for(size_t i = 0 ; i < request.size() ; i++)
            {
                std::string theData(static_cast<char*>(request[i].data()),request[i].size());
                std::cout << "Part " << i << ": " << theData << std::endl;
            }

            std::string questioner(static_cast<char*>(request[0].data()),request[0].size());

            multipart_t msg;
            msg.addstr(questioner);
            msg.addstr("");
            msg.addstr(answer);

            msg.send(response_socket);
        }
    }

    response_socket.close();
}


int main(int argc, char* argv[])
{
    running = true;

    zmq::socket_t dealer(context, zmq::socket_type::dealer);
    zmq::socket_t router(context, zmq::socket_type::router);
    dealer.bind("inproc://dealer");
    router.bind("inproc://router");

    std::thread client1(client_func, "Client1", "Worker2", "Ciao");
    std::thread client2(client_func, "Client2", "Worker1", "Hello");
    std::thread worker1(worker_func, "Worker1","World");
    std::thread worker2(worker_func, "Worker2","Mondo");

    zmq::proxy(dealer,router);

    dealer.close();
    router.close();

    if(client1.joinable())
        client1.join();

    if(client2.joinable())
        client2.join();

    if(worker1.joinable())
        worker1.join();

    if(worker2.joinable())
        worker2.join();

    return 0;
}

Answer:

From the docs:

When the frontend is a ZMQ_ROUTER socket, and the backend is a ZMQ_DEALER socket, the proxy shall act as a shared queue that collects requests from a set of clients, and distributes these fairly among a set of services. Requests shall be fair-queued from frontend connections and distributed evenly across backend connections. Replies shall automatically return to the client that made the original request.

The proxy handles multiple clients and uses multiple workers to process the requests. The identity is used to send the response back to the right client. You cannot use the identity to "select" a specific worker.

Question:

I'm going to implement the following diagram: ZMQ-proxy

Just to let all "data generators" publish data to all "data receivers". Data receivers should still have ability to set subscritions.

// Proxy
package main

import zmq "github.com/pebbe/zmq4"

func main() {
    publisher, _ := zmq.NewSocket(zmq.XPUB)
    defer publisher.Close()
    publisher.Bind("tcp://*:8080")

    subscriber, _ := zmq.NewSocket(zmq.XSUB)
    defer subscriber.Close()
    subscriber.Bind("tcp://*:8081")

    zmq.Proxy(subscriber, publisher, nil)
}

And "data generators":

// Data generator
package main

import (
    zmq "github.com/pebbe/zmq4"
    "time"
)

func main() {
    publisher, _ := zmq.NewSocket(zmq.PUB)
    defer publisher.Close()
    publisher.Connect("tcp://127.0.0.1:8081")

    for {
        publisher.Send("Some msg", 0)
        time.Sleep(time.Second)
    }
}

And "data receivers":

// Data receiver
package main

import (
    "fmt"
    zmq "github.com/pebbe/zmq4"
)

func main() {
    subscriber, _ := zmq.NewSocket(zmq.SUB)
    defer subscriber.Close()
    subscriber.Connect("tcp://127.0.0.1:8080")

    for {
        msg, _ := subscriber.Recv(0)
        fmt.Println(msg)
    }
}

It seems to me that it should work, but it doesn't. What am I doing wrong?


Answer:

To receive any data, the subscriber socket must set a subscription filter with SetSubscribe; an empty filter, subscriber.SetSubscribe(""), subscribes to every message. See the official documentation.

Question:

I'm working on distributed application. The networking is based on ZMQ (jeromq), here's my architecture:

N Clients (Dealer socket) <---> (Router) Proxy (Dealer) <---> 1 (Dealer) Reciving Worker 
                                                        <---> N (Dealer) Processing Workers
  • So my Receiving Worker is running in a separate thread listening for messages and putting them into the queue.
  • The Processing Workers pick the message from the queue, do some processing and send it back to the Client (processing worker is connected to the proxy backend)

When a client sends a message, it is received by the receiving worker and placed in the queue; the processing worker picks it up, does some processing and sends it back; the client receives the response and then sends another request. That second request is sent by the client but never received by the receiving worker. Basically only the first 'round' works. What am I missing here?!


Answer:

What you're missing is that DEALER sockets send outgoing messages to their connected peers in round-robin fashion. The upshot is that your architecture won't work the way you intend.

To start, I'll write up a more accurate diagram of your intended architecture (for the moment, I'm ignoring socket types):

Client <-----> Broker ------> Receiver
                  ^              |
                  |              |
                  \              v
                   ---------- Processor

When you send the first message, it gets sent to the receiver - because it's the first connected socket in the queue. When you send the second message, the DEALER socket on your broker is sending to the next socket in the queue, in a round-robin fashion, which is the first processor. Basically, the problem is that your broker isn't able to distinguish between your "receiver" socket and your "processor" sockets, it treats them all as the same thing.

There are a couple ways you could handle this. First of all, you probably don't really need your "receiver" socket as a go-between, you may be able to communicate directly back and forth between your broker and your processing workers. I'd suggest that first, as that's typically how these types of architectures work.

You could also add another socket to your broker, have one socket to send to the receiver and one to receive from the processors.

Or, you could replace your DEALER socket on your broker with another ROUTER socket, which will allow you to directly address the receiver socket every time.


Follow up, to address the question in your comment:

Each of your workers having a different responsibility is a good reason to replace the DEALER socket on your broker with a ROUTER socket.

My off-the-shelf recommendation is to rearchitect around the Majordomo Protocol (later version can be found here, and talked about in the ZMQ guide here, with Java code samples). It does exactly what you seem to be looking for, and does it well.

But, at a simpler level, what you'll be doing is the following:

  1. Spin up your broker, with ROUTER sockets on both the front and back ends - both of these sockets are bind()-ed.
  2. Spin up your workers. Each worker is aware of its own responsibility. Remove the "receiver", we won't be using it.
  3. connect() your workers to your broker.
  4. Each worker sends a message to the broker, telling the broker what that worker is responsible for. The broker keeps a record of each worker by ID, and what job that worker is responsible for.
  5. When a broker wants to send a specific job, it looks up which worker is responsible for that job, and sends the job to it.
  6. If the worker is available immediately, it works on it, otherwise it holds onto the job until it's ready to process it.
  7. When the worker is finished, it sends the job back and continues working on new jobs if it has any.

All of that is what the Majordomo Protocol implements, complete with other necessary elements like heartbeating that make a fully fleshed out ZMQ application. That's why I recommend you go straight to that protocol, if it serves the needs you're trying to accomplish, rather than trying to build your own.
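
If you do roll your own rather than adopting Majordomo, the dispatch logic of steps 1-5 looks roughly like this minimal, untested pyzmq sketch (the ports, the b"READY" registration convention and the frame layout are assumptions made for illustration; JeroMQ offers the same socket types):

import zmq

ctx = zmq.Context.instance()

frontend = ctx.socket(zmq.ROUTER)       # clients (DEALER) connect here
frontend.bind("tcp://*:5555")
backend = ctx.socket(zmq.ROUTER)        # workers (DEALER) connect here
backend.bind("tcp://*:5556")

workers = {}                            # job type -> worker identity

poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

while True:
    events = dict(poller.poll())

    if backend in events:
        frames = backend.recv_multipart()
        worker_id = frames[0]
        if frames[1] == b"READY":
            # registration: [worker_id, b"READY", job_type]
            workers[frames[2]] = worker_id
        else:
            # finished job: [worker_id, client_id, reply] -> route back to the client
            frontend.send_multipart(frames[1:])

    if frontend in events:
        # request from a DEALER client: [client_id, job_type, payload]
        client_id, job_type, payload = frontend.recv_multipart()
        worker_id = workers.get(job_type)
        if worker_id is not None:
            # address the responsible worker explicitly by its identity
            backend.send_multipart([worker_id, client_id, payload])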

Question:

I had a problem with ZeroMQ because I used a pointer to a ZMQ socket for zmq::proxy and zmq::poll. Doing that raises an exception with error 88 (Socket operation on non-socket).

Actually ZeroMQ wants the user to pass the socket cast to void* (source of the information).

I did some research in the official documentation but I didn't find why ZeroMQ doesn't use a pointer to the socket.

edit: This is the code I thought would be correct

zmq::socket_t frontend_;
zmq::socket_t backend_;
zmq::proxy(&frontend_, &backend_, nullptr);

and the code that is actually working is this one :

zmq::socket_t frontend_;
zmq::socket_t backend_;
zmq::proxy((void *)frontend_, (void *)backend_, nullptr);

This seems strange to me. Why does ZeroMQ do it this way?


Answer:

There was nothing strange in it.

The socket_t class contains the underlying socket pointer, and an explicit operator void*() conversion returns this pointer instead of the class instance itself.

Can a cast operator be explicit?

This post helped me understand that there is a problem in the official zmq documentation example. In C++11 it's impossible to directly use

zmq::proxy(frontend_, backend_, nullptr);

because the conversion operator is explicit in the zmq code. So you need to cast the socket properly, like this:

zmq::proxy(static_cast<void *>(frontend_), static_cast<void *>(backend_), nullptr);

Question:

I am able to have C# (client) and Python (server) talk to each other using a simple request-reply. However, I want my web application built on C# ASP.NET to be stable and to support more clients and servers, so I tried connecting C# and Python using a Router-Dealer proxy in Python.

I tried running the Python proxy script first, then the C# client, then the Python server. However, when I run the Python server, it gives me an "Address in use" error message.

Am I running them in the wrong order, or is there something wrong with the Python proxy script (shown below)?

5602 = c# client

5603 = python server

import zmq

def main():
    context = zmq.Context()

    # Socket facing clients
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5602")

    # Socket facing services
    backend = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5603")

    zmq.proxy(frontend, backend)

    # We never get here…
    frontend.close()
    backend.close()
    context.term()

if __name__ == "__main__":
    main()

Answer:

I'm assuming your servers use bind, so the proxy should connect to them rather than also using bind.

Note: in ZeroMQ the order of application startup doesn't matter, so you can tell your proxy to connect to a server that doesn't yet exist; when the server is started, the connection will be made.
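
Alternatively, and more conventionally, keep the proxy bind()-ing both ends and have the Python server connect() to the proxy's DEALER side instead of binding the same port. A minimal sketch of such a server, following the question's port numbers (the REP socket type and the reply text are assumptions for illustration):

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
# connect() to the proxy's DEALER end instead of bind()-ing tcp://*:5603 again,
# which is what triggers the "Address in use" error
socket.connect("tcp://localhost:5603")

while True:
    request = socket.recv()
    socket.send(b"reply to: " + request)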

Question:

I have been playing around with ZMQ and love it. I am using it to build a distributed test execution application for use with Cucumber. I am having a problem though. I am using a simple extended REQ > Proxy > REP pattern with the built-in zmq_proxy. I have everything set up, and when I run it locally with all components on my machine, everything works perfectly. But when I put the worker (the REP socket) on another machine on the network, the REP socket never seems to make the connection to the DEALER socket that I passed into zmq_proxy. I have tested a simple REQ > REP between the two machines and that works just fine. The problem seems to be only with the REQ > Proxy > REP pattern. The setup seems straightforward; I will place the code below, but I just can't seem to get it to work. Any help would be greatly appreciated.

This is all done in Ruby 1.9.3 btw. Also, in the below setup the Client and the Proxy are on the same machine while the Worker is on a remote machine.

Client (Req socket)

require 'ffi-rzmq'

context = ZMQ::Context.new
socket = context.socket(ZMQ::REQ)
socket.connect('tcp://127.0.0.1:5550')

socket.send_string "This is a message"

reply = ''

socket.recv_string reply

puts reply

Worker (Rep socket)

require 'ffi-rzmq'

context = ZMQ::Context.new
socket = context.socket(ZMQ::REP)
socket.connect 'tcp://ipAddressToDealer:5551'

message = ''

socket.recv_string message

puts message

socket.send_string "Got the messsage"

Proxy

require 'ffi-rzmq'

context = ZMQ::Context.new
front_end_socket = context.socket(ZMQ::ROUTER)
front_end_socket.bind('tcp://*:5550')

back_end_socket = context.socket(ZMQ::DEALER)
back_end_socket.bind('tcp://*:5551')

ZMQ::Device.new(front_end_socket, back_end_socket)

Answer:

It turns out that Windows Firewall was blocking the Ruby interpreter. I added exceptions in the firewall for the interpreter and everything is working as it should. I'm not sure why it displayed the symptoms that it did, though; I would have imagined that the simple REQ > REP request would have been blocked as well. Oh well, problem solved.