Hot questions for using ZeroMQ in Go

Question:

I've been using ZMQ in some Python applications for a while, but only very recently, when I decided to reimplement one of them in Go, did I realize that ZMQ sockets are not thread-safe.

The original Python implementation uses an event loop that looks like this:

while running:
    socks = dict(poller.poll(TIMEOUT))
    if socks.get(router) == zmq.POLLIN:
        client_id = router.recv()
        _ = router.recv()
        data = router.recv()
        requests.append((client_id, data))

    for req in requests:
        rep = handle_request(req)
        if rep:
            replies.append(rep)
            requests.remove(req)

    for client_id, data in replies:
        router.send(client_id, zmq.SNDMORE)
        router.send(b'', zmq.SNDMORE)
        router.send(data)
        del replies[:]

The problem is that the reply might not be ready on the first pass, so whenever I have pending requests, I have to poll with a very short timeout or the clients will wait for more than they should, and the application ends up using a lot of CPU for polling.

When I decided to reimplement it in Go, I thought it would be as simple as this, avoiding the problem by using infinite timeout on polling:

for {
    sockets, _ := poller.Poll(-1) 
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case router:
            msg, _ := s.RecvMessage(0)
            client_id := msg[0]
            data := msg[2]
            go handleRequest(router, client_id, data)                
        }
    }
}

But that ideal implementation only works when I have a single client connected, or a light load. Under heavy load I get random assertion errors inside libzmq. I tried the following:

  1. Following the zmq4 docs I tried adding a sync.Mutex and lock/unlock on all socket operations. It fails. I assume it's because ZMQ uses its own threads for flushing.

  2. Creating one goroutine for polling/receiving and one for sending, and use channels in the same way I used the req/rep queues in the Python version. It fails, as I'm still sharing the socket.

  3. Same as 2, but setting GOMAXPROCS=1. It fails, and throughput was very limited because replies were being held back until the Poll() call returned.

  4. Use the req/rep channels as in 2, but use runtime.LockOSThread to keep all socket operations in the same thread as the socket. It doesn't fail, but it has the same throughput problem as above: replies are held back until the Poll() call returns.

  5. Same as 4, but using the poll timeout strategy from the Python version. It works, but has the same problem the Python version does.

  6. Share the context instead of the socket and create one socket for sending and one for receiving in separate goroutines, communicating with channels. It works, but I'll have to rewrite the client libs to use two sockets instead of one.

  7. Get rid of zmq and use raw TCP sockets, which are thread-safe. It works perfectly, but I'll also have to rewrite the client libs.

So, it looks like 6 is how ZMQ was really intended to be used, as that's the only way I got it to work seamlessly with goroutines, but I wonder if there's any other way I haven't tried. Any ideas?


Update

With the answers here I realized I can just add an inproc PULL socket to the poller and have a goroutine connect and push a byte to break out of the infinite wait. It's not as versatile as the solutions suggested here, but it works and I can even backport it to the Python version.
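
For reference, here is a minimal sketch of that wake-up trick, assuming pebbe/zmq4 (imported as zmq) and the router/poller variables from the loop above; the inproc address and the one-byte payload are arbitrary choices:

wake, _ := zmq.NewSocket(zmq.PULL)
wake.Bind("inproc://wake")
poller.Add(wake, zmq.POLLIN)

// A worker goroutine pushes a single byte once its reply has been queued,
// which unblocks the Poll(-1) below.
go func() {
    push, _ := zmq.NewSocket(zmq.PUSH)
    defer push.Close()
    push.Connect("inproc://wake")
    // ... enqueue the finished reply where the polling loop can see it ...
    push.SendBytes([]byte{1}, 0)
}()

for {
    sockets, _ := poller.Poll(-1) // now safe to block forever
    for _, polled := range sockets {
        switch polled.Socket {
        case router:
            // receive and dispatch the request as before
        case wake:
            wake.RecvBytes(0) // drain the wake-up byte
            // flush any queued replies out through router here
        }
    }
}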


Answer:

I opened an issue about 1.5 years ago to introduce a port of https://github.com/vaughan0/go-zmq/blob/master/channels.go to pebbe/zmq4. Ultimately the author decided against it, but we have used this in production (under VERY heavy workloads) for a long time now.

This is a gist of the file that had to be added to the pebbe/zmq4 package (since it adds methods to the Socket). It could be rewritten so that the functions take a Socket as an argument instead of being methods on the Socket receiver, but since we vendor our code anyway, this was an easy way forward.

The basic usage is to create your Socket like normal (call it s for example) then you can:

channels := s.Channels()
outBound := channels.Out()
inBound := channels.In()

Now you have two channels of type [][]byte that you can use between goroutines, while a single goroutine (managed within the channels abstraction) is responsible for managing the Poller and communicating with the socket.
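
A hedged usage sketch of the wrapper described above (handleRequest, clientID and payload are placeholders; the channel types come from the gist mentioned earlier):

// Any goroutine may queue an outgoing multipart message:
go func() {
    outBound <- [][]byte{clientID, []byte{}, payload}
}()

// One loop drains the inbound channel and fans the work out; no goroutine
// other than the wrapper's own ever touches the socket.
for frames := range inBound {
    go handleRequest(frames)
}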

Question:

I've built an app written in Python, based on ZeroMQ, but now I'm facing performance issues. So I decided to rewrite some modules of my app in, let's say, Go. But when I try to establish messaging between sockets implemented in different languages, nothing works. I've searched so far, but I didn't find any info about compatibility problems when using ZeroMQ across different languages.

So the question is: can I use Go for a ZeroMQ-based server implementation and have a client written in Python connect to it? Or do I have to use only one language?

EDIT: here are typical server and client that I'm trying to get working properly

server:

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REP)
sock.bind("tcp://*:57000")
msg = sock.recv()

client:

package main

import (
    zmq "github.com/pebbe/zmq4"
)

func main() {

    ctx, _ := zmq.NewContext()
    sock, _ := ctx.NewSocket(zmq.REQ)

    sock.Connect("tcp://localhost:57000")
    sock.Send("simple message", 0)
}

The server gets stuck at sock.recv()


Answer:

Programming languages are able to communicate with each other -- yes, you can write a server in Go and a client in Python and have them communicate with each other.

If you're trying to communicate using raw sockets, look at the documentation for your desired languages and make sure that the serialized data match in structure.

e.g. You could decide on a struct, implement your payload in Python, and then match that structure in Go (there may be a better/stdlib way in Go; I'm not very experienced with this language). The potential maintenance and implementation headache in worrying about these details is why people use higher-level protocols -- exactly a good use case for ZeroMQ.

If you're using a message queue as the backbone of your operation, simply use that as the shared protocol. If your Python code can speak with ZeroMQ, then it's doing its job correctly -- there's no need for your Go code to know it's speaking to Python.

In this case, your new Go server would talk to ZeroMQ, your Python client would talk to ZeroMQ, and your two heterogeneous languages need to know nothing about each other.
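
For completeness, a minimal sketch of the arrangement the question asks about (a Go REP server that the Python REQ client can connect to), assuming pebbe/zmq4 and the port from the question; error handling is elided:

package main

import (
    "fmt"

    zmq "github.com/pebbe/zmq4"
)

func main() {
    sock, _ := zmq.NewSocket(zmq.REP)
    defer sock.Close()
    sock.Bind("tcp://*:57000") // same port as in the question

    for {
        msg, _ := sock.Recv(0) // request from the Python REQ client
        fmt.Println("received:", msg)
        sock.Send("ack", 0) // a REP socket must reply before the next Recv
    }
}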

Question:

I am trying to figure out, if I have two routines calling the same ZeroMQ socket, whether the socket will know to return the result of socket.recv() to routine1 when routine1's reply arrives (and likewise for routine2, ..., routineN).

Hence, using a messaging bus with request/response semantics, what is the best pattern to approach this in Go?

If it helps, think about the Simple Pirate pattern and implementing an RPC-style call set in that pattern with ZeroMQ.

Socket; //zeromq socket
//routine 1
socket.send(data) // do i need some identifier here for this routine?
socket.recv() // wait until i get a response from that send
//routine 2 
socket.send(data)
socket.recv()

So in this case I have no idea whether the response for routine1 or routine2 will come back first. How can I make sure that, when a response arrives on the socket, I notify the correct routine's recv() call?

Thanks!


Answer:

I don't believe a ZeroMQ socket can be shared concurrently between threads.

Likely you need some kind of goroutine that listens for requests, does the ZMQ send/receive, and then replies.

for example: (untested code)

type Req struct {
   Data []byte 
   Reply chan []byte
}

go func() { // probably not the zmq api, but you should get the idea here
    for req := range requests {
        socket.send(req.Data)
        req.Reply <- socket.recv()
    }
}()
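
A slightly more concrete, hedged sketch of that idea with pebbe/zmq4 (the REQ socket, the endpoint and the helper names are assumptions, not part of the original answer): one goroutine owns the socket, and every caller blocks only on its own reply channel.

package main

import (
    "fmt"

    zmq "github.com/pebbe/zmq4"
)

// Req carries one request and a private channel for its reply.
type Req struct {
    Data  []byte
    Reply chan []byte
}

// socketOwner is the only goroutine that ever touches the socket.
func socketOwner(socket *zmq.Socket, requests <-chan Req) {
    for req := range requests {
        socket.SendBytes(req.Data, 0)
        rep, _ := socket.RecvBytes(0)
        req.Reply <- rep
    }
}

// call lets any goroutine issue a request and wait only for its own reply.
func call(requests chan<- Req, data []byte) []byte {
    reply := make(chan []byte, 1)
    requests <- Req{Data: data, Reply: reply}
    return <-reply
}

func main() {
    socket, _ := zmq.NewSocket(zmq.REQ)
    defer socket.Close()
    socket.Connect("tcp://localhost:5555") // assumes a REP server is listening here

    requests := make(chan Req)
    go socketOwner(socket, requests)

    fmt.Printf("%s\n", call(requests, []byte("ping")))
}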

Question:

I have an application running on a server which takes requests from a phone app and then load-balances the requests across worker servers. I'm trying to add a timeout so that messages on the main server that have been sitting in the outbound queue for longer than the timeout are removed from the queue. More specifically, the application on the main server is written in Go and implements the Paranoid Pirate Pattern of load balancing. The code I currently have is:

import (
    "fmt"
    zmq "github.com/pebbe/zmq4"
    "time"
)

const (
    HEARTBEAT_LIVENESS = 3
    HEARTBEAT_INTERVAL = 1500 * time.Millisecond

    MESSAGE_READY     = "\001"
    MESSAGE_HEARTBEAT = "\002"
)

var (
    client *zmq.Socket
    backend *zmq.Socket
    frontend *zmq.Socket
    workerPoller *zmq.Poller
    brokerPoller *zmq.Poller
    workerQueue []Worker
)

type Worker struct {
    Id string
    Expire time.Time
}

type RequestWrapper struct {
    RequestToSend Request
}

func NewWorker(id string) Worker {
    return Worker{
        Id: id,
        Expire: time.Now().Add(HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS),
    }
}

func AddReadyWorker(workers []Worker, worker Worker) []Worker {
    fmt.Println(worker.Id, " joined")
    for i, w := range workers {
        if worker.Id == w.Id {
            if i == 0 {
                workers = workers[1:]
            } else if i == len(workers)-1 {
                workers = workers[:i]
            } else {
                workers = append(workers[:i], workers[i+1:]...)
            }
            break
        }
    }
    return append(workers, worker)
}

func PurgeInactiveWorkers() {
    now := time.Now()
    for i, worker := range workerQueue {
        if now.Before(worker.Expire) {
            workerQueue = workerQueue[i:]
            return
        }
    }

    workerQueue = workerQueue[0:0]
}

func LoadBalance() {
// Loop:
    heartbeat := time.Tick(HEARTBEAT_INTERVAL)
    for {
        var sockets []zmq.Polled

        // If you have available workers, poll on the both front and backend
        // If not poll on backend with infinite timeout
        if len(workerQueue) > 0 {
            sockets, _ = brokerPoller.Poll(HEARTBEAT_INTERVAL)
        } else {
            sockets, _ = workerPoller.Poll(HEARTBEAT_INTERVAL)
        }

        for _, socket := range sockets {
            switch socket.Socket {
                // backend is a router
                case backend:
                    workerId, _ := backend.Recv(0)
                    workerQueue = AddReadyWorker(workerQueue, NewWorker(workerId))
                    clientId, _ := backend.Recv(0)
                    if clientId != MESSAGE_READY && clientId != MESSAGE_HEARTBEAT {
                        route, _ := backend.Recv(0)
                        message, _ := backend.RecvBytes(0)

                        fmt.Println("Received response")
                        RouteResponse(route, message)

                        // frontend.Send(clientId, zmq.SNDMORE)
                        // frontend.Send("", zmq.SNDMORE)
                        // frontend.SendBytes(message, 0)
                    }
                // frontend is a dealer
                case frontend:
                    clientId, _ := frontend.Recv(0)
                    route, _ := frontend.Recv(0)
                    message, _ := frontend.RecvBytes(0)

                    backend.Send(workerQueue[0].Id, zmq.SNDMORE)
                    backend.Send(clientId, zmq.SNDMORE)
                    backend.Send(route, zmq.SNDMORE)
                    backend.SendBytes(message, 0)

                    workerQueue = workerQueue[1:]
            }
        }

        select {
            case <-heartbeat:
                for _, worker := range workerQueue {
                    backend.Send(worker.Id, zmq.SNDMORE)
                    backend.Send(MESSAGE_HEARTBEAT, 0)
                }
                break
            default:
        }

        PurgeInactiveWorkers()
    }
}

If the backend queues a message, but it is not actually sent to a worker within some period of time, I want it to expire and never be sent. Is there a socket option that can accomplish this? If not, what would I have to do to accomplish this?

Two ways I think I can do this without socket options are:

1) Have the backend wrap the message in a wrapper and send to a golang queue and not through zeromq. The wrapper contains the time that the message was "sent". The backend concurrently pulls from the front of the golang queue one at a time and checks if the message is expired. If so, don't send, if not, send the message. I could have the backend add the message to the golang queue first and then truly send it out after in the same block of code. That way, I don't need a lock.

2) Send the wrapped message through zeromq to a retriever, and the retriever checks if it's expired and returns early. I don't like this because it seems like it's bad for performance.


Answer:

What you're trying to do is use communication as an execution rendezvous. The sender wants to know something about when the receiver gets messages.

ZMQ implements the Actor model. What you need is a modification of the Communicating Sequential Processes model (one where sends timeout). Basically you need to add control message flows to/from the workers, the idea being that the server asks the worker to receive a message and the server waits for the reply. The reply means that the worker is ready to receive a message right now, and that the server and worker have both rendezvoused at a send/receive in their program flows. If that reply fails to arrive within timeout seconds, then the server doesn't send the actual message.

Or you could cheat by sending everything to the workers regardless, wrapped in a message that carries a "sent at time X" field, and have the worker decide to discard old messages.
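
A hedged sketch of that envelope idea (the type and helper names are illustrative, and JSON is used only to keep the example short; the broker in the question actually sends raw frames):

package envelope

import (
    "encoding/json"
    "time"
)

// TimedMessage wraps a payload with the moment the broker handed it off.
type TimedMessage struct {
    SentAt  time.Time `json:"sent_at"`
    Payload []byte    `json:"payload"`
}

// Wrap is called by the broker just before it sends the frames to a worker.
func Wrap(payload []byte) []byte {
    b, _ := json.Marshal(TimedMessage{SentAt: time.Now(), Payload: payload})
    return b
}

// Unwrap is called by the worker; expired work is dropped instead of processed.
func Unwrap(raw []byte, ttl time.Duration) ([]byte, bool) {
    var m TimedMessage
    if err := json.Unmarshal(raw, &m); err != nil {
        return nil, false
    }
    if time.Since(m.SentAt) > ttl {
        return nil, false
    }
    return m.Payload, true
}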

Question:

I was trying to use gozmq for my Go project with ZMQ; however, I was having some problems and wasn't really sure what the cause was.

When I run my application with zmq as in:

$ go run main.go

I get back:

# pkg-config --cflags libzmq libzmq libzmq libzmq 
exec: "pkg-config": executable file not found in $PATH

I don't actually know what it means or why it comes up. However, I guessed that I hadn't installed zmq, or that it couldn't be found, or something... so I brew-installed it with

brew install zmq

But I get:

Warning: zeromq-4.0.4 already installed

So I know that I already installed it. Even though it's installed, it doesn't work. Weird?

The instructions at gozmq do say that for a 4.x install you should run:

go get -tags zmq_4_x github.com/alecthomas/gozmq

I literally ran:

go get -tags zmq_4_x github.com/alecthomas/gozmq

and

go get -tags zmq_4_0 github.com/alecthomas/gozmq

and

go get -tags zmq_4_0_4 github.com/alecthomas/gozmq

I thought the last one would work since it matches the version I had, but none of the above worked.

In fact they all just returned the familiar error:

# pkg-config --cflags libzmq libzmq libzmq libzmq
exec: "pkg-config": executable file not found in $PATH

As a last resort I tried the official site at:

http://zeromq.org/docs:source-git

to install it, but the instructions they have for Mac OS X and Unix-like systems do not work on my computer. Does anyone know what's going on? What else should I try?


Answer:

The problem for pkg-config was easy to solve. I just brew installed it as the duplicate suggested.

However, if you look a little closer, I had 4.0.4, which seems to be a version of zmq that is not properly supported by gozmq (or it doesn't work because the compiler throws errors).

I then had to brew install zmq version 3.2.4 instead, and then install the gozmq build for that zmq version.

To do that, read "Homebrew install specific version of formula?", which shows you how to install a specific version of anything in brew.

Question:

I am trying to implement a very basic PUB/SUB pattern using ZeroMQ. I would like to have an always-active server (publisher) broadcasting messages to all clients without caring which clients are connected. If a client connects to this server as a subscriber, it should receive the messages.

However, I can not send the message using PUB/SUB.

In Python it would be:

# publisher (server.py)
import zmq

ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:9091')

while True:
    publisher.send_string("test")

and

# subscriber (client.py)
import zmq

ctx = zmq.Context()
subscriber = ctx.socket(zmq.SUB)
subscriber.connect('tcp://127.0.0.1:9091')

while True:
    msg = subscriber.recv_string()
    print msg

Or in golang:

package main

import (
    "github.com/pebbe/zmq4"
    "log"
    "time"
)

func Listen(subscriber *zmq4.Socket) {
    for {
        s, err := subscriber.Recv(0)
        if err != nil {
            log.Println(err)
            continue
        }
        log.Println("rec", s)
    }
}

func main() {
    publisher, _ := zmq4.NewSocket(zmq4.PUB)
    defer publisher.Close()
    publisher.Bind("tcp://*:9090")

    subscriber, _ := zmq4.NewSocket(zmq4.SUB)
    defer subscriber.Close()
    subscriber.Connect("tcp://127.0.0.1:9090")

    go Listen(subscriber)
    for _ = range time.Tick(time.Second) {
        publisher.Send("test", 0)
        log.Println("send", "test")

    }
}

Did I misunderstand this pattern, or do I need to send a particular signal from the client to the server when connecting? I am interested in the Go version and only use the Python version for testing.


Answer:

Did I misunderstand this pattern? Yes, fortunately, you did.

ZeroMQ archetypes were defined so as to represent a certain behaviour. As the names say, a PUSH-archetype AccessPoint pushes every message "through" all the communication channels set up so far, a PULL-er AccessPoint pulls anything that has arrived down the line(s) into "its hands", a PUB-lisher AccessPoint publishes, and a SUB-scriber AccessPoint subscribes, so as to receive just the messages that match its topic-filter(s), and no others.

As it seems clear, such Archetype "specification" helps build the ZeroMQ smart messaging / signalling infrastructure for our ease of use in distributed-systems architectures.


# subscriber (client.py)
import zmq

ctx        = zmq.Context()
subscriber = ctx.socket( zmq.SUB )
subscriber.connect( 'tcp://127.0.0.1:9091' )
subscriber.setsockopt( zmq.LINGER,    0 )         # ALWAYS:
subscriber.setsockopt( zmq.SUBSCRIBE, "" )        # OTHERWISE NOTHING DELIVERED

while True:
    msg = subscriber.recv_string()                # MAY USE .poll() + zmq.NOBLOCK
    print msg

subscriber, _ := zmq4.NewSocket( zmq4.SUB )
subscriber.Connect(             "tcp://127.0.0.1:9090" )
subscriber.SetSubscribe(         filter )                 // SET: <topic-filter>

subscriber.SetLinger(            0 )    //  SAFETY FIRST: PREVENT DEADLOCK
defer subscriber.Close()                //  NOW MAY SAFELY SET:

...
msg, _ := subscriber.Recv( 0 )

As defined, any freshly instantiated SUB-side AccessPoint object has literally zero chance of knowing which messages are the "right" ones that ought to be "delivered" and which are not.

Without this initial piece of knowledge, the ZeroMQ designers had a principal choice: either be Archetype-policy consistent and let the PUB-side AccessNode distribute all the .send()-acquired messages only to those SUB-side AccessNode(s) that have explicitly requested to receive them, right via the zmq.SUBSCRIBE mechanics, or deliver everything sent from the PUB also to all the so-far-undecided SUB-s.

The former was a consistent and professional design step from the ZeroMQ authors. The latter would actually mean violating ZeroMQ's own RFC specification.

The latter choice would be something like this: if one has just moved into a new apartment, one would hardly expect every newspaper and magazine to appear in one's new mailbox from the next morning on, would one? But if one subscribes to the Boston Globe, the very next morning the fresh issue will be at the doorstep, and it will keep arriving there until one cancels the subscription, or the newspaper goes bankrupt, or a lack of paper rolls prevents the printing shop from delivering in due time and fashion, or a traffic jam in the Big Dig tunnel causes troubles for everyone, or just for the local delivery, on some particular day.

All this is natural and compatible with the Archetype-policy.

Intermezzo: Golang already has bindings to many different API versions.

Technology purists will object here that early API releases (until some v3.2+) actually did technically transport all message-payloads from a PUB to all SUB-s, as this simplified the PUB-side workload envelope but increased the transport-class(es) data-flow and the SUB-side resources / deferred topic-filter processing. Yet all this was hidden from user-code, right below the API horizon of abstraction. So, except for a need to properly scale resources, this was transparent to the user. More recent API versions moved the topic-filter processing to the PUB-side. Nevertheless, in both cases, the ZeroMQ RFC specification policy is implemented in such a manner that the SUB-side will never deliver (through the .recv()-interface) a single message that does not match a valid, explicit SUB-side subscription.

In all cases where a SUB-side has not yet explicitly set any zmq.SUBSCRIBE-instructed topic-filter, it cannot and will not deliver anything (which is both natural and fully consistent with the ZeroMQ RFC Archetype-policy defined for the SUB-type AccessPoint).

The Best Next Step:

Always, at least, read the ZeroMQ API documentation, where all the details are professionally specified - so, at the very least, one can get a first glimpse of the breadth of this smart messaging / signalling framework.

This will not help anyone start on a green field and fully build one's own complex mental concept and in-depth understanding of how all the things work internally, which is obviously not any API documentation's ambition, is it? Yet it will help anyone refresh or be reminded of all the configurable details, once one has mastered the ZeroMQ internal architecture, as detailed in the source referred to in the next paragraph.

Plus, for anyone who is indeed interested in distributed systems or just ZeroMQ per se, it is worth one's time and effort to read Pieter HINTJENS' book "Code Connected, Volume 1" (freely available in pdf), plus any of his other books drawing on his rich software-engineering experience, because his many insights into modern computing may and will inspire (and a lot).

edit:

MWE in GO
package main

import (
    "github.com/pebbe/zmq4"
    "log"
    "time"
)

func Listen(subscriber *zmq4.Socket) {
    for {
        s, err := subscriber.Recv(0)
        if err != nil {
            log.Println(err)
            continue
        }
        log.Println("rec", s)
    }
}

func main() {
    publisher, _ := zmq4.NewSocket(zmq4.PUB)
    publisher.SetLinger(0)
    defer publisher.Close()

    publisher.Bind("tcp://127.0.0.1:9092")

    subscriber, _ := zmq4.NewSocket(zmq4.SUB)
    subscriber.SetLinger(0)
    defer subscriber.Close()

    subscriber.Connect("tcp://127.0.0.1:9092")
    subscriber.SetSubscribe("")

    go Listen(subscriber)
    for _ = range time.Tick(time.Second) {
        publisher.Send("test", 0)
        log.Println("send", "test")
    }
}

Question:

I am new to ZeroMQ. I want to create multiple publishers, where each publisher publishes specific data, such as the following:

  1. Publisher 1: Publishing image data
  2. Publisher 2: Publishing audio data
  3. Publisher 3: Publishing text data

Basically, my requirement is to publish the data from multiple publishers and receive using multiple receivers on the other side.

Please see the sample code below:

data_publisher.cpp

//  Prepare our context and all publishers
zmq::context_t context(1);
zmq::socket_t publisher1(context, ZMQ_PUB);
zmq::socket_t publisher2(context, ZMQ_PUB);
zmq::socket_t publisher3(context, ZMQ_PUB);
zmq::socket_t publisher4(context, ZMQ_PUB);

publisher1.bind("tcp://*:5556");
publisher2.bind("tcp://*:5557");
publisher3.bind("tcp://*:5558");
publisher4.bind("tcp://*:5559");

//  Initialize random number generator
srandom((unsigned)time(NULL));
while (1) {
    // sample data
    int zipcode1 = within(100000);
    int zipcode2 = within(100000);
    int zipcode3 = within(100000);
    int zipcode4 = within(100000);

    int temperature1 = within(215) - 80;
    int temperature2 = within(215) - 80;
    int temperature3 = within(215) - 80;
    int temperature4 = within(215) - 80;

    int relhumidity1 = within(50) + 10;
    int relhumidity2 = within(50) + 10;
    int relhumidity3 = within(50) + 10;
    int relhumidity4 = within(50) + 10;

    zmq::message_t message1(20);
    zmq::message_t message2(20);
    zmq::message_t message3(20);
    zmq::message_t message4(20);

    snprintf((char*)message1.data(), 20, "%05d %d %d", zipcode1, temperature1, relhumidity1);
    snprintf((char*)message2.data(), 20, "%05d %d %d", zipcode2, temperature2, relhumidity2);
    snprintf((char*)message3.data(), 20, "%05d %d %d", zipcode3, temperature3, relhumidity3);
    snprintf((char*)message4.data(), 20, "%05d %d %d", zipcode4, temperature4, relhumidity4);

    publisher1.send(message1);
    publisher2.send(message2);
    publisher3.send(message3);
    publisher4.send(message4);
}

data_receiver.cpp

zmq::context_t context(1);

//  Socket to talk to server
zmq::socket_t subscriber1(context, ZMQ_SUB);
zmq::socket_t subscriber2(context, ZMQ_SUB);
zmq::socket_t subscriber3(context, ZMQ_SUB);
zmq::socket_t subscriber4(context, ZMQ_SUB);

subscriber1.connect("tcp://localhost:5556");
subscriber2.connect("tcp://localhost:5557");
subscriber3.connect("tcp://localhost:5558");
subscriber4.connect("tcp://localhost:5559");

const char* filter = (argc > 1) ? argv[1] : "10001 ";
subscriber1.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber2.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber3.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber4.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));

//  Process 100 updates
int update_nbr;
long total_temp1 = 0;
long total_temp2 = 0;
long total_temp3 = 0;
long total_temp4 = 0;

for (update_nbr = 0; update_nbr < 100; update_nbr++)
{
    zmq::message_t update1;
    zmq::message_t update2;
    zmq::message_t update3;
    zmq::message_t update4;
    int zipcode1, temperature1, relhumidity1;
    int zipcode2, temperature2, relhumidity2;
    int zipcode3, temperature3, relhumidity3;
    int zipcode4, temperature4, relhumidity4;

    subscriber1.recv(&update1);
    subscriber2.recv(&update2);
    subscriber3.recv(&update3);
    subscriber4.recv(&update4);

    std::istringstream iss1(static_cast<char*>(update1.data()));
    std::istringstream iss2(static_cast<char*>(update2.data()));
    std::istringstream iss3(static_cast<char*>(update3.data()));
    std::istringstream iss4(static_cast<char*>(update4.data()));

    iss1 >> zipcode1 >> temperature1 >> relhumidity1;
    iss2 >> zipcode2 >> temperature2 >> relhumidity2;
    iss3 >> zipcode3 >> temperature3 >> relhumidity3;
    iss4 >> zipcode4 >> temperature4 >> relhumidity4;

    total_temp1 += temperature1;
    total_temp2 += temperature2;
    total_temp3 += temperature3;
    total_temp4 += temperature4;
}

std::cout << "Average temperature for zipcode '" << filter << "' was "
          << (int)(total_temp1 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
          << (int)(total_temp2 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
          << (int)(total_temp3 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
          << (int)(total_temp4 / update_nbr) << "F" << std::endl;

Please note that the above code is just sample code, posted to get suggestions / advice.

I want to know whether the approach shown in the sample code above (one socket per data stream) is a good choice or not.


Answer:

Still waiting for any quantitative fact, but let's start:

ZeroMQ is a concept of using smart enabling tools, whereas the low-level system programming is hidden by the ZeroMQ core element, the Context-engine.

This said, the high-level tools, formalised as Scalable Formal Communication Pattern Archetypes, deliver some sort of human-mimicking behaviour -- PUB Publishers indeed "publish", SUB Subscribers can "subscribe", REQ Requestors can "request", REP Repliers can indeed "reply", etc.

These AccessPoints with behaviour can .bind()/.connect() into some sort of distributed-behavioural infrastructure, once a few elementary rules are respected. One such rule is: do not bother with the actual transport-classes' low-level details -- all of them indeed feature-rich technologies, currently spanning a landscape of { inproc:// | ipc:// | tcp:// | pgm:// | epgm:// | vmci:// } -- because the Context()-instance will handle all of these transparently to your high-level behaviour. Simply forget about this. Another rule is: you can be sure every message sent is either delivered error-free or not at all - no compromise in this, no tortured trash is ever delivered to fool or crash the recipient's AccessPoint post-processing.

Failing to understand this, ZeroMQ is not used to its maximum and cannot deliver us both the comfort and the powers that were engineered into this luxurious tool.

Back to your dilemma:

Having made the few remarks above, and with your primary architecture not yet clear, it is still possible to help you here.

The ZeroMQ abstract, distributed-behaviour Socket-tool is principally a pure-[SERIAL] scheduling device. This means none of your receiving AccessPoints { .bind() | .connect() }-associated with a Socket can expect to arbitrarily re-order a pure-sequential flow of messages.

This means that in any case where either a "just"-[CONCURRENT] process-scheduling, or, in the extreme case, a true-[PARALLEL] process-scheduling is technically orchestrated, a single "pure"-[SERIAL] delivery channel will not allow the { [CONCURRENT] | [PARALLEL] }-system to keep that mode of process-scheduling, and will chop the flow of events / processing into a "pure"-[SERIAL] sequence of messages.

A ) This indeed could be both a reason and a must for introducing multiple independently operated ZeroMQ distributed-behaviour Socket instances.

B ) On the other side, knowing nothing about the global distributed-system behaviour, no one can yet tell for sure whether going to multiple independently operated Socket instances is not just a waste of time and resources, delivering unreasonably under-average or unacceptably poor end-to-end system performance because of an extremely wrong or totally missing initial engineering decision.


Performance?

Do not guess in this domain, ever. Rather, start with quantitatively declared needs, based on which a technically reasoned design will be able to proceed and define all the steps needed in the resource mapping and performance tweaking, up to the platform limits.

ZeroMQ has for more than a decade been gloriously equipped for doing this with ultimate performance characteristics, and the Design & Engineering team has done a great deal in polishing both the scalability and performance envelopes, while keeping latency at a level an ad-hoc programmer would struggle to achieve. Indeed, a great piece of system programming is hidden in the ZeroMQ basement.

"data is huge" -- ok, define the size -- delivering 1E+9 messages having 1 [B] in size has other performance tweaking than delivering 1E+3 messages having 1.000.000 [B] in size.

"as fast as possible" -- ok, define the fast for a given size and intended cadence of messages 1/s ~ 1 [Hz], 10/s ~ 10 [Hz], 1000/s ~ 1 [kHz]

Sure, under certain circumstances such a mix of requirements may wander outside the capability horizons of contemporary computing devices. That is best reviewed before any programming has ever started, because otherwise you have just wasted some amount of programming effort on a thing that will never fly; better to have positive proof that the solution architecture is feasible and doable within an acceptable resources-and-costs perimeter.

So, if your Project needs something, first define and quantitatively specify what that actually is; next, the solution architecture can start to sort it out and decide what tools and tool-configurations can match the defined target levels of functionality and performance.

Building a house by starting with the roof will never answer the questions of how to lay out the basement walls or what would be a sufficient, yet not over-designed, thickness of the reinforced-concrete armouring that will carry an unknown number of floors above. Having a roof already built is easy to show off, but it has nothing to do with a systematic and rigorous design & engineering practice.

Question:

I'm using ZeroMQ's multiple connect feature, to connect a single DEALER to 2 ROUTERS:

            +----> .Connect() --> ROUTER 1
           /
DEALER ---+------> .Connect() --> ROUTER 2

In my test, I send 10 messages through the DEALER. I get back a nice even distribution of 5 messages to each of the ROUTER-s.

My problem is that if ROUTER 1 goes away for some reason, the DEALER will still continue to queue messages for it, presumably on the assumption that ROUTER 1 will eventually come back. I end up with only 5 messages on ROUTER 2.

What I need to happen is for DEALER to ignore disconnected or failed peers. Is this possible?

I've tried setting ZMQ_SNDHWM and many others, but nothing seems to work.

The only alternative I can see is to do the failover myself, with separate sockets, heartbeats and ACK packets etc. It seems like such a basic pattern should already be implemented by ZeroMQ.


Edit: testing code

package main

import (
    "github.com/pebbe/zmq4"
    "time"
    "log"
    "fmt"
)

func receiveAll(sok *zmq4.Socket) (received int) {
    poller := zmq4.NewPoller()
    poller.Add(sok, zmq4.POLLIN)

    for {
        sockets, err := poller.Poll(100 * time.Millisecond)
        if err != nil {
            log.Print(err)
        }
        if len(sockets) > 0 {
            for _, s := range sockets {
                msg, _ := s.Socket.RecvMessageBytes(0)
                if string(msg[1]) != "Hello World" {
                    log.Fatalf("Unexpected message: %s\n", msg)
                }
                received ++
            }
        } else {
            return
        }
    }
}

func main() {

    dealer, _ := zmq4.NewSocket(zmq4.DEALER)
    router1, _ := zmq4.NewSocket(zmq4.ROUTER)
    router2, _ := zmq4.NewSocket(zmq4.ROUTER)

    router1.Bind("tcp://0.0.0.0:6667")
    router2.Bind("tcp://0.0.0.0:6668")

    dealer.Connect("tcp://0.0.0.0:6667")
    dealer.Connect("tcp://0.0.0.0:6668")

    router1.SetSubscribe("")
    router2.SetSubscribe("")
    dealer.SetSubscribe("")

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count1 := receiveAll(router1)
    count2 := receiveAll(router2)

    fmt.Printf("Blue sky scenario: count1=%d count2=%d\n", count1, count2)

    // Shut down a peer
    router1.Close()
    time.Sleep(300 * time.Millisecond)

    for i := 0; i < 10; i++ {
        dealer.SendBytes([]byte("Hello World"), 0)
    }

    time.Sleep(300 * time.Millisecond)

    count := receiveAll(router2)

    fmt.Printf("Peer 1 offline: count=%d\n", count)

}

Answer:

What I need to happen is for DEALER to ignore disconnected or failed peers. Is this possible ?

Oh sure, it is. There is a need to tweak the default (inactive) values, using your use-case-specific settings in:

  • a .setsockopt( ZMQ.IMMEDIATE, 1 ) for not buffering message-instances for peers that do not seem to be "alive"
  • a .setsockopt( ZMQ.HEARTBEAT_IVL, <ms> ) for sending heartbeats
  • a .setsockopt( ZMQ.HEARTBEAT_TTL, <ms> ) for a Time-To-Live setting
  • a .setsockopt( ZMQ.HEARTBEAT_TIMEOUT, <ms>) for a timeout threshold
  • a .setsockopt( ZMQ.HANDSHAKE_IVL, <ms> ) for managing (re-)establishment timeouts.

For details, check your language binding and what native API version it actually uses under the hood. Some of these settings date back to native-API v3.x, while the heartbeat options need native-API v4.2+; the most recent native-API v4.2.2 documentation will help you tune the values and configuration strategies.
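
As an illustration only, this is roughly how those options could be applied with pebbe/zmq4 (the setter names follow that binding's conventions, the heartbeat options need libzmq 4.2+, and the interval values are arbitrary examples, so verify everything against the binding you actually vendor):

package main

import (
    "time"

    zmq "github.com/pebbe/zmq4"
)

func main() {
    dealer, _ := zmq.NewSocket(zmq.DEALER)
    defer dealer.Close()

    dealer.SetImmediate(true)                           // ZMQ_IMMEDIATE: no queueing for incomplete connections
    dealer.SetHeartbeatIvl(500 * time.Millisecond)      // ZMQ_HEARTBEAT_IVL (libzmq >= 4.2)
    dealer.SetHeartbeatTimeout(1500 * time.Millisecond) // ZMQ_HEARTBEAT_TIMEOUT
    dealer.SetHeartbeatTtl(3 * time.Second)             // ZMQ_HEARTBEAT_TTL
    dealer.SetHandshakeIvl(3 * time.Second)             // ZMQ_HANDSHAKE_IVL

    dealer.Connect("tcp://0.0.0.0:6667")
    dealer.Connect("tcp://0.0.0.0:6668")

    dealer.SendBytes([]byte("Hello World"), 0) // goes only to live, connected peers
}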

Question:

I'm coding an Express app that should collect some data sent from a remote machine via ZMQ and update a MongoDB database with the received information.

The updates are sent every 5 minutes and the encoded JSON weighs less than 1 KB. Nevertheless, I noticed that when app.js is parsing the JSON and writing to the database, the client requests run more slowly.

Shall I put the ZMQ socket code in app.js (OPTION A), or shall I just use a separate file (OPTION B)? In the case of option B, I could use a cron job to avoid heavy CPU active waits (as updates come every 5 minutes).


Answer:

Two part answer:

You can, and maybe should, separate your ZMQ message processing out from your Express App

These are two functionally separate actions; there's no reason they need to be in the same process, and they don't need to access each other, so there's no harm, and possibly a benefit, in making them separate processes. Whether it's the right choice for you is application dependent.

There's no reason processing a 1kb json structure once every 5 minutes should noticeably impact performance

... unless your volume of client requests is quite high. It should only introduce a couple milliseconds of delay, at worst, which under normal workloads should only impact a single request to a degree that would never be noticed by humans. If your request concurrency is high, then that delay can create a slight domino effect that affects multiple requests. Otherwise, you should consider whether your architecture has some inefficiencies that could be fixed to solve the problem.

Question:

I'm using the pebbe/zmq4 ZeroMQ bindings for Go, and I'm trying to develop higher level interfaces for my code that ZeroMQ implements in order to support mocking in my tests.

As an example of my question, the zmq4.Socket struct's RecvMessage function expects a zmq4.Flag as an argument. zmq4.Flag is simply an int, as defined by type Flag int in the Go bindings.

I'm trying to develop my interfaces without any dependencies on the ZeroMQ bindings, so I have an interface defined as:

type Socket interface {
    RecvMessage(int) ([]string, error)
}

When I try to use a ZeroMQ socket for this interface, I get an error stating ... have RecvMessage(zmq4.Flag) ([]string, error) want RecvMessage(int) ([]string, error).

Is there any way to handle this, or do I just need to bite the bullet and depend on the ZeroMQ bindings in my interfaces?


Answer:

It is important to realize that type Foo int defines a separate type, not an alias. See "How to cast to a type alias in Go?".

The only thing you can do to call the interface's RecvMessage with a zmq4.Flag value is to convert it to int:

var f zmq4.Flag = 1
RecvMessage(int(f))
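
If you would rather keep your interface free of zmq4 types, one option (a sketch, with hypothetical names) is a thin adapter that does the conversion in exactly one place:

package sockets

import zmq "github.com/pebbe/zmq4"

// Socket is the zmq4-free interface from the question.
type Socket interface {
    RecvMessage(int) ([]string, error)
}

// zmqSocket adapts *zmq.Socket to Socket by converting the int back to zmq.Flag.
type zmqSocket struct {
    s *zmq.Socket
}

func (z zmqSocket) RecvMessage(flags int) ([]string, error) {
    return z.s.RecvMessage(zmq.Flag(flags))
}

// Wrap is used by production code; tests can substitute any other Socket
// implementation (for example, a hand-written mock).
func Wrap(s *zmq.Socket) Socket {
    return zmqSocket{s: s}
}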

Question:

I'm implementing the Espresso Pattern of ZMQ.

I want to connect many subscribers <> Proxy <> many publishers

However, the listener in the proxy only receives messages from one publisher. Hence, the subscribers only receive from that particular publisher. I can't figure out what the problem with my code is.

package playground

import (
    zmq "github.com/pebbe/zmq4"

    "fmt"
    "math/rand"
    "time"
    "testing"
)

func subscriber_thread(id int) {
    subscriber, _ := zmq.NewSocket(zmq.SUB)
    subscriber.Connect("tcp://localhost:6001")
    subscriber.SetSubscribe("")
    defer subscriber.Close()

    for {
        msg, err := subscriber.RecvMessage(0)
        if err != nil {
            panic(err)
        }
        fmt.Println("subscriber id:", id,"received:", msg)
    }
}

func publisher_thread(n int) {
    publisher, _ := zmq.NewSocket(zmq.PUB)
    publisher.Bind("tcp://*:6000")

    for {
        s := fmt.Sprintf("%c-%05d", n +'A', rand.Intn(100000))
        _, err := publisher.SendMessage(s)
        if err != nil {
            panic(err)
        }
        fmt.Println("publisher sent:", s)
        time.Sleep(100 * time.Millisecond) //  Wait for 1/10th second
    }
}

//  The listener receives all messages flowing through the proxy, on its
//  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
//  attached child threads. In other languages your mileage may vary:

func listener_thread() {
    pipe, _ := zmq.NewSocket(zmq.PAIR)
    pipe.Bind("inproc://pipe")

    //  Print everything that arrives on pipe
    for {
        msg, err := pipe.RecvMessage(0)
        if err != nil {
            break //  Interrupted
        }
        fmt.Printf("%q\n", msg)
    }
}

func TestZmqEspresso(t *testing.T) {
    go publisher_thread(0)
    go publisher_thread(1)
    go publisher_thread(2)

    go subscriber_thread(1)
    go subscriber_thread(2)

    go listener_thread()

    time.Sleep(100 * time.Millisecond)

    subscriber, _ := zmq.NewSocket(zmq.XSUB)
    subscriber.Connect("tcp://localhost:6000")

    publisher, _ := zmq.NewSocket(zmq.XPUB)
    publisher.Bind("tcp://*:6001")

    listener, _ := zmq.NewSocket(zmq.PAIR)
    listener.Connect("inproc://pipe")

    zmq.Proxy(subscriber, publisher, listener)

    fmt.Println("interrupted")

}

Answer:

I've figured out the solution: the XPUB/XSUB proxy sockets should bind, and the PUB and SUB workers should connect to them.

Working code below

package playground

import (
    zmq "github.com/pebbe/zmq4"

    "fmt"
    "log"
    "math/rand"
    "testing"
    "time"
)

func subscriber_thread(id int) {
    subscriber, err := zmq.NewSocket(zmq.SUB)
    if err != nil {
        panic(err)
    }
    err = subscriber.Connect("tcp://localhost:6001")
    if err != nil {
        panic(err)
    }
    err = subscriber.SetSubscribe("")
    if err != nil {
        panic(err)
    }
    defer subscriber.Close()

    for {
        msg, err := subscriber.RecvMessage(0)
        if err != nil {
            panic(err)
        }
        fmt.Println("subscriber id:", id, "received:", msg)
    }
}

func publisher_thread(n int) {
    publisher, err := zmq.NewSocket(zmq.PUB)
    if err != nil {
        panic(err)
    }
    //err = publisher.Bind("tcp://*:6000")
    err = publisher.Connect("tcp://localhost:6000")
    if err != nil {
        panic(err)
    }

    for {
        s := fmt.Sprintf("%c-%05d", n+'A', rand.Intn(100000))
        _, err := publisher.SendMessage(s)
        if err != nil {
            panic(err)
        }
        fmt.Println("publisher sent:", s)
        time.Sleep(100 * time.Millisecond) //  Wait for 1/10th second
    }
}

//  The listener receives all messages flowing through the proxy, on its
//  pipe. In CZMQ, the pipe is a pair of ZMQ_PAIR sockets that connects
//  attached child threads. In other languages your mileage may vary:

func listener_thread() {
    pipe, _ := zmq.NewSocket(zmq.PAIR)
    pipe.Bind("inproc://pipe")

    //  Print everything that arrives on pipe
    for {
        msg, err := pipe.RecvMessage(0)
        if err != nil {
            break //  Interrupted
        }
        fmt.Printf("%q\n", msg)
    }
}

func TestZmqEspresso(t *testing.T) {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)

    go publisher_thread(0)
    go publisher_thread(1)
    go publisher_thread(2)

    go subscriber_thread(1)
    go subscriber_thread(2)

    go listener_thread()

    time.Sleep(100 * time.Millisecond)

    subscriber, err := zmq.NewSocket(zmq.XSUB)
    if err != nil {
        panic(err)
    }
    //err = subscriber.Connect("tcp://localhost:6000")
    err = subscriber.Bind("tcp://*:6000")
    if err != nil {
        panic(err)
    }

    publisher, err := zmq.NewSocket(zmq.XPUB)
    if err != nil {
        panic(err)
    }
    err = publisher.Bind("tcp://*:6001")
    if err != nil {
        panic(err)
    }

    listener, _ := zmq.NewSocket(zmq.PAIR)
    listener.Connect("inproc://pipe")

    err = zmq.Proxy(subscriber, publisher, listener)
    if err != nil {
        panic(err)
    }

    fmt.Println("interrupted")

}

Question:

I have a REP socket that's connected to many REQ sockets, each running on a separate Google Compute Engine instance. I'm trying to accomplish the synchronization detailed in the ZMQ Guide's syncpub/syncsub example, and my code looks pretty similar to that example:

context = zmq.Context()
sync_reply = context.socket(zmq.REP)
sync_reply.bind('tcp://*:5555')

# start a bunch of other sockets ...

ready = 0
while ready < len(all_instances):
    sync_reply.recv()
    sync_reply.send(b'')
    ready += 1

And each instance is running the following code:

context = zmq.Context()
sync_request = context.socket(zmq.REQ)
sync_request.connect('tcp://IP_ADDRESS:5555')

sync_request.send(b'')
sync_request.recv()

# start other sockets and do other work ...

This system works fine up until a certain number of instances (around 140). Any more, though, and the REP socket will not receive all of the requests. It also seems like the requests it drops are from different instances each time, which leads me to believe that all the requests are indeed being sent, but the socket is just not receiving any more than (about) 140 of them.

I've tried setting the high water mark for the sockets, spacing out the requests over the span of a few seconds, switching to ROUTER/DEALER sockets - all with no improvement. The part that confuses me the most is that the syncsub/syncpub example code (linked above) works fine for me with up to 200 Google Compute Engine instances, which is as many as I can start. I'm not sure what about my code specifically is causing this problem - any help or tips would be appreciated.


Answer:

Answering my own question - it seems like it was an issue with the large number of sockets I was using, and also possibly the memory limitations of the GCE instances. See the comment thread on the original question for more details.