Hot questions for Using ZeroMQ in ipc

Question:

I came across some ZeroMQ example code:

subscriber = ctx.socket(zmq.XSUB)
subscriber.connect("tcp://localhost:6000")

publisher = ctx.socket(zmq.XPUB)
publisher.bind("tcp://*:6001")

The subscriber (client) is connecting to localhost, port 6000. But the publisher (the server) is binding to *:6001.

What does this mean?


Answer:

It means "all interfaces, port 6001". A given computer can have more than one network interface (a trivial example: the average computer's LAN IP and its localhost address belong to two different interfaces). The * means to accept connections on any of them.
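The difference between binding to every interface and binding to loopback only can be seen with plain TCP sockets. The sketch below uses Python's standard socket module rather than ZeroMQ, and it assumes that the `*` in a ZeroMQ tcp endpoint corresponds to the IPv4 wildcard address 0.0.0.0. The helper name bound_address is made up for illustration, and port 0 simply asks the OS for any free port.

```python
import socket

def bound_address(host):
    """Bind a TCP listener to `host` on an OS-chosen port and return the bound address."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.bind((host, 0))      # port 0: let the OS pick a free port
        srv.listen(1)
        return srv.getsockname()

wildcard = bound_address("0.0.0.0")    # listens on every IPv4 interface
loopback = bound_address("127.0.0.1")  # listens on the loopback interface only

print("wildcard listener:", wildcard)
print("loopback listener:", loopback)
```

A listener bound to 127.0.0.1 can only be reached from the same machine, while the wildcard listener can also be reached through the LAN address.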

Question:

I'm attempting to use libzmq (4.2.2) via czmq (4.0.2) to establish an IPC communication channel between a Python program and my C++ application.

I'm using the ZMQ publisher/subscriber pattern, where the Python program binds to a port and publishes messages using pyzmq (16.0.2), and my C++ program uses the zsock class from CZMQ to connect as a subscriber.

The problem is, whenever my subscriber attempts to connect, I get back error code 11, Resource temporarily unavailable. Strangely, this system seems to work just fine on my development machine, but not on the target system I'm deploying to.

The problem occurs on the very first line of initializing the socket in the following abridged method:

bool ZmqSocketWrapper::connectSubscriber(string address)
{
    m_address = address;
    m_pSocket = zsock_new_sub(address.c_str(), "");

    int errorCode = zmq_errno();
    if (errorCode != 0)
    {
        printf(zmq_strerror(errorCode));
        return false;
    }

    return true;
}

This is called as follows:

m_subscriberSocket->connectSubscriber("tcp://127.0.0.1:5555");

And I've also tried other variations, with the same result:

m_subscriberSocket->connectSubscriber("tcp://localhost:23232");
m_subscriberSocket->connectSubscriber("ipc:///tmp/test");

When searching online, it appears that most other people have this problem when attempting to send/receive, so it seems odd that I'm having this when just attempting to open the socket.

A few other details:

  • My ZMQ publisher is written in Python using pyzmq and is working fine on the same target system, which suggests that the problem is in czmq.
  • The machine where I'm seeing the problem is a Raspberry Pi, in case that's relevant, although bear in mind the point above.
  • No, nothing else is using that port, and I've confirmed using netstat that the server port is listening.
  • Yes, I've tried running my client as root.

Any help much appreciated!


Answer:

The good news is that, given that a pure pyzmq scenario works fine, hardware- or OS-related issues seem to have been excluded from the trouble list.


Damned bindings ( that blind user-programs by a "high-level" skewed abstraction )

First, some high-level bindings that promote an escape from the native API also remove user-side options.

CZMQ did this, for example, in that it assumes that every SUB side wants to just .connect(), not leaving the reversed .bind()/.connect() choice available. The good old ZeroMQ API always left this option to the user, and that is fair. Similarly, there are many performance-tuning variables that need to be set before a socket connection is set up, which makes the "high-level" binding more troublesome for distributed-system implementations.

Next is the strange thing of code running fine on one box and not on another.

My concern would be the user-side responsibility to set LINGER to zero and .close() any such created SUB socket ( which may not happen if proper exception handling is not in place ) before a zmq.Context() instance is safely .term()-inated. Otherwise orphaned sockets may hang around and keep a real resource ( still ) occupied, resulting in errno code 11, Resource temporarily unavailable.

Last but not least - resources are not consumables/disposables

The proposed code exhibits exactly such resource-inefficient behaviour: it accumulates all the setup / configuration / tear-down overhead costs on each call. This is a very poor design practice ( even though users may meet it in many "schoolbook" examples ). If you are interested in more details on efficient distributed-computing arguments related to this, you may also like this post.

bool ZmqSocketWrapper::connectSubscriber(string address)
{   bool b_RetFLAG = true;                       // local result flag
    m_address = address;
    m_pSocket = zsock_new_sub(address.c_str(), "");

    int errorCode = zmq_errno();
    if (errorCode != 0)
    {
        printf("%s\n", zmq_strerror(errorCode)); // never pass a message as the format string
        b_RetFLAG = false;
    }
 // -------------------------------------------------------------
    zsock_destroy( &m_pSocket ); // USER's MANDATORY DUTY PRE-RET
 // -------------------------------------------------------------
    return b_RetFLAG;
}

If you are indeed in trouble, use the native API, where your code fully controls the stage.

Question:

I need to create a monitor that logs information about missing packets when using ZeroMQ ipc. I don't really understand everything about it, because there are also some LINX and TIPS protocols involved. Can you please explain that to me and answer the main question?


Answer:

You could make the application self-monitoring, by including a message serial number in each message structure. The message sender keeps track of the serial number it last sent, and increments it every time it sends a message.

The recipient should then be receiving messages with ever-increasing message serial numbers embedded. If that ever jumps by 2 or more, a message has gone missing.

IPC is not lossy like a network can be - the bytes put in come out the other end. TCP is not lossy either, provided both ends are still running and the network itself hasn't failed. However, depending on the ZMQ pattern used and how it's set up, whole messages can go undelivered (for example, if the recipient hasn't connected yet). If that's what you mean by "packet missing", it would be revealed by including an incrementing message serial number.
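The receiving side of that check can be sketched in a few lines. This is only an illustration of the idea, not ZeroMQ code: find_gaps is a made-up helper, and it assumes the serial numbers have already been extracted from the received messages in arrival order.

```python
def find_gaps(serials):
    """Return (first_missing, received) pairs wherever the serial jumps by 2 or more."""
    gaps = []
    previous = None
    for serial in serials:
        if previous is not None and serial - previous >= 2:
            gaps.append((previous + 1, serial))
        previous = serial
    return gaps

# Serial numbers as they arrived; 4, 7 and 8 never showed up.
received = [1, 2, 3, 5, 6, 9]

for first_missing, got in find_gaps(received):
    print(f"lost {got - first_missing} message(s): expected {first_missing}, got {got}")
```

A monitor would log each reported gap together with a timestamp. Messages lost before the very first one received cannot be detected this way unless the starting serial number is known in advance.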

Question:

I've run into an issue I'd like to understand more.

I have a C++ application on a Linux machine. Let's call this program1. program1 uses ZeroMQ for IPC communications. I imagine that the ZeroMQ layer has a little to do with the behavior but want to introduce all the facts. If I run program1 via terminal with no elevated permissions, I can kill it any number of times. I can also run program1 with Eclipse with no issues.

However, if I run program1 with elevated permissions:

$ sudo ./program1

and kill it (Ctrl+C), the IPC socket locks up and is unavailable for use when trying to run the application without elevated privileges. However, if I rerun the program with elevated permissions, it works just fine.

My theory is that once you run the application with elevated permissions that the file descriptors change ownership to the parent process (with elevated permissions). Then, when you kill the process the file descriptors are never properly cleaned so their permissions are left elevated, unable to be used without the elevated permissions.

Is that on the mark? If so, is there a way to prevent this type of issue in code, or to fix the issue after it occurs without restarting the entire computer?

Updated ******

Updating to add more information:

1) The software really does exit when I hit Ctrl+C. It can't be found in the system monitor, even when run with sudo. (sudo gnome-system-monitor)

2) The socket returns "address already in use" on creation.

3) Ideally, I'd want them to be able to connect to each other no matter how they are run. This isn't an issue I considered during development, and I will admit I'm new to Linux and IPC communication.


Answer:

You said it: "the IPC socket".

I guess that's not a TCP socket. If zeromq is creating a System V IPC object as root, the user cannot reuse it and that's why the permission error: IPC objects don't get destroyed by the process death and have user ownership and permissions.

You can list the existing IPC objects with the command ipcs, remove them with ipcrm.

Oh yes - take care not to delete IPC objects not related to your work...

If my guess is wrong, you can use the strace command to inspect which system call is actually failing and find the real culprit.

Question:

I'm using ZeroMQ to implement a toy communications protocol; and this is my first time using this framework/library.

Now, in my protocol, multiple consecutive messages get sent by a certain party, all of which have the same size. So - I thought, I'd avoid reallocating them, and just try to refill the message data buffer with different content, e.g.:

zmq::message_t msg { fixed_common_size };
while (some_condition()) {
    my_filling_routine(msg.data(), fixed_common_size);
    the_socket.send(msg);
}

but on the second iteration of this loop, I get a segmentation fault; msg.data() is not nullptr though. It occurred to me, that maybe ZeroMQ cannibalizes the memory somehow, and thus I need to write something like:

zmq::message_t msg { fixed_common_size };
char buffer[fixed_common_size];
while (some_condition()) {
    my_filling_routine(buffer, fixed_common_size);
    msg.rebuild(buffer, fixed_common_size);
    the_socket.send(msg);
}

But I'm sure this causes de-allocation and re-allocation.

So, is it really the case that a rebuild() is necessary, or could it just be some bug in my code?

Note: I'm using Unix sockets, in case the answer depends on that somehow.


Answer:

No, you can't reuse a zmq::message_t after it has been sent.

First, welcome to ZeroMQ, a cool place to be in for distributed-system practitioners.

The ZeroMQ API warns about these issues quite explicitly. A successful call to zmq_msg_send(), referencing a message-payload, does not mean that the message itself has been actually sent.

It's best to imagine that the call just "moves" the responsibility from your application code to the ZeroMQ engine (inside the factory with a pool of I/O threads instantiated inside the Context() instance ...).

The zmq_msg_t structure passed to zmq_msg_send() is nullified during the call.

After that, you had better never touch it again, as reported above :o)

The most "ecological" way to bypass the allocator is to re-use the whole message, as the documentation puts it:

If you want to send the same message to multiple sockets you have to copy it (e.g. using zmq_msg_copy()).


A bonus warning on copying ...

One more point: you may copy a message, but do not dare to modify it afterwards ...

Avoid modifying message content after a message has been copied with zmq_msg_copy(), doing so can result in undefined behaviour. If what you need is an actual hard copy, allocate a new message using zmq_msg_init_size() and copy the message content using memcpy(). Never access zmq_msg_t members directly, instead always use the zmq_msg-family of functions.

Question:

I'm using the pyzmq library to establish communication between two Python processes on the same machine. Looking at the messaging patterns offered by ZMQ, it is not clear which one is suitable. In my case both processes need to independently send messages to one another at random times. Basically, at various moments each process is both a client and a server. Some visual description below.

To achieve this pattern, I am using two ZMQ.PAIR sockets. Each process .bind()-s a socket to receive on and .connect()-s to the other to .send() whenever the need arises. Similarly, each has a dedicated thread to handle the receiving, which is blocking.

However this design seems to me a bit like a stretch and I'm wondering if there is a setup more natural and suitable for this scenario? I'm also imagining that it must be quite common. In the current design I also do not like the fact that I need to manually take care of cleaning the two sockets when the processes finish.


Answer:

If the nodes are literally identical (at least in terms of how they view each other for communication purposes, i.e. no real "reliable" server to the other's "transient" client) then you have two options:

(A) Spin up two pairs of sockets, one pair that treats one node as a server and the other a client, and the other that reverses that relationship.

This might be appropriate if the communication follows a strict pattern between the nodes, e.g. one node initiates the communication and there is a specific back and forth that occurs until the end of that particular conversation. This allows each node to initiate communication independently and maintain their own communication cadence without confusing it with "crosstalk" initiated by the other node.

This is also more appropriate if there may some day be more than two nodes, to create a star topology where each node can communicate with every other node directly. There's a limit to how many nodes this can reasonably work well with, beyond which you'll want to implement some sort of central broker.

(B) Just pick one node to be the "server" and the other node to be the "client", bind() and connect() appropriately, and after that point just treat them as equals

This sounds most like what you're looking for. You don't need to be overly concerned with which side bind()s and which side connect()s. At the moment it sounds as if you're opening and closing the connection every time you need to communicate. You should consider just leaving the connection open for the life of your process. The PAIR sockets you're using support completely unstructured communication, so you should feel free to send and receive from either direction regardless of which side you designate as your "server" and which side you designate as your "client".
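The "one long-lived connection, either side talks whenever it wants" idea from option (B) can be illustrated without ZeroMQ at all. The sketch below is a stand-in, not pyzmq code: it uses socket.socketpair() from the standard library in place of a bound and a connected PAIR socket, and the names node_a and node_b are made up.

```python
import socket

# One connected pair, created once and kept open for the life of the program.
node_a, node_b = socket.socketpair()

node_a.sendall(b"ping from A")      # A speaks first ...
from_a = node_b.recv(1024)

node_b.sendall(b"news from B")      # ... and B can speak unprompted as well
from_b = node_a.recv(1024)

print(from_a, from_b)

node_a.close()
node_b.close()
```

With PAIR sockets the shape is the same: a single socket per process, opened once, used for both sending and receiving, and closed once at shutdown.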

Question:

I want to connect to a Unix domain socket created by ZeroMQ (IPC mode) via the nc command. I can connect, but when I send some messages, my daemon, which is listening on this socket, does not get any of them...

I'm using nc like:

nc -U /path/to/socket

Answer:

Very well, here's a longer version.

ZeroMQ implements a message queue transport system over the top of stream connections like sockets, named pipes, etc. To do this it runs a protocol called ZMTP over the top of the stream, which provides all the message demarcation, communication patterns, and so forth. It also has to deal with protocol errors in order to give itself some resiliency.

Comparison to a Web Browser

It's the same idea as a web browser and web server communicating using HTTP over a socket. HTTP is used to transport HTML files. If you look at the data flowing over the socket you see the HTML mixed up with the messages involved in running the HTTP protocol. And because HTTP is a text-based protocol, it looks kinda OK to the human eye.

Talking the Same Language

Thus when a program that uses the zmq libraries for communication connects a socket / named pipe / etc, it will be expecting to exchange data over that connection in the way defined by the ZMTP protocol (in the same way a web browser is expecting to talk to a server using http). If the program at the other end is also using zmq, then they're both talking the same protocol and everything is good.

Incompatible Protocols

However, if you connect a program that doesn't itself use the ZMTP protocol, such as a web browser, and it sends an HTTP request, that request is unlikely to mean anything. The zmq library routines will no doubt receive the bytes that make up the HTTP request, attempt to interpret them, fail to understand them, and ultimately reject them as garbage.

Similarly, if the program that uses the zmq library wants to send messages, nothing will happen unless the underlying ZMTP protocol driver is content that it is communicating with something else that talks ZMTP. If anything at all emerges from netcat, it won't look anything like the message you were sending (it'll be jumbled up with the bytes that ZMTP uses).

Human Equivalent

The equivalent is an Englishman called Bob picking up the phone and dialling the number for his English friend called Alice living in Paris. However, if a Frenchman called Charlie answers the phone by mistake (wrong number), it'll be very difficult for them to exchange information. Meanwhile Eve, who's tapped the phone line, is laughing her head off at the ineptitude of these two people's failed attempt to communicate. (I make sweeping and partly justifiable generalisations about us Englishmen's poor ability to speak any other language).

Way Forward

There's a ZMQ binding available for almost everything, possibly even bash. Whatever it is you're trying to accomplish, it's probably well worth getting a decent ZMQ binding for the programming or scripting language you're using, and using that to provide a proper ZMQ endpoint.
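To make the mismatch concrete, the sketch below builds the 64-byte greeting that a ZMTP 3.0 peer sends before anything else, following my reading of the ZMTP 3.0 specification (signature, version, mechanism, as-server flag, filler), and compares it with a line of text typed into nc. The function names are hypothetical, the padding bytes are simply set to zero here, and a real library may fill them differently.

```python
def zmtp_greeting(mechanism=b"NULL", as_server=False):
    """Build a ZMTP 3.0-style greeting (64 bytes). Illustrative only."""
    signature = b"\xff" + b"\x00" * 8 + b"\x7f"    # 10 bytes: 0xFF, padding, 0x7F
    version = bytes([3, 0])                         # major 3, minor 0
    mech = mechanism.ljust(20, b"\x00")             # 20 bytes, null padded
    server_flag = b"\x01" if as_server else b"\x00"
    filler = b"\x00" * 31
    return signature + version + mech + server_flag + filler

def looks_like_zmtp(data):
    """Check only the two signature bytes that frame the start of a greeting."""
    return len(data) >= 10 and data[0] == 0xFF and data[9] == 0x7F

greeting = zmtp_greeting()
typed_into_nc = b"hello daemon\n"

print(len(greeting), looks_like_zmtp(greeting))
print(looks_like_zmtp(typed_into_nc))
```

Text typed into nc never starts with this binary handshake, so the ZeroMQ side has no reason to treat what follows as a message.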

Question:

I have a C application that gets values from a sensor at a very high rate. I want to write Python code that retrieves only the latest value from my C application, and only when the Python code requests it.

I was thinking about using Shared Memory for this purpose. But I don't see any example in ZeroMQ's website.

I'm new to ZeroMQ. I don't know if that's even possible.


Answer:

Are you talking about essentially using ZMQ as the vehicle to store to and retrieve from a shared memory space between two separate processes? If so, then you're digging into details that aren't available to you in ZMQ. Generally speaking, in your situation you would use the "IPC" connection protocol, and ZMQ makes the decision of how to store, send and retrieve that message on your system.

I haven't looked into the low level details of how it might opt to do this in any given scenario, I don't expect it to use shared memory, but it might. Either way, I'm not aware of any way to force ZMQ to behave this way.

Question:

I'm trying to create a REQ <--> Router <--> Dealer <--> REP communication in C++. The child process binds the router and dealer, proxies between router and dealer, connects the REP to the dealer and waits for a message with zmq_recv.

The parent process connects a REQ to the router and tries to send a message, however I'm getting a zmq_send error in parent: Resource temporarily unavailable (which is EAGAIN). According to zmq_send docs, EAGAIN means:

Non-blocking mode was requested and the message cannot be sent at the moment.

However the message does get sent since it is received in the child process. Why does it return that errno?

Here is the MCVE:

#include <zmq.h>
#include <iostream>
#include <sys/types.h>
#include <unistd.h>
#include <assert.h>
#include <thread>
#include <stdio.h>

int main() {
  char connect_path[35];
  int rc;
  int msg;
  pid_t child_pid = fork();

  if (child_pid == 0) {
    // Child
    void* child_context = zmq_ctx_new ();
    if (child_context == NULL) {
      std::cerr << "\nChild context error\n";
    }

    void* router = zmq_socket(child_context, ZMQ_ROUTER);
    if (router == NULL) {
      perror("zmq_socket of type router error");
    }
    char bind_path[35];

    snprintf(bind_path, sizeof(bind_path), "ipc:///tmp/zmqtest%d-router", getpid());
    rc = zmq_bind(router, bind_path);
    assert (rc == 0);

    void* dealer = zmq_socket(child_context, ZMQ_DEALER);
    if (dealer == NULL) {
      perror("zmq_socket of type dealer error");
    }

    snprintf(bind_path, sizeof(bind_path), "ipc:///tmp/zmqtest%d-dealer", getpid());
    rc = zmq_bind(dealer, bind_path);
    assert (rc == 0);

    std::thread z_proxy (zmq_proxy, router, dealer, nullptr);
    z_proxy.detach();

    void* rep_socket = zmq_socket (child_context, ZMQ_REP);
    if (rep_socket == NULL) {
      perror("zmq_socket of type rep error");
    }

    snprintf(connect_path, sizeof(connect_path), "ipc:///tmp/zmqtest%d-dealer", getpid());
    rc = zmq_connect(rep_socket, connect_path);
    assert (rc == 0);

    while(1) {
      if (zmq_recv (rep_socket, &msg, sizeof(msg), 0) != 0) {
        perror("zmq_recv error");
      }
      printf("\nReceived msg %d in process %d\n", msg, getpid());
      break;
    }
    if (zmq_close(rep_socket) != 0) {
      perror("zmq_close of rep_socket in child error");
    }
    if (zmq_ctx_term(child_context) != 0) {
      perror("zmq_ctx_term of child_context error");
    }
  } else {
    // Parent
    sleep(1);

    void* parent_context = zmq_ctx_new ();
    if (parent_context == NULL) {
      std::cerr << "\nParent ctx error\n";
    }

    void* req_socket = zmq_socket (parent_context, ZMQ_REQ);
    if (req_socket == NULL) {
      perror("zmq_socket of type req error in parent");
    }

    snprintf(connect_path, sizeof(connect_path), "ipc:///tmp/zmqtest%d-router", child_pid);
    rc = zmq_connect(req_socket, connect_path);
    assert (rc == 0);

    msg = 30;
    if (zmq_send (req_socket, &msg, sizeof(msg), 0) != 0) {
      perror("zmq_send error in parent");
    }

    if (zmq_close(req_socket) != 0) {
      perror("zmq_close of req_socket in parent error");
    }
    if (zmq_ctx_term(parent_context) != 0) {
      perror("zmq_ctx_term of parent_context error");
    }
  }
}

Answer:

Step 1: Make a trivial test:

Well, as a minimum, there ought to be this sort of en-queueing test first:

 rc =           zmq_send ( req_socket, "A_TEST_BLOCK", 12, ZMQ_DONTWAIT );
 printf ( "INF: zmq_send ( req_socket, 'A_TEST_BLOCK', 12, ZMQ_DONTWAIT )\nZMQ: returned rc == %d\nZMQ: zmq_errno ~ %s\n",
           rc,
           zmq_strerror ( zmq_errno() )
           );


Step 2: post the printed outputs

Next, if there are any "missed" shots, the error analysis may point to the potential reason(s) ( if and only if the parent_ctx indeed refused to even accept the data from the simplest possible zmq_send() call into its internal queueing facility, and gave an explicit reason for having done so ).

Otherwise we know nothing ( and the ZMQ_DONTWAIT flag is not the reason here ).

As the test was run, it yielded:

INF: zmq_send ( req_socket, 'A_TEST_BLOCK', 12, ZMQ_DONTWAIT )
ZMQ: returned rc == 12
ZMQ: zmq_errno ~ Resource temporarily unavailable


Step 3:

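The test has confirmed, as per documentation:

The zmq_send() function shall return number of bytes in the message if successful.

A return value of 12 is therefore a success, and the != 0 check in the MCVE reports it as an error. zmq_send() returns -1 on failure, and zmq_errno() is only meaningful in that case; after a successful call it may still hold a stale value such as EAGAIN.

So, let's dig a step deeper: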

int major, minor, patch;

zmq_version ( &major, &minor, &patch );
printf ( "INF: current ØMQ version is %d.%d.%d\nZMQ: zmq_errno ~ %s\n",
          major, minor, patch,
          zmq_strerror ( zmq_errno() )
          );

Step 4:

In case the bleeding-edge API-updates do not conform to the published API-specification, document the incident:

printf ( "EXPECT( NO ERROR, ON START ): zmq_errno ~ %s\n",
          zmq_strerror ( zmq_errno() )
          );

printf ( "EXPECT( <major>.<minor>.<patch> ): zmq_version ~\n" );

int major, minor, patch;
zmq_version ( &major, &minor, &patch );

printf ( "INF: current ØMQ version is %d.%d.%d\n",
          major, minor, patch
          );

printf ( "EXPECT( NO ERROR ): zmq_errno ~ %s\n",
          zmq_strerror ( zmq_errno() )
          );

printf ( "EXPECT( NO ERROR ): zmq_send() ~\n" );

rc =           zmq_send ( req_socket, "A_TEST_BLOCK", 12, ZMQ_DONTWAIT );
printf ( "INF: zmq_send ( req_socket, 'A_TEST_BLOCK', 12, ZMQ_DONTWAIT )\nZMQ: returned rc == %d which ought be == 12, is it?\n",
       rc
       );

printf ( "EXPECT( NO ERROR ): zmq_errno ~ %s\n",
          zmq_strerror ( zmq_errno() )
          );

and feel free to file an issue, if unexpected results appear.

Question:

I want to establish publish/subscribe communication between two machines.

The two machines, that I have, are ryu-primary and ryu-secondary

The steps I follow in each of the machines are as follows.

In the initializer for ryu-primary (IP address is 192.168.241.131)

 self.context    = zmq.Context()
 self.sub_socket = self.context.socket(zmq.SUB)
 self.pub_socket = self.context.socket(zmq.PUB)
 self.pub_port   = 5566
 self.sub_port   = 5566


def establish_zmq_connection(self):                      # Socket to talk to server
    print( "Connection to ryu-secondary..." )
    self.sub_socket.connect( "tcp://192.168.241.132:%s" % self.sub_port )

def listen_zmq_connection(self):
    print( 'Listen to zmq connection' )
    self.pub_socket.bind( "tcp://*:%s" % self.pub_port )

def recieve_messages(self):
    while True:
        try:
            string = self.sub_socket.recv( flags=zmq.NOBLOCK )
            print( 'flow mod messages recieved {}'.format(string) )
            return string
        except zmq.ZMQError:
            break

def push_messages(self,msg):
    self.pub_socket.send( "%s" % (msg) )

From ryu-secondary (IP address - 192.168.241.132)

In the initializer

    self.context    = zmq.Context()
    self.sub_socket = self.context.socket(zmq.SUB)
    self.pub_socket = self.context.socket(zmq.PUB)
    self.pub_port   = 5566
    self.sub_port   = 5566


def establish_zmq_connection(self):                     # Socket to talk to server
     print( "Connection to ryu-secondary..." )
     self.sub_socket.connect( "tcp://192.168.241.131:%s" % self.sub_port )

def listen_zmq_connection(self):
     print( 'Listen to zmq connection' )
     self.pub_socket.bind( "tcp://*:%s" % self.pub_port )

def recieve_messages(self):
    while True:
        try:
            string = self.sub_socket.recv( flags=zmq.NOBLOCK )
            print( 'flow mod messages recieved {}'.format(string) )
            return string
        except zmq.ZMQError:
            break

def push_messages(self,msg):
    print( 'pushing message to publish socket' )
    self.pub_socket.send( "%s" % (msg) )

These are the functions that I have.

I am calling on ryu-secondary:

establish_zmq_connections()
push_messages() 

On ryu-primary, when I call

listen_zmq_connection()
recieve_messages() 

after subscribing to all types of messages using .setsockopt( zmq.SUBSCRIBE, '' ).

However the message I am trying to send is of the following type.

msg = {'in_port':in_port,'dst':dst,'actions':actions}
self.push_messages(msg)

However, on the other side (recieve_messages()) I get the following error when I do this:

flow_mod = recieve_messages() 

flow_mod['in_port']
flow_mod['dst']
flow_mod['actions']


TypeError: string indices must be integers, not str

Answer:

msg is a Python dict, but you are sending (and receiving) messages formatted as strings. Probably the easiest thing to do is to serialize msg to JSON format, send it as a string, then load the received string back into a dict again. Then, and only then, will you be able to access the keys and values properly. Something like this should work (make sure you import json somewhere above):

# on the sending end
msg = {'in_port':in_port,'dst':dst,'actions':actions}
msg_string = json.dumps(msg)
self.push_messages(msg_string)   # send the JSON string, not the dict

# on the receiving end (function name spelled as in the question)
payload = recieve_messages()
message = json.loads(payload)

You can find the full docs for the json module here (for Python 2) and here for Python 3.
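The round trip can be tried without any sockets. The following is a minimal sketch in which encode_message and decode_message are made-up helper names and the field values are placeholders for the asker's in_port, dst and actions; the bytes produced by the encoder are what would be handed to the socket's send().

```python
import json

def encode_message(msg):
    """Serialize a dict to UTF-8 JSON bytes, ready to hand to a socket's send()."""
    return json.dumps(msg).encode("utf-8")

def decode_message(payload):
    """Turn received JSON bytes back into a dict."""
    return json.loads(payload.decode("utf-8"))

# Placeholder values standing in for in_port, dst and actions.
msg = {"in_port": 1, "dst": "00:00:00:00:00:02", "actions": ["OUTPUT:2"]}

payload = encode_message(msg)       # what would travel over the wire
flow_mod = decode_message(payload)  # what the receiver reconstructs

print(flow_mod["in_port"], flow_mod["dst"], flow_mod["actions"])
```

Only values that JSON can represent survive this round trip, so objects such as custom action classes would first have to be converted to plain dicts, lists, strings and numbers.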

Question:

I want to establish publish/subscribe communication between two machines.

The two machines, that I have, are ryu-primary and ryu-secondary

The steps I follow in each of the machines are as follows.

In the initializer for ryu-primary (IP address is 192.168.241.131)

 self.context    = zmq.Context()
 self.sub_socket = self.context.socket(zmq.SUB)
 self.pub_socket = self.context.socket(zmq.PUB)
 self.pub_port   = 5566
 self.sub_port   = 5566


def establish_zmq_connection(self):                      # Socket to talk to server
    print( "Connection to ryu-secondary..." )
    self.sub_socket.connect( "tcp://192.168.241.132:%s" % self.sub_port )

def listen_zmq_connection(self):
    print( 'Listen to zmq connection' )
    self.pub_socket.bind( "tcp://*:%s" % self.pub_port )

def recieve_messages(self):
    while True:
        try:
            string = self.sub_socket.recv( flags=zmq.NOBLOCK )
            print( 'flow mod messages recieved {}'.format(string) )
            return string
        except zmq.ZMQError:
            break

def push_messages(self,msg):
    self.pub_socket.send( "%s" % (msg) )

From ryu-secondary (IP address - 192.168.241.132)

In the initializer

    self.context    = zmq.Context()
    self.sub_socket = self.context.socket(zmq.SUB)
    self.pub_socket = self.context.socket(zmq.PUB)
    self.pub_port   = 5566
    self.sub_port   = 5566


def establish_zmq_connection(self):                     # Socket to talk to server
     print( "Connection to ryu-secondary..." )
     self.sub_socket.connect( "tcp://192.168.241.131:%s" % self.sub_port )

def listen_zmq_connection(self):
     print( 'Listen to zmq connection' )
     self.pub_socket.bind( "tcp://*:%s" % self.pub_port )

def recieve_messages(self):
    while True:
        try:
            string = self.sub_socket.recv( flags=zmq.NOBLOCK )
            print( 'flow mod messages recieved {}'.format(string) )
            return string
        except zmq.ZMQError:
            break

def push_messages(self,msg):
    print( 'pushing message to publish socket' )
    self.pub_socket.send( "%s" % (msg) )

These are the functions that I have.

I am calling on ryu-secondary:

establish_zmq_connections()
push_messages() 

But I am not receiving those messages on ryu-primary, when I call

listen_zmq_connection()
recieve_messages() 

Can someone point out to me what I am doing wrong?


Answer:

Repair the PUB/SUB messaging pattern setup

There are several important steps in making the PUB/SUB pattern work.

All this is well described in the ZeroMQ documentation.

You need not repeat both the PUB and SUB parts of the code on both sides. Doing so also has a side effect: it masks the case where the PUB and SUB socket addresses, ports, or calls get mixed up in the "opposite" node's code, so you do not see such a fundamental collision.

  1. your code defines the initial form of a PUB archetype, which is expected to .push_messages()

  2. your code defines the initial form of a SUB archetype, which is expected to .receive_messages()

  3. your code does not show how you control who goes first during connection setup, i.e. whether .bind() or .connect() happens at random or before/after the other

  4. your code does not show any subscription setup after the SUB archetype was instantiated. The default value set at socket instantiation needs to be modified via a .setsockopt( zmq.SUBSCRIBE, '' ) call; otherwise a prohibitive filter stays in place that allows no ( yet unsubscribed ) message to pass through and be delivered ( "received" ) on the SUB side


You must modify the default SUB-side subscription filter; it is prohibitive

You may have noticed from the ZeroMQ documentation that, until set up otherwise, the SUB side filters out all incoming messages.

http://api.zeromq.org/2-1:zmq-setsockopt

"The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.

An empty option_value of length zero shall subscribe to all incoming messages. A non-empty option_value shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter."
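The rule quoted above can be mimicked in a few lines of plain Python. This is a simulation of the documented prefix matching, not libzmq's implementation; the function name accepted and the topic prefixes are made up for illustration.

```python
def accepted(message, filters):
    """Mimic the documented rule: accept if the message starts with any filter prefix."""
    return any(message.startswith(prefix) for prefix in filters)

no_filters = []                  # a freshly created SUB socket: nothing passes
all_messages = [b""]             # empty prefix: everything passes
only_flows = [b"flow", b"stat"]  # hypothetical topic prefixes

message = b"flow_mod {...}"
print(accepted(message, no_filters))
print(accepted(message, all_messages))
print(accepted(message, only_flows))
print(accepted(b"port_status", only_flows))
```

The first case is the one the question runs into: with no filter installed, even a correctly connected SUB socket delivers nothing to the application.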


Class-method pre-configuration of a Context instance possible

There is another possibility for Python code using pyzmq 13.0+. There you may also set this up via the Context method .setsockopt( zmq.SUBSCRIBE, "" ) et al, but such a call has to precede the instantiation of any new socket from the Context instance pre-configured this way.

Question:

I'm new to zmq. I'm using it for local IPC on a Linux-based OS (the socket is of AF_UNIX type), but I could not find a way to get the caller's (client's) process ID. Is there any way to find it using zmq? (Finding the PID of the caller is a must for my access-control requirement, and if zmq does not provide it then I should switch to dbus.) Please help me.


Answer:

Forget most of the low-level socket designs and worries. Think higher in the sky. ZeroMQ is a considerably higher-level messaging concept, so you will have zero worries about most of the socket I/O problems.

For more on these ZMQ principles, read Pieter Hintjens' design maxims and his resources-rich book "Code Connected, Vol.1".

That said, the solution is fully in your control.

Solution

Create a problem-specific multi-zmq-socket / multi-zmq-pattern design (multiple zmq primitives used and orchestrated by your application-level logic) as a problem-specific formal communication handshake.

Ensure the <sender> adds its own PID to the message.

Re-authorise via another register/auth socket pattern against the pre-registered sender from the receiver side, so as to avoid a spoofing attack under a fake/stolen PID identity.

Adapt your access-control policy to your ProblemDOMAIN, and use and implement any level of crypto-security formal handshaking protocols for identity validation or key exchange, to raise your access-control policy security to adequate strength ( including MIL-STD grades ).
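A minimal sketch of the "sender adds its own PID" step, assuming a JSON envelope and a set of PIDs that a separate registration step has already filled in. The names wrap, unwrap and registered are hypothetical, and nothing here touches a socket: the bytes returned by wrap are what would be sent.

```python
import json
import os

def wrap(body, pid=None):
    """Sender side: attach the sender's PID to the message body."""
    envelope = {"pid": os.getpid() if pid is None else pid, "body": body}
    return json.dumps(envelope).encode("utf-8")

def unwrap(payload, allowed_pids):
    """Receiver side: return the body only if the claimed PID was pre-registered."""
    envelope = json.loads(payload.decode("utf-8"))
    if envelope["pid"] not in allowed_pids:
        raise PermissionError(f"PID {envelope['pid']} is not registered")
    return envelope["body"]

registered = {os.getpid()}           # filled in by a separate registration step
print(unwrap(wrap("status?"), registered))

try:
    unwrap(wrap("status?", pid=-1), registered)   # a PID nobody registered
except PermissionError as exc:
    print("rejected:", exc)
```

A PID carried inside the message is only a claim made by the sender, which is why the answer pairs it with a separate re-authorisation step and, where needed, cryptographic identity validation.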

Question:

I am following this article: "http://hintjens.com/blog:49". The test code of "stonehouse" uses "PUSH/PULL" and I am trying to modify it to use "PUB/SUB". But my initial naive attempt fails (code below, which works for PUSH/PULL). Is there anything I am missing? BTW, does czmq really work with PUB/SUB? Even if I remove the lines related to security, it still doesn't work. Thank you for the help.

#include <czmq.h>

int main (int argc, char **argv)
{
//  Create context and start authentication engine
zctx_t *ctx = zctx_new ();
zauth_t *auth = zauth_new (ctx);
zauth_set_verbose (auth, true);
zauth_allow (auth, "127.0.0.1");

zauth_configure_curve (auth, "*", CURVE_ALLOW_ANY);

if (argc == 1)
{   zcert_t *server_cert = zcert_new ();
    char *server_key = zcert_public_txt (server_cert);
    printf ("%d: BEGIN '%s' END\n", strlen (server_key), server_key);

    void *server = zsocket_new (ctx, ZMQ_PUB/*PUSH*/);
    zcert_apply (server_cert, server);
    zsocket_set_curve_server (server, 1);
    zsocket_bind (server, "tcp://*:9000");
    printf ("Hit any key to start sending...\n");
    getchar ();
    int i=5;
    while (i > 0)
    {   printf ("%d ", i); fflush (stdout);
        sleep (1);
        --i;
    }
    zstr_send (server, "Hello");
    printf ("sent\n");
    zcert_destroy (&server_cert);
}
else
{   zcert_t *client_cert = zcert_new ();

    void *client = zsocket_new (ctx, ZMQ_SUB/*PULL*/);
    zcert_apply (client_cert, client);
    zsocket_set_curve_serverkey (client, argv[1]);
    zsocket_connect (client, "tcp://127.0.0.1:9000");

    char *message = zstr_recv (client);
    printf ("received: %s\n", message);
    assert (streq (message, "Hello"));
    free (message);
    puts ("Stonehouse test OK");

    zcert_destroy (&client_cert);
}

zauth_destroy (&auth);
zctx_destroy (&ctx);
return 0;
}

Answer:

I finally figured it out: the subscriber needs to set the "ZMQ_SUBSCRIBE" option. Otherwise, it receives nothing. I had previously assumed the opposite, which is wrong.