Hot questions for Using ZeroMQ in message queue


Question:

I am trying to make a class that will be able to send data and then receive it.

Right now it works only for the first send/receive; any further attempt to .send() throws the error below.

>Traceback (most recent call last):
  File "main.py", line 31, in <module>
    zq.send(arr)
  File "D:\ITIM\video2\MQCompare\cZMQ.py", line 17, in send
    self.socketC.send(data)
  File "zmq/backend/cython/socket.pyx", line 636, in zmq.backend.cython.socket.Socket.send (zmq\backend\cython\socket.c:7305)
  File "zmq/backend/cython/socket.pyx", line 683, in zmq.backend.cython.socket.Socket.send (zmq\backend\cython\socket.c:7048)
  File "zmq/backend/cython/socket.pyx", line 206, in zmq.backend.cython.socket._send_copy (zmq\backend\cython\socket.c:3032)
  File "zmq/backend/cython/socket.pyx", line 201, in zmq.backend.cython.socket._send_copy (zmq\backend\cython\socket.c:2920)
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc (zmq\backend\cython\socket.c:10014)
    raise ZMQError(errno)
zmq.error.ZMQError: Operation cannot be accomplished in current state

The code I'm using looks like this:

import zmq

class ZeroMQ:

    def __init__(self):
        self.context = zmq.Context()
        self.socketS = self.context.socket(zmq.REP)
        self.socketS.bind("tcp://*:5555")
        self.socketC = self.context.socket(zmq.REQ)
        self.socketC.connect("tcp://localhost:5555")

    def __exit__(self, exc_type, exc_value, traceback):
        self.socketC.close()
        self.socketS.close()

    def send(self, data):
        self.socketC.send(data)

    def recv(self):
        return self.socketS.recv()

Am I making the connection right?

Why is the send function throwing an error?

I would appreciate any help. Thank you.


Answer:

Simply because socketC, being an instance of the REQ archetype, can never send a second message ( as the implementation of the .send() class-method above tries to do ) without a prior call to the socketC.recv() instance-method.

Both the REQ and REP archetypes are well documented as having to obey a strict two-step dance of .send()-.recv()-.send()-.recv()-..., resp. .recv()-.send()-.recv()-.send()-..., using their native instance-methods.

This is how the ZeroMQ REQ / REP sockets were designed and documented.
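A minimal sketch of that lockstep, keeping a single REQ/REP pair on tcp://localhost:5555 as in the question ( variable names here are illustrative, not the author's ):

import zmq

context = zmq.Context()

rep = context.socket(zmq.REP)          # "server" side
rep.bind("tcp://*:5555")

req = context.socket(zmq.REQ)          # "client" side
req.connect("tcp://localhost:5555")

for i in range(3):
    req.send(b"request %d" % i)        # REQ may send ...
    print(rep.recv())                  # ... then REP must recv
    rep.send(b"reply %d" % i)          # REP may now send ...
    print(req.recv())                  # ... and only after this recv may REQ send again

req.close()
rep.close()
context.term()

Calling .send() twice in a row on either socket reproduces exactly the "Operation cannot be accomplished in current state" error shown in the traceback above.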

Question:

I'm using Python bindings for ZeroMQ. My libzmq version is 4.2.5 and my pyzmq version is 17.1.2.

I'm trying to let a "producer" transmit a large amount of data to a "consumer". The code of the "producer" is :

# producer.py
import zmq
import time
import os

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)
sock.bind('tcp://*:8000')

x = os.urandom(1000000000) # large amount of data, requires much time to transmit it
sock.send(x)
print('Done')

sock.setsockopt(zmq.LINGER, 0)
sock.close()

t1 = time.time()
ctx.term() # I expect this should return immediately
print(time.time() - t1)

And the code of "consumer" is :

# consumer.py
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)

sock.setsockopt_string(zmq.SUBSCRIBE, '')
sock.connect('tcp://localhost:8000')

data = sock.recv()

I expect the ctx.term() in producer.py to return immediately, since the LINGER of the socket has already been set to zero. But when I run this code, ctx.term() does not return immediately as expected. Instead, it takes tens of seconds to return, by which time all of the large data has been successfully received by consumer.py.

I am trying to figure out why, and I would appreciate some help.


Answer:

Q : "ZeroMQ: set LINGER=0 does not work as expected"

ZeroMQ set LINGER=0 does IMHO work as expected ( as documented ):

The ZeroMQ documentation is clear in stating that all zmq_setsockopt() calls ( wrapped, for use in Python, into the .setsockopt() method ) take effect, i.e. modify the Socket-instance's behaviour.

Older versions of the ZeroMQ documentation ( in use in my projects with ZeroMQ-wrapped distributed systems since v2.x ) were more explicit on this:

Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, ZMQ_LINGER, ZMQ_ROUTER_MANDATORY and ZMQ_XPUB_VERBOSE only take effect for subsequent socket bind/connects.

Having this in mind, sock.setsockopt( zmq.LINGER, 0 ) indeed instructs the Socket()-instance sock not to make the respective <aContextINSTANCE>.term() wait until all of its yet-enqueued-only messages have been propagated to the queue-head-end, processed into the wireline-protocol and, under its supervision, either successfully sent or accepted as lost on their network-wide way to the neighbouring counterparty.

Yet this says nothing about what is going to be done with the data-in-transit that the Context()-instance is already moving down the wire.

Having worked extensively with ZeroMQ since v2.x, I know of nothing in the ZeroMQ semantics exposed via the public API that could interrupt an ongoing message transport, beyond the LINGER-instructed behaviour, which might be paraphrased as " IGNORE ANY KNOWN SENDS / RECEIVES THAT STILL WAIT THEIR TURN IN THE QUEUE "; yet this does not stop the progress of sending the data-in-transit down the line.

ZeroMQ works intentionally this way.
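A small sketch of the part that LINGER = 0 does cover, i.e. dropping messages that are still only queued locally. It assumes nothing is listening on port 9999, so the message can never actually leave the local queue:

import time
import zmq

ctx = zmq.Context()
push = ctx.socket(zmq.PUSH)
push.connect("tcp://127.0.0.1:9999")   # assumption: no peer listens here,
                                       # so the message only queues locally
push.send(b"x" * 1_000_000)            # enqueued, never transmitted

push.setsockopt(zmq.LINGER, 0)         # drop whatever still waits in the queue
push.close()

t0 = time.time()
ctx.term()                             # returns almost immediately
print(time.time() - t0)

With the default LINGER of -1 the same ctx.term() would block indefinitely, trying to deliver the queued message; with data already in transit, as in the question, neither setting aborts the transfer that is already under way.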

One might like to read more about ZeroMQ internals here or perhaps to just have a general view from the orbit-high perspective as in "ZeroMQ Hierarchy in less than a Five Seconds".


Epilogue ( for use as a last resort only ) : If you indeed have an ultimate need to stop even these in-transit message flows, feel free to post a new question on how to make things work that wild way.

Question:

I'm new to ZeroMQ ( I've been using SQS so far ).

I would like to build a system where every time a user logs in, they subscribe to a queue. All the users subscribed to this queue are interested only in messages directed to them.

I read about topic matching. It seems that I could create a pattern like this:

development.player.234345345
development.player.453423423
integration.player.345354664

And each worker ( user ) can subscribe to the queue and listen only to the topic that matches them, i.e. player 234345345 in the development environment will only subscribe to messages with the topic development.player.234345345.

Is this true?

And if so, what are the consequences in ZeroMQ?

Is there a limit on how many topics I can match?


Answer:

ZeroMQ has a very detailed page on how the internals of topic matching work. It looks like you can have as many topics as you want, but topic matching incurs a runtime cost. It's supposed to be extremely fast:

We believe that application of the above algorithms can give a system that will be able to match or filter a single message in the range of nanoseconds or a couple of microseconds even in the case of a large amount of different topics and subscriptions.

However, there are some caveats you need to be aware of:

The inverted bitmap technique thus works by pre-indexing a set of searchable items so that a search request can be resolved with a minimal number of operations.

It is efficient if and only if the set of searchable items is relatively stable with respect to the number of search requests. Otherwise the cost of re-indexing is excessive.

In short, as long as you don't change your subscriptions too often, you should be able to do on the order of thousands of topics at least.
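A minimal sketch of such per-player topics with pyzmq ( the port, the sleep and the payloads are illustrative; prefix matching is done by the SUB socket ):

import time
import zmq

ctx = zmq.Context()

# publisher side
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://*:5556")

# one client, e.g. player 234345345 in the development environment
sub = ctx.socket(zmq.SUB)
sub.connect("tcp://localhost:5556")
sub.setsockopt_string(zmq.SUBSCRIBE, "development.player.234345345")

time.sleep(0.5)                        # crude guard against the slow-joiner problem

# every message starts with its topic; the SUB socket drops anything
# whose prefix does not match one of its subscriptions
pub.send_string("development.player.234345345 your-payload-here")
pub.send_string("integration.player.345354664 ignored-by-this-client")

print(sub.recv_string())               # only the matching message arrives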

Question:

I am testing out ZeroMQ and I am only getting around 1227 - 1276 messages per second. I have read, however, that it should be capable of over 100x that rate.

What am I doing wrong? Is there some configuration I can specify to fix this?

I am using the following functionality:

public static final String SERVER_LOCATION = "127.0.0.1";
public static final int SERVER_BIND_PORT = 5570;

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{
    ZContext ctx = new ZContext();

    Socket frontend = ctx.createSocket(ZMQ.PULL);
    frontend.bind("tcp://*:"+SERVER_BIND_PORT);

    int i = 1;
    do{
        ZMsg msg = ZMsg.recvMsg(frontend);
        ZFrame content = msg.pop();
        if(content != null){
            msg.destroy();
            System.out.println("Received: "+i);
            i++;
            content.destroy();
        }
    }while(true);
}

public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PUSH);

    client.setIdentity("i".getBytes());
    client.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);

    PollItem[] items = new PollItem[] { new PollItem(client, Poller.POLLIN) };
    int i = 1;
    Timer t = new Timer(timeToSpendSending);
    t.start();
    do{
        client.send(/* object to send*/ , 0);
        i++;
    }while(!t.isDone());

    System.out.println("Done with "+i);
}

Timer class used to limit time the program runs for:

class Timer extends Thread{
    int time;
    boolean done;
    public Timer(int time){
        this.time = time;
        done = false;
    }
    public void run(){
        try {
            this.sleep(time);
            done = true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public boolean isDone(){
        return done;
    }
}

Edit: I am using JeroMQ:

<dependency>
    <groupId>org.zeromq</groupId>
    <artifactId>jeromq</artifactId>
    <version>0.3.4</version>
</dependency>

Answer:

I had to swap which side binds and which side connects, and remove the High Water Mark limit ( set to 0 for unlimited messages in memory ).

The code would be as follows:

public static final String SERVER_LOCATION = "127.0.0.1";
public static final int SERVER_BIND_PORT = 5570;
public static final String TOPIC = "topic1";

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{
    // Prepare our context and subscribe
    Context context = ZMQ.context(1);
    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);
    subscriber.setRcvHWM(0);
    subscriber.subscribe(TOPIC.getBytes());
    System.out.println("subscribed to "+TOPIC);
    int i = 1;
    boolean started = false;
    Timer t = new Timer(timeToSpendSending);
    do{
        String msg = subscriber.recvStr();
        if(!TOPIC.equals(msg)){
            if(!started){
                t.start();
                started = true;
            }
            i++;
        }
    }while(!t.isDone());
    System.out.println("Done with: "+i);
    subscriber.close();
    context.term();
}

public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{
    Context context = ZMQ.context(1);
    Socket publisher = context.socket(ZMQ.PUB); // PUB pairs with the SUB subscriber above
    publisher.bind("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT);
    publisher.setHWM(0);
    publisher.setSndHWM(0);

    int i = 1;
    Timer t = new Timer(timeToSpendSending);
    t.start();
    do{
        publisher.sendMore(TOPIC);
        publisher.send("Test Data number "+i);
        i++;
    }while(!t.isDone());
    System.out.println("Done with: "+i);
    publisher.close();
    context.term();
}

With this I got message rates of around 250,000 per second when sending and 145,000 per second when receiving.

Question:

I'm a junior backend developer and I'm now working on a banking project, which is a distributed system. What I knew before was that there are messaging libraries such as ZeroMQ to handle the communication between components of a distributed system. But in this project they use Oracle queuing.

My colleague told me that this is better because we run no risk of losing any message, even if processes die accidentally.

My questions:

Q1: If Oracle queuing is better, when should we use things like ZeroMQ?

Q2: What are the disadvantages of Oracle queuing compared with ZeroMQ?


Answer:

Your colleague is right here, because Oracle AQ comes with persistence while ZeroMQ is in-memory. You'd use ZeroMQ if you need the maximum number of messages per second (millions). Price is not a factor, because Oracle doesn't charge extra for AQ; it is even included in the free Oracle XE.

If your application relies on Oracle already, there is no disadvantage to putting the messaging into Oracle as well.

Question:

I am using ZeroMQ and a publish/subscribe pattern, in Python.

The server sends a message, as follows:

ZMQsocket_Publisher.send(subscription_string)

which is received by a client, that, as consequence, starts a loop, like this:

loop_condition = True
while loop_condition:
    ZMQsocket_Pusher.send(strings, zmq.SNDMORE)
    ZMQsocket_Pusher.send(data)

during which, by default, it responds to the server indefinitely, sending data back on every iteration.

The question is: I would like to stop the while loop when a particular event occurs, either by changing the condition or by sending a break/interrupt signal. The problem I am facing right now is that, while the loop is still in progress, that particular client is not able to receive a "stop" message sent later by the server. Is there an easy way to stop the loop, or the execution, when using this pattern?

Thank you in advance!


Answer:

As I understand it, you have something like this:

subscription_string = your_client_receiver_socket.recv()
strings, data = some_processing(subscription_string)
loop_condition = True
while loop_condition:
    ZMQsocket_Pusher.send(strings, zmq.SNDMORE)
    ZMQsocket_Pusher.send(data)

If that's the case, simply add a check for a stop signal inside the while loop:

while loop_condition:
    ZMQsocket_Pusher.send(strings, zmq.SNDMORE)
    ZMQsocket_Pusher.send(data)
    try:
        stop = your_client_receiver_socket.recv(flags=zmq.DONTWAIT)
        if check_on_stop(stop):
            break
    except zmq.error.Again:
        pass

The zmq.DONTWAIT flag tells the socket to receive in a non-blocking way, raising a zmq.error.Again exception if there is nothing to receive. With this you can send a stop signal from the server to stop the client's loop.
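check_on_stop() is not defined in the question, so here is one hypothetical shape it could take, together with the matching server-side publish ( the b"stop" marker is an assumed convention, not part of the original code ):

# hypothetical helper matching the check_on_stop() used above
def check_on_stop(message):
    return message == b"stop"

# server side: publish the agreed stop marker on the same socket
# that originally sent subscription_string
ZMQsocket_Publisher.send(b"stop")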

Question:

I am reproducing the Divide & Conquer example of ZeroMQ from the zguide2. It uses a ventilator to push tasks out to several workers, which send their results to a sink.

This all works fine. However, if I simulate a slower client or very unevenly balanced tasks, the running time is far from optimal. This can be provoked, for example, by adding the lines

if task_nbr % cpu_count() == 0:
    workload *= 4

after workload = random.randint(1, 100) in the ventilator.

I tried to reduce the high-water mark of the receiver (worker) and the sender (ventilator), but without improving the runtime.

Might the mistake be in the choice of the socket types PUSH / PULL?

I can attach a minimal working example if wanted, but due to the worker-ventilator-sink structure it's a bit lengthy (~150 lines).


Answer:

In this case it is better to use ROUTER and REQ sockets. In Python this is possible with

client = context.socket(zmq.ROUTER)

and

worker = context.socket(zmq.REQ)

A fully worked-out example by Jeremy Avnet can be found in the zguide. One can observe that, even with the code from the question added, the runtime remains close to optimal.

Why this works is also described in the section The Load Balancing Pattern:

workers send a "ready" message when they start, and after they finish each task. The broker reads these messages one-by-one. Each time it reads a message, it is from the last used worker. And because we're using a ROUTER socket, we get an identity that we can then use to send a task back to the worker
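A minimal sketch of that pattern ( the port, the task payloads and the do_work() helper are illustrative assumptions; the worker loop is shown only as comments since each worker would normally run in its own process ):

import zmq

ctx = zmq.Context()

broker = ctx.socket(zmq.ROUTER)        # the ventilator becomes a load-balancing broker
broker.bind("tcp://*:5671")

# each worker would run roughly:
#   worker = zmq.Context().socket(zmq.REQ)
#   worker.connect("tcp://localhost:5671")
#   worker.send(b"ready")              # announce availability
#   while True:
#       task = worker.recv()
#       do_work(task)                  # hypothetical task handler
#       worker.send(b"ready")          # REQ lockstep: ask for the next task

for n in range(100):
    # a REQ worker's message arrives as [identity, empty delimiter, body],
    # so every recv tells the broker exactly which worker is idle right now
    identity, empty, ready = broker.recv_multipart()
    broker.send_multipart([identity, b"", b"task-%d" % n])

Because tasks are only handed to workers that have just reported themselves idle, a slow worker automatically receives fewer tasks, which is why the runtime stays close to optimal even with the skewed workload from the question.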