Hot questions for Using ZeroMQ in port

Question:

I have a container that exposes several ports, and I want to reach one of them (9001) remotely, from outside Docker.

  • My docker IP is: 172.17.0.1
  • My container IP is: 172.19.0.23
  • My server IP is: 192.168.1.131

I searched for a solution and found the expose keyword. I added it, but it did not work.

How to expose docker ports to make your containers externally accessible Reference


This is my docker-compose file:
version: '3'

services:
  nginx:
      image: nginx:latest
      container_name: nginx
      ports:
        - "8010:8010"

      volumes:
        - .:/code
        - ./nginx/default.conf:/etc/nginx/conf.d/default.conf

      links:
        - ivms

      restart: unless-stopped

  ivms:
      build: .
      container_name: ivms
      command: bash bashes/createDB.sh
      volumes:
        - .:/code
      expose:
        - "8010"
        - "9001"  # exposed desired port
      ports:
        - "9001:9001"

I run above docker-compose file with: $ docker-compose up -d

  • When I use server_IP:9001 --> 192.168.1.131:9001 or docker_IP:9001 --> 172.17.0.1:9001, I cannot reach the port (either remotely or locally).
  • When I use container_IP:9001 --> 172.19.0.23:9001, it works locally.

What should I do so that I can reach server_IP:9001 --> 192.168.1.131:9001?


[NOTE]:

  • createDB.sh runs several operations, including creating a ZMQ socket on port 9001.

  • I have already allowed the port with $ ufw allow 9001

  • I tried on Ubuntu 16.04 and Ubuntu-Server 16.04

Any help would be appreciated.


Answer:

If you want to actually map the port you should use

ivms:
  build: .
  container_name: ivms
  command: bash bashes/createDB.sh
  volumes:
    - .:/code
  ports:
    - "8010:8010"
    - "9001:9001"  # now you can access them locally

Warning: you are using the same port (8010) for both services, ivms and nginx.

The EXPOSE instruction informs Docker that the container listens on the specified network ports at runtime. You can specify whether the port listens on TCP or UDP, and the default is TCP if the protocol is not specified.

The EXPOSE instruction does not actually publish the port. It functions as a type of documentation between the person who builds the image and the person who runs the container, about which ports are intended to be published. -Docker Docs
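
To see where the connection stops, it can help to probe the published port from each vantage point (inside the container, on the Docker host, and from a remote machine). The following is a minimal sketch using only the Python standard library; `port_is_open` and its timeout are hypothetical names chosen for illustration, not part of Docker or ZeroMQ.

```python
import socket

def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unreachable hosts, and timeouts.
        return False

# Self-contained demo: probe a listener created on the loopback interface.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))   # port 0 lets the OS pick a free port
listener.listen(1)
demo_port = listener.getsockname()[1]
print(port_is_open("127.0.0.1", demo_port))
listener.close()
```

With the addresses from the question, one would call `port_is_open("192.168.1.131", 9001)` from a remote machine and `port_is_open("172.19.0.23", 9001)` on the host, then compare the results.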

Question:

The documentation on saltstack appears to be unclear regarding what ports are required from the salt-master -> salt-minion (apparently none are required). It suggests that ports only need to be opened from the salt-minion -> salt-master. (See: http://docs.saltstack.com/en/latest/topics/tutorials/firewall.html)

If, however, commands executed on the salt-master are targeted at a minion, surely the master needs to be able to push them to the minion and therefore requires a network opening to allow this.

Therefore my question is if the saltstack ports (4505 & 4506) need to be opened in both directions, or whether the remote commands are triggered over another protocol?

[A bit of background: My team want salt-stack setup to manage a server landscape in quite a restrictive network where each individual network route needs to be requested in the security concept. This is not controlled by our company and I need to explicitly request all required routes and in each direction.]


Answer:

Salt uses a zeromq pub/sub interface to communicate with the minions. Indeed, you only need to open ports 4505 and 4506 on the master's firewall.

The minions connect to one port on the master, the "pub" port (4505), and listen there for commands; they return results to the master on the other port (4506).

The master never actually "pushes" commands to the minions. The minions initiate the connections and listen for commands published on the pub port, which is why you don't need to open any incoming ports on your minions.
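
The direction of the connection is what matters for the firewall: the minion dials out, and the master then sends data back over that already-established connection. A rough illustration with plain TCP sockets follows; this is not Salt or ZeroMQ code, and `run_master`, `run_minion`, and the `test.ping` payload are stand-ins chosen for the sketch.

```python
import socket
import threading

def run_master(listener, command):
    """Accept one minion connection and publish a command over it."""
    conn, _addr = listener.accept()
    with conn:
        conn.sendall(command)

def run_minion(master_addr):
    """Connect outbound to the master and read until the master closes."""
    chunks = []
    with socket.create_connection(master_addr, timeout=5) as sock:
        while True:
            data = sock.recv(1024)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks)

# Only the master listens; the minion never opens a listening port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)

master = threading.Thread(target=run_master, args=(listener, b"test.ping"))
master.start()
received = run_minion(listener.getsockname())
master.join()
listener.close()
print(received)
```

The command reaches the minion even though the minion accepted no inbound connection, which mirrors why only the master's ports need to be opened.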

Question:

This question has been asked before, here. I have the exact same problem. I want to publish from a bunch of different processes, and use the same port every time.

I tried the solution presented in the answer, but this did not work for me. I get the error

    File "/usr/local/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/akay/afk/multi.py", line 18, in to_zmq
    socket.connect("tcp://*:%s" % port)
  File "zmq/backend/cython/socket.pyx", line 478, in zmq.backend.cython.socket.Socket.connect (zmq/backend/cython/socket.c:4308)
ZMQError: Invalid argument

My code is like this, essentially taken straight from the example in the zmq docs here and here:

# Socket to talk to server
port = '5556'
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Listening for stream...", m
socket.bind("tcp://localhost:%s" % port) #change connect to bind, as per answer above
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

I am using python 2.7, and the most recent version of zmq. Any idea what I might be doing wrong?


Answer:

Well, the error is clear:

    [...]
    socket.connect("tcp://*:%s" % port)
    [...]
ZMQError: Invalid argument

You can't connect to *, you must specify an IP address (the server IP address). If both the client and the server run on a single machine, try with localhost or 127.0.0.1.
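
The rule of thumb is that `*` is a wildcard meaning "all local interfaces" and only makes sense when binding; a connecting socket needs a concrete host. The helper below is a hypothetical sketch (it is not a pyzmq function) that builds endpoint strings and rejects the wildcard for connect.

```python
def tcp_endpoint(host, port, for_bind=False):
    """Build a ZeroMQ-style TCP endpoint string such as 'tcp://127.0.0.1:5556'."""
    if host == "*" and not for_bind:
        raise ValueError("'*' is a wildcard for bind(); connect() needs a real host")
    return "tcp://%s:%s" % (host, port)

bind_endpoint = tcp_endpoint("*", 5556, for_bind=True)   # listen on all interfaces
connect_endpoint = tcp_endpoint("localhost", 5556)       # dial one concrete peer
print(bind_endpoint, connect_endpoint)
```

The resulting strings would then be passed to the socket's bind and connect calls respectively.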

Question:

When I create the Hello World example in C++ from The Guide on ZeroMQ found here: http://zguide.zeromq.org/page:all#Ask-and-Ye-Shall-Receive

and run the application, I get a Windows Security Alert that asks if I would like to allow the application to communicate on public or private networks.

Here is where things get interesting.

I only need my program to listen on port 5555 for connections from localhost and I do NOT need to allow incoming connections on port 5555. This is because I only want to communicate between applications on the localhost.

Client and server are both running on the same machine.

Here is my current process. I start the server and the Windows Security Alert comes up. Since I am running the application under a non-administrator account, I only have standard permissions, so I click Cancel on the alert.

Clicking cancel on the alert puts an explicit deny inbound rule on all ports for HelloWorldServer.exe. This is totally fine.

Then I start the client. Since the client connects to localhost, it does not need to send messages outside the local machine, and all of its messages arrive at the server just fine.

Given an explicit deny rule on incoming connections to HelloWorldServer.exe, the messages can still arrive from the client on the local host. This is a desirable result.

Now the question becomes: is there any way to automatically respond to the Windows Security Alert by clicking Cancel? Is there any way to suppress the prompt, since the allow rule is not needed?

The prompt is undesirable because it implies that the application needs to create a vulnerability when it does not.

Please assume that Named Pipes are not a valid alternative to tcp as a means of inter-process communication.


Answer:

When binding the socket the caller may specify the IP address the socket is bound to. The coding samples provided by ZeroMQ specify

socket.bind ("tcp://*:5555"); 

where * specifies all available interfaces (INADDR_ANY in BSD socket-derived parlance). This triggers the Windows firewall because the socket accepts connections from both remote and local addresses.

Calling socket.bind with the localhost address 127.0.0.1

socket.bind ("tcp://127.0.0.1:5555"); 

restricts the socket to connections originating on the local machine, and should silence the firewall warning for most Windows firewall configurations.
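
The difference between the two bind addresses can be seen at the plain-socket level. This sketch uses the Python standard library rather than ZeroMQ; `bound_address` is a hypothetical helper, and port 0 simply asks the OS for any free port.

```python
import socket

def bound_address(host):
    """Bind a TCP socket to host on any free port and return the bound IP."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[0]

loopback = bound_address("127.0.0.1")   # reachable only from this machine
wildcard = bound_address("")            # INADDR_ANY: every local interface
print(loopback, wildcard)
```

A socket bound to the wildcard address can receive traffic arriving on external interfaces, which is the case a host firewall is designed to ask about.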

Question:

I am making an application in C++ where two publishers, say pub1 and pub2, are supposed to send data to 3 subscribers, say s1, s2, s3.

The pattern is as following:

Pub1 sends data to s1 and s2

Pub2 sends data to s2 and s3

If I use different TCP ports for the publishers, it works, but I need to do it using a single port. When I use the same port for both publishers, only the publisher that binds first works, and the other publisher throws a memory error.

Any ideas?


Answer:

Use a zmq::proxy internally to allow multiple publishing sockets to serve data over one TCP port.

P1(subject P1) -inproc-> |                       |--tcp-> SUB(P1)
                         |--XSUB(zmq::proxy)XPUB-|--tcp-> SUB(P1,P2)
P2(subject P2) -inproc-> |                       |--tcp-> SUB(P2)

Method:

Publisher Application

  • Two PUB sockets (inside the publisher app), one publishes with subject "P1", the other "P2"
  • They both connect inside your application to the XSUB side of your zmq::proxy over inproc://. The proxy also runs inside your publisher application.
  • Other side of the proxy (XPUB side) binds to the single TCP port.

Subscriber Applications

  • Your subscriber applications connect to the TCP port and subscribe to what they require.
    • For instance S1 will subscribe to "P1" while S2 will subscribe to both "P1" and "P2"
    • e.g subscribe_sock.setsockopt(ZMQ_SUBSCRIBE, "P1", 2)
    • e.g subscribe_sock.setsockopt(ZMQ_SUBSCRIBE, "P2", 2)
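
Subscription filtering in ZeroMQ is a prefix match on the message, so the routing in the diagram above can be modelled without any sockets. The sketch below is a pure-Python illustration; `delivered_to`, the subscriber table, and the sample payloads are assumptions made for the example.

```python
def delivered_to(subscriptions, message):
    """Return True if message starts with any subscribed prefix."""
    return any(message.startswith(prefix) for prefix in subscriptions)

# Subscriptions as described in the question: S1 <- P1, S2 <- P1 and P2, S3 <- P2.
subscribers = {
    "S1": [b"P1"],
    "S2": [b"P1", b"P2"],
    "S3": [b"P2"],
}

for message in (b"P1 temperature=21", b"P2 pressure=1013"):
    receivers = [name for name, subs in subscribers.items()
                 if delivered_to(subs, message)]
    print(message, "->", receivers)
```

Because matching is by prefix, a subject such as "P1" would also match a message starting with "P10", so subjects are best chosen so that none is a prefix of another (or terminated with a delimiter).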

Question:

I have two different node.js processes (different publisher socket instances) that need to publish messages on different topics but on the same address and port using zmq. Is that the right way to do it, or is it better to use a different address and port for each publisher socket instance? Please note that I use different topics for the different publishers.


Answer:

Unfortunately, no.

Calling Socket.Bind() from two different processes on the same address and port results in a ZeroMQ "Address in Use" exception.

Luckily!

A subscriber can connect to more than one publisher, using one connect call each time. Data will then arrive and be interleaved ("fair-queued") so that no single publisher drowns out the others.
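
The "Address in Use" failure comes from the operating system refusing a second bind on the same TCP address and port. The sketch below reproduces that with plain sockets from the Python standard library; it does not use ZeroMQ, and the variable names are illustrative.

```python
import errno
import socket

first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
first.bind(("127.0.0.1", 0))          # port 0: let the OS choose a free port
first.listen(1)
port = first.getsockname()[1]

second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    second.bind(("127.0.0.1", port))  # same address and port as `first`
    bind_error = None
except OSError as exc:
    bind_error = exc.errno            # expected to be EADDRINUSE
finally:
    second.close()
    first.close()

print(bind_error == errno.EADDRINUSE)
```

This is why each publisher needs its own endpoint, with the subscriber connecting to each of them in turn.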

Question:

I am trying to run the program given below (which I copied from the zeromq tutorial). I am getting the error

Python ImportError: No module named zhelpers

I have tried installing it with

sudo apt-get install zhelpers
sudo pip install zhelpers

but that gives the following errors:

E: Unable to locate package zhelpers
OR
No distributions at all found for zhelpers

Can someone point out the exact name of the module that I should install?

How can I fix this error?

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This shows how to capture data using a pub-sub proxy
#
import time
from random import randint
from string import uppercase
from threading import Thread
import zmq
from zmq.devices import monitored_queue
from zhelpers import zpipe

# The subscriber thread requests messages starting with
# A and B, then reads and counts incoming messages.
def subscriber_thread():
    ctx = zmq.Context.instance()
    # Subscribe to "A" and "B"
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:6001")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B")
    count = 0
    while True:
        try:
            msg = subscriber.recv_multipart()
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break # Interrupted
            else:
                raise
        count += 1
    print ("Subscriber received %d messages" % count)
# .split publisher thread

# The publisher sends random messages starting with A-J:
def publisher_thread():
    ctx = zmq.Context.instance()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:6000")

    while True:
        string = "%s-%05d" % (uppercase[randint(0,10)], randint(0,100000))
        try:
            publisher.send(string)
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break # Interrupted
            else:
                raise
        time.sleep(0.1) # Wait for 1/10th second
# .split listener thread
# The listener receives all messages flowing through the proxy, on its
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects
# attached child threads via inproc. In other languages your mileage may vary:

def listener_thread (pipe):
    # Print everything that arrives on pipe
    while True:
        try:
            print (pipe.recv_multipart())
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break # Interrupted

# .split main thread
# The main task starts the subscriber and publisher, and then sets
# itself up as a listening proxy. The listener runs as a child thread:
def main ():
    # Start child threads
    ctx = zmq.Context.instance()
    p_thread = Thread(target=publisher_thread)
    s_thread = Thread(target=subscriber_thread)
    p_thread.start()
    s_thread.start()
    pipe = zpipe(ctx)

    subscriber = ctx.socket(zmq.XSUB)
    subscriber.connect("tcp://localhost:6000")
    publisher = ctx.socket(zmq.XPUB)
    publisher.bind("tcp://*:6001")
    l_thread = Thread(target=listener_thread, args=(pipe[1],))
    l_thread.start()
    try:
        monitored_queue(subscriber, publisher, pipe[0], 'pub', 'sub')
    except KeyboardInterrupt:
        print ("Interrupted")

    del subscriber, publisher, pipe
    ctx.term()

if __name__ == '__main__':
    main()

Answer:

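zhelpers is not an installable package; it is a helper file shipped with the guide's examples. You can download all the examples from the zeromq guide here. The download also includes zhelpers.py.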

Question:

I have a serial device that is being converted to TCP/IP through a serial server. This seems to work correctly as other applications can connect this way.

In my application, I'm trying to use ZeroMQ ZMQ_STREAM to send a request for information to the device, and wait for a reply.

// from its own thread: threadTx
zmq::socket_t soc_tx(zmq_ctx, ZMQ_STREAM);
soc_tx.connect("tcp://10.10.10.100:4003");

std::string msg("hello");
zmq::message_t zmsg(msg.c_str(), msg.size());
soc_tx.send(zmsg);

then from another thread:

// from another thread: threadRx
zmq::socket_t soc_rx(zmq_ctx, ZMQ_STREAM);
soc_rx.connect("tcp://10.10.10.100:4003");

zmq::message_t zrecv;
soc_rx.recv(&zrecv);

The send comes back as successful and I do receive a 0 length message which per documentation indicates a successful connection (or disconnect).

However, I never receive the reply to the original sent request. soc_rx.recv() will wait infinitely. Wireshark indicates a reply is being sent, it's just never received by ZeroMQ host.

I've also tried to receive on the same socket I opened for sending with no luck.

// from its own thread: threadTx
zmq::socket_t soc(zmq_ctx, ZMQ_STREAM);
soc.connect("tcp://10.10.10.100:4003");

std::string msg("hello");
zmq::message_t zmsg(msg.c_str(), msg.size());
soc.send(zmsg);

while (true)
{
  zmq::message_t zrecv;
  soc.recv(&zrecv);
}

But still I only get the 0 length connection message and nothing else afterwards.

I thought this may be a configuration issue, but again, other applications work. I've also been able to successfully connect and both send and recv messages from the device using native Linux sockets.

uint16_t const PORT = 4003;
struct sockaddr_in address;
int sock = 0, valread;
struct sockaddr_in serv_addr;
char buffer[1024] = {0};
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
  return -1;
}

memset(&serv_addr, '0', sizeof(serv_addr));

serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);

// Convert IPv4 and IPv6 address from text to binary form
if(inet_pton(AF_INET, "10.10.10.100", &serv_addr.sin_addr) <= 0)
{
  std::cout << "> invalid address. address not supported" << std::endl;
  return nullptr;
}

if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
{
  std::cout << "> connection failed" << std::endl;
}

send(sock, out_buf, out_buf_size, 0);

std::this_thread::sleep_for(std::chrono::milliseconds(1000));

valread = recv(sock, buffer, 1024, 0);

And this works just fine.

For consistency with other applications in my system I'd love to use ZeroMQ for all socket communications, but is there a restriction preventing this?


Answer:

Using ZMQ_STREAM is a little tricky, but you should be able to use a single socket to communicate with an external TCP socket.

One disclaimer: I'll show examples using the zmqpp library. The key point is not the exact functions to call but how to apply the flow correctly, so I hope you will be able to translate it to plain ZeroMQ.

Take a look at zmq api docs, Native pattern paragraph.

To initiate the connection with the remote peer, you need to obtain the ZMQ_IDENTITY after calling connect.

To open a connection to a server, use the zmq_connect call, and then fetch the socket identity using the ZMQ_IDENTITY zmq_getsockopt call.

zmqpp::socket tcp{context, zmqpp::socket_type::stream};
tcp.connect("tcp://127.0.0.1:12345");
std::string identity = tcp.get<std::string>(zmqpp::socket_option::identity);

What is identity? Binary data. From docs:

Identity should be at least one byte and at most 255 bytes long. Identities starting with binary zero are reserved for use by ØMQ infrastructure.

After obtaining identity, you can receive on the socket to obtain zero-length message - it will indicate that connection has been established.

When a connection is made, a zero-length message will be received by the application.

What I discovered is that when the ZMQ_STREAM api docs mention a zero-length message, they actually mean a message with 2 frames: first the identity, then a zero-length frame.

Now, sending. What is missing in your code is passing the identity as the first frame of the message. The ZeroMQ engine will strip it and send only the second frame (you can't add more frames to the message).

You must send one identity frame followed by one data frame. The ZMQ_SNDMORE flag is required for identity frames but is ignored on data frames.

zmqpp::message msg;
msg << identity;
msg << "Hello world";
tcp.send(msg);

Now you can wait for the response from the remote peer. You'll receive either a proper reply (a message composed of 2 frames: first the identity, then whatever the remote answered) or a disconnection indication (a zero-length message).

When receiving TCP data, a ZMQ_STREAM socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application.

Similarly, when the peer disconnects (or the connection is lost), a zero-length message will be received by the application.

zmqpp::message msg;
tcp.receive(msg);

std::string r_ident;
msg >> r_ident;
std::string body;
msg >> body;

Question:

Is ZeroMQ or RabbitMQ asynchronous, even when it is utilized in C programming?


Answer:

Yes, both ZeroMQ and RabbitMQ support asynchronous modes of operation, and this applies to the C client library implementations for both (as of this writing).

Question:

I have been looking into zmq for a while now and have implemented a simplified poc - to mimic my infrastructure design - using it (specifically using the NetMQ wrapper), with great results.

My situation is this:

In the future I am planning to run multiple clients on a single machine, where each client needs to communicate with a "server" (located on a different machine) via multiple sockets.

I've noticed that for each socket that I declare and open explicitly, many more are opened internally by zmq and managed by it.

Edit

Binding sockets get a new port allocated in the dynamic range, and that is fine,

but although my client connects explicitly to only 2 ports, some 15 ports are allocated for it automatically by zmq.

This, I fear, might eventually lead to a shortage of ports, a situation I very much want to avoid.

The questions:

  1. How does zmq allocate ports internally, and what is the ratio between explicitly declared sockets and the ports zmq opens automatically?

  2. Is there a way for me to control the port allocation programmatically, via configuration, or by any other means?

  3. How does using polling affect port usage?

Thanks,


Answer:

When you create a socket with zeromq/netmq on Windows, a dedicated socket is used for signalling between the I/O thread and the user thread, and this socket takes two ports. If you call bind, you bind one more port: the one you selected.

The dedicated socket uses the dynamic port range (in NetMQ), so if you stay away from that range you will not have any problems.

The dynamic port range for Windows Vista and above is 49152 to 65535.

port counting code:

static void Main(string[] args)
{
    var id = Process.GetCurrentProcess().Id;

    using (var context = NetMQContext.Create())
    {
        List<NetMQSocket> sockets = new List<NetMQSocket>();

        NetMQSocket server = context.CreateDealerSocket();
        server.Bind("tcp://localhost:6666");

        int i= 0;
        while (true)
        {
            var client = context.CreateDealerSocket();
            client.Connect("tcp://localhost:6666");

            sockets.Add(client);

            Thread.Sleep(1000);                                       

            ProcessStartInfo startInfo = new ProcessStartInfo("NETSTAT.EXE", "-a -o");
            startInfo.RedirectStandardOutput = true;
            startInfo.UseShellExecute = false;
            startInfo.CreateNoWindow = true;

            Console.WriteLine("Calculating taken ports...");

            Process process  = Process.Start(startInfo);                    
            int portCounter = -7; // negative offset that discounts the framework and server socket ports

            while (!process.StandardOutput.EndOfStream)
            {
                if (process.StandardOutput.ReadLine().Contains(id.ToString()))
                {
                    portCounter ++;
                }
            }

            Console.Clear();
            Console.WriteLine("{0} sockets takes {1} ports, avg of {2} ports per socket", sockets.Count, portCounter, portCounter / sockets.Count);
            Console.WriteLine("Press enter to create another socket");

            Console.ReadLine();
        }
    }
}
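
The same effect can be observed without NetMQ: every outbound TCP connection is given a local port picked by the OS from its dynamic range. The sketch below uses the Python standard library only; the range constant is the Windows range quoted in the answer, and other operating systems use different defaults, so the final check may print False elsewhere.

```python
import socket

WINDOWS_DYNAMIC_RANGE = range(49152, 65536)  # 49152 to 65535, per the answer

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
server_port = server.getsockname()[1]

client = socket.create_connection(("127.0.0.1", server_port), timeout=5)
client_port = client.getsockname()[1]   # chosen by the OS, not by the application

print("server port:", server_port)
print("client (ephemeral) port:", client_port)
print("in Windows dynamic range:", client_port in WINDOWS_DYNAMIC_RANGE)

client.close()
server.close()
```

Choosing explicit bind ports outside the dynamic range keeps them from colliding with these automatically assigned ports.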

Question:

I am attempting to import the ZeroMQ library for a project in Unity. I am using C# and Visual Studio for editing. I used NuGet to import ZeroMQ into Visual Studio, but when I try to run the game I get the error

Severity    Code    Description Project File    Line    Suppression State
Error   CS0246  The type or namespace name 'ZeroMQ' could not be found (are you missing a using directive or an assembly reference?)    Assembly-CSharp C:\Users\<me>\OneDrive\Documents\UrBalls\Assets\Scripts\PlayerController.cs 4   Active

The controller file is from a tutorial:

using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using ZeroMQ;

public class PlayerController : MonoBehaviour
{
    public float speed;
    private Rigidbody rb;

    // Start is called before the first frame update
    void Start()
    {
        rb = GetComponent<Rigidbody>();     
    }


    void FixedUpdate()
    {
        float moveHorizontal = Input.GetAxis("Horizontal");
        float moveVertical = Input.GetAxis("Vertical");

        Vector3 movement = new Vector3(moveHorizontal, 0.0f, moveVertical);
        System.Console.WriteLine("Hey");
        rb.AddForce(movement * speed);
    }

    // Update is called once per frame
    void Update()
    {        
    }
}

How do I get the compiler to see the package? Appreciated!


Answer:

I did not test if this actually works, but I was able to get past the error you have here...

1) If you installed the top result from NuGet (the metadings package), uninstall it and try using the 'clrzmq' package (or its 64-bit version).

2) Copy the DLLs from: 'C:\Users\.nuget\packages\clrzmq\2.2.5\lib' & 'C:\Users\.nuget\packages\clrzmq\2.2.5\content' to the 'Assets' folder of your Unity project.

3) In your script, switch 'using ZeroMQ;' to 'using ZMQ;'

You should be able to run at this point.

I did not try copying the DLLs from the first 'ZeroMQ' package by metadings, so you can try that if you like. I would try clrzmq first; if it doesn't suit your needs, give metadings another shot. You will also have to configure Unity with 4.0 framework support to use the metadings package.

UPDATE: I ended up testing this. I had to use the x64 version, but it works. Here's the complete script:

using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Text;
using ZMQ;

public class mqtest : MonoBehaviour
{
    public float speed;
    private Rigidbody rb;

    // Start is called before the first frame update
    void Start()
    {
        rb = GetComponent<Rigidbody>();
    }


    void FixedUpdate()
    {
        float moveHorizontal = Input.GetAxis("Horizontal");
        float moveVertical = Input.GetAxis("Vertical");

        Vector3 movement = new Vector3(moveHorizontal, 0.0f, moveVertical);
        System.Console.WriteLine("Hey");
        rb.AddForce(movement * speed);
    }

    // Update is called once per frame
    void Update()
    {
        if (Input.GetKeyDown("space"))
        {
            print("space key was pressed");
            string endpoint = "tcp://127.0.0.1:5555";

            // ZMQ Context and client socket
            using (ZMQ.Context context = new ZMQ.Context())
            using (ZMQ.Socket client = context.Socket(SocketType.REQ))
            {
                client.Connect(endpoint);
                string request = "Hello";
                client.Send(request, Encoding.UTF8);
            }
        }
    }
}

Question:

In order to design our API/messages, I've made some preliminary tests with our data:

Protobuf V3 Message:

message TcpGraphes {
    uint32 flowId                   = 1;
    repeated uint64 curTcpWinSizeUl = 2; // max 3600 elements
    repeated uint64 curTcpWinSizeDl = 3; // max 3600 elements
    repeated uint64 retransUl       = 4; // max 3600 elements
    repeated uint64 retransDl       = 5; // max 3600 elements
    repeated uint32 rtt             = 6; // max 3600 elements
}

The message is built as a multipart message so that clients can filter on the first part.

Tested with 10 python clients: 5 running on the same PC (localhost), 5 running on an external PC. Protocol used was TCP. About 200 messages were sent every second.

Results:

  1. Local clients work: they receive every message
  2. Remote clients miss some messages (throughput seems to be limited by the server to 1Mbit/s per client)

Server code (C++):

// zeroMQ init
zmq_ctx = zmq_ctx_new();
zmq_pub_sock = zmq_socket(zmq_ctx, ZMQ_PUB);
zmq_bind(zmq_pub_sock, "tcp://*:5559");

Every second, about 200 messages are sent in a loop:

std::string serStrg;
tcpG.SerializeToString(&serStrg);
// first part identifier: [flowId]tcpAnalysis.TcpGraphes
std::stringstream id;
id << It->second->first << tcpG.GetTypeName();
zmq_send(zmq_pub_sock, id.str().c_str(), id.str().length(), ZMQ_SNDMORE);
zmq_send(zmq_pub_sock, serStrg.c_str(), serStrg.length(), 0);

Client code (python):

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, '')
sub.connect('tcp://x.x.x.x:5559')
print ("Waiting for data...")
while True:
    message = sub.recv() # first part (filter part, eg:"134tcpAnalysis.TcpGraphes")
    print ("Got some data:",message)
    message = sub.recv() # second part (protobuf bin)

We have looked at the PCAP, and the server doesn't use the full available bandwidth. I can add new subscribers or remove existing ones, and every remote subscriber still gets "only" 1Mbit/s.

I've tested an Iperf3 TCP connection between the two PCs and I reach 60Mbit/s.

The PC that runs the python clients is at about 30% CPU load. I minimized the console where the clients are running to avoid the printout, but it has no effect.

Is this normal behavior for the TCP transport layer (PUB/SUB pattern)? Does it mean I should use the EPGM protocol?

Config:

  • windows xp for the server
  • windows 7 for the python remote clients
  • zmq version 4.0.4 used

Answer:

A performance motivated interest ?

Ok, let's first use the resources a bit more adequately :

// //////////////////////////////////////////////////////
// zeroMQ init
// //////////////////////////////////////////////////////

zmq_ctx = zmq_ctx_new();

int aRetCODE = zmq_ctx_set( zmq_ctx, ZMQ_IO_THREADS, 10 );

assert( 0 == aRetCODE );

zmq_pub_sock = zmq_socket(  zmq_ctx, ZMQ_PUB );

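    // ZMQ_AFFINITY expects a pointer to a uint64_t bitmask plus its size
    uint64_t anAffinityMASK = 1023; // == 2**10 - 1, the low ten bits set
    aRetCODE = zmq_setsockopt( zmq_pub_sock, ZMQ_AFFINITY, &anAffinityMASK, sizeof( anAffinityMASK ) );
    assert( 0 == aRetCODE );
    //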
    // >>> print ( "[{0: >16b}]".format( 2**10 - 1 ) ).replace( " ", "." )
    // [......1111111111]
    //        ||||||||||
    //        |||||||||+---- IO-thread 0
    //        ||||||||+----- IO-thread 1
    //        |......+------ IO-thread 2
    //        ::             :         :
    //        |+------------ IO-thread 8
    //        +------------- IO-thread 9
    //
    // API-defined AFFINITY-mapping

Non-windows platforms with a more recent API can touch also scheduler details and tweak O/S-side priorities even better.
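
The value 1023 is simply a bitmask with one bit per I/O thread. A small sketch of how such masks can be derived follows; `io_thread_mask` and `mask_for_threads` are hypothetical helper names, and the bit numbering follows the diagram above.

```python
def io_thread_mask(thread_count):
    """Return a bitmask with the lowest thread_count bits set."""
    return (1 << thread_count) - 1

def mask_for_threads(indexes):
    """Return a bitmask selecting only the given I/O-thread indexes."""
    mask = 0
    for index in indexes:
        mask |= 1 << index
    return mask

mask = io_thread_mask(10)
print(mask)                       # 1023
print(format(mask, "016b"))       # 0000001111111111
print(mask_for_threads([0, 9]))   # 513
```

A narrower mask pins the socket's traffic to a subset of the I/O threads instead of spreading it across all ten.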


Networking ?

Next, one may ask the network to treat the traffic preferentially by setting a Type-of-Service value :

    int aToS = <_a_HIGH_PRIORITY_ToS#_>;
    aRetCODE = zmq_setsockopt( zmq_pub_sock, ZMQ_TOS, &aToS, sizeof( aToS ) );

Converting the whole infrastructure into epgm:// ?

Well, if one wishes to experiment and gets warranted resources for doing that E2E.

Question:

I need to move from TCP to UDP transport in my .NET application. When I use the udp transport with ZeroMQ (.NET binding), it complains that the transport is not supported. I understand that the native "NetMQ" implementation does not support UDP, but does clrzmq4, which I understand is a wrapper over libzmq, not support UDP either? Do I need to use raw UDP sockets?


Answer:

Actually, you can also do pure UDP with the new socket types DISH and RADIO, which are similar to PUB-SUB.

Take a look at the test for a usage example:

https://github.com/zeromq/libzmq/blob/master/tests/test_udp.cpp

Question:

The import org.zeromq cannot be resolved, what can I do?

I'm trying to subscribe to ZMQ from my web app. This is my first time working with ZMQ and I'm getting a little frustrated. Can anybody help?

It has been a while since I used Java the last time.

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
* Pubsub envelope subscriber
*/

public class psenvsub {

    public static void main (String[] args) {

        // Prepare our context and subscriber
        Context context = ZMQ.context(1);
        Socket subscriber = context.socket(ZMQ.SUB);

        subscriber.connect("tcp://localhost:5563");
        subscriber.subscribe("B".getBytes());
        while (!Thread.currentThread ().isInterrupted ()) {
            // Read envelope with address
            String address = subscriber.recvStr ();
            // Read message contents
            String contents = subscriber.recvStr ();
            System.out.println(address + " : " + contents);
        }
        subscriber.close ();
        context.term ();
    }
}

Answer:

You need a ZeroMQ implementation on the classpath. There are multiple implementations in Maven Central: https://mvnrepository.com/artifact/org.zeromq

This POM should resolve imports:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>zmq</groupId>
    <artifactId>zmq-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.zeromq/jeromq -->
        <dependency>
            <groupId>org.zeromq</groupId>
            <artifactId>jeromq</artifactId>
            <version>0.4.0</version>
        </dependency>
    </dependencies>

</project>

If you aren't familiar with Maven, read here

Question:

I have written a sample ZeroMQ PGM multicast application and it works fine.

But reply handling does not work. Is this the correct approach? If so, how can a Receiver send a reply to the Sender?

Sender:

std::string msg = "hello";
socket->send(msg.c_str(),msg.length(),0);
socket->recv(reply);                          // Can we do this?

Receiver:

char msg[100] = {0};
std::string reply = "Succ";
socket->recv(msg,100,0);
socket->send(reply.c_str(),reply.length(),0); // Can we do this?

Answer:

Yes, no!

Yes, it is expected that reply handling fails: the ZeroMQ pgm:// transport class does not support this.

No, we cannot do this: neither a .send() on the SUB side nor a .recv() on the PUB side.

The ZeroMQ API specification says: The pgm and epgm transports can only be used with the ZMQ_PUB and ZMQ_SUB socket types.

The ZMQ_SUB archetype does not permit any .send() method. The ZMQ_PUB archetype does not permit any .recv() method.

Q.E.D.

So how can bi-directional signalling be created?

Add another socket, most probably using the tcp:// transport class, over which the feedback signalling can take place.

ZeroMQ's reversed .bind()/.connect() mechanics are a common way to do this in unstructured distributed computing infrastructures where no configuration policy is formally controlled or enforced.

Question:

According to the gRPC.io website, gRPC supports HTTP 1.x and 2.0, but what about other "transports" like ZeroMQ or nanomsg?


Answer:

gRPC does not support ZeroMQ or nanomsg. The supported transports are HTTP/2, QUIC and in-process. You can find more details at https://grpc.github.io/grpc/core/md_doc_core_transport_explainer.html

Question:

I'm using communication based on ZeroMQ and NetMQ (same problem in both projects).

I have applications running behind firewalls.

On the server side it's easy to define which ports to open for inbound traffic.

However, on the client side it seems that I can only specify the target (a.k.a. server) address and port.

Is it possible to define which port will be used for outbound traffic in the client app?

For example (in NetMQ):

      using (NetMQContext ctx = NetMQContext.Create())
      {
         using (NetMQSocket snapshot = ctx.CreateSocket(ZmqSocketType.Dealer))
         {
           // connect to remote address, no place to specify outbound port
           snapshot.Connect("tcp://192.168.1.10:5555");
         }
      }

In a simple communication scenario we have a server and a client.

The server is running on machine A (192.168.1.10) and listening on port 5555.

The client is running on machine B (192.168.1.9) and initiates communication with the server (A).

If we look at the connection info on the client, we see that the system allocated port XXXXX (for example 51234) for the outbound connection to 192.168.1.10:5555.

In most cases the XXXXX port is allocated by the system (a free port); however, in some extreme cases it needs to be a specific port (due to an extremely strict security environment).


Answer:

As you state in your update, the source port is usually allocated by the system. I did find some ZMQ-specific discussion on the matter:

In this link there is some discussion of how a source IP and source port might be specified in the ZMQ protocol. The people talking there are the core dev team maintaining ZMQ (in particular, Pieter Hintjens "founded" ZMQ), so you can take from this that it was not implemented as of a year ago.

Further, here you can see it was eventually implemented in April of this year. So as long as you have a recent version of ZMQ, and your binding doesn't have any peculiarities that would block this functionality, you should be able to do it if you follow those guidelines.

In particular, it would look something like the following:

snapshot.Connect("tcp://192.168.1.9:51234;192.168.1.10:5555");

Again, this may or may not be altered by your binding.
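
For reference, here is a minimal sketch of what "choosing the source port" means at the plain TCP level: the client binds its socket to a local port before it connects. It deliberately uses only java.net rather than ZeroMQ or NetMQ, so it says nothing about what a given binding accepts. The class name SourcePortDemo and the helpers connectFrom and findFreePort are made up for illustration, and the demo talks to a throwaway loopback server instead of the 192.168.1.x machines from the question.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SourcePortDemo {

    /** Opens a TCP connection whose local (source) address and port are fixed before connecting. */
    static Socket connectFrom(InetAddress localAddr, int localPort,
                              InetAddress remoteAddr, int remotePort) throws IOException {
        Socket socket = new Socket();                                  // not yet bound or connected
        socket.setReuseAddress(true);                                  // must be set before bind()
        socket.bind(new InetSocketAddress(localAddr, localPort));      // pins the source port
        socket.connect(new InetSocketAddress(remoteAddr, remotePort), 2000); // 2 s connect timeout
        return socket;
    }

    /** Finds a currently free port by letting the OS pick one and releasing it again. */
    static int findFreePort() throws IOException {
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        int sourcePort = findFreePort();   // stands in for the firewall-approved port
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = connectFrom(loopback, sourcePort, loopback, server.getLocalPort());
             Socket accepted = server.accept()) {
            System.out.println("client source port:  " + client.getLocalPort());
            System.out.println("port seen by server: " + accepted.getPort());
        }
    }
}
```

Both printed ports should be the one passed to connectFrom. A fixed source port can be refused while an earlier connection from the same port is still in TIME_WAIT, which is why the sketch enables address reuse before binding.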

Question:

I have a reply ZSocket in a request-reply pattern.

Later in the process, I want to switch the binding, so the reply socket now responds to requests on a different port. The user enters the port that they want to listen on, if they don't want to listen on the default port 5555.

Can I do this by simply calling ZSocket.Bind() again, or do I have to destroy the socket object first?

If I have to destroy it first, how can I be sure the ZSocket is ready to be rebound to something else? I know that closing sockets is handled differently in C because it has no garbage collection. Since the C# language binding for ZeroMQ can make use of garbage collection, is it as simple as closing the socket and creating a new one on another port? Do I need to bother with setting a low Linger value or anything like that?

I couldn't find anything about this in the documentation. Here is a related question: ZeroMQ: Address in use error when re-binding socket

Here is the code I have:

private ZContext context = new ZContext();
private ZSocket replier = new ZSocket(context, ZSocketType.REP);
replier.Bind("tcp://*:5555"); // bind to default port
//yada yada...
//user enters new port to listen on 5556
replier.Bind("tcp://*:5556");

What is the proper way to do this? Please explain the logic behind the answer.


Answer:

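I'm not sure about the C# bindings, but with the plain C API you can bind a ZeroMQ socket again and start receiving messages on the new port right away. Note that a second bind adds an endpoint rather than replacing the first one, so call zmq_unbind() on the old endpoint if the socket should stop listening there. The linger period is unrelated to your case.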

Question:

The server is bound to two sockets, on port 6666 (to service an SMS send request from a client) and port 6661 (to service an EMAIL send request from a client).

Since I want to service them differently, I must first know on which port a request arrived, so that I can execute the code for that service.

So my question is: how can I identify the port?

If this is not possible, what logic can be applied to solve this problem?

The server-side code is:

int main () {

zmq::context_t context (1);               // Prepare our context and socket

zmq::socket_t socket  (context, ZMQ_REP); socket.bind  ("tcp://*:6666");
zmq::socket_t socket2 (context, ZMQ_REP); socket2.bind ("tcp://*:6661");

while (true) {      
  // ----------------------------------SMS CODE----------------------------------
     zmq::message_t request;
     socket.recv ( &request);             // Wait for next request from client

  /* ...                                  // SMS Send Logic
     ...
                                          */
     zmq::message_t reply (11);           // Send reply back to client
     memcpy (reply.data (), "SMS Details", 11);
     socket.send (reply);

  // --------------------------------EMAIL CODE----------------------------------
     zmq::message_t request2;
     socket2.recv (&request2);            // Wait for next request from client

  /* ...                                  // Email Send Logic
     ...                           
                                          */
     zmq::message_t reply2 (13);          // Send reply back to client
     memcpy (reply2.data (), "Email Details", 13);
     socket2.send (reply2);
  }                                       // end of while
return 0;
}

Answer:

You should use zmq::poll to check which sockets have pending messages before attempting to read from them. There is an official C++ example here.

You can also use ZMQ_DONTWAIT to check for messages, but you should still poll to avoid using too much CPU.

Example:

#include <cstring>
#include <zmq.hpp>

int main (int argc, char *argv[])
{
    zmq::context_t context(1);

    // Same ports as in the question: 6666 serves SMS, 6661 serves email
    zmq::socket_t smsSocket   (context, ZMQ_REP); smsSocket.bind   ("tcp://*:6666");
    zmq::socket_t emailSocket (context, ZMQ_REP); emailSocket.bind ("tcp://*:6661");

    //  Initialize poll set
    zmq::pollitem_t items [] = {
        { static_cast<void*>(smsSocket),   0, ZMQ_POLLIN, 0 },
        { static_cast<void*>(emailSocket), 0, ZMQ_POLLIN, 0 }
    };
    //  Process messages from both sockets
    while (1) {
        zmq::message_t message;
        // wait until there is a message ready for one of the sockets
        zmq::poll (&items [0], 2, -1);

        // revents says which socket (and so which port) has a request;
        // ZMQ_DONTWAIT makes recv skip instead of block if there is none
        if ((items [0].revents & ZMQ_POLLIN) && smsSocket.recv(&message, ZMQ_DONTWAIT)) {
            //  Process sms request, then send the reply used in the question
            zmq::message_t response(11);
            memcpy (response.data (), "SMS Details", 11);
            smsSocket.send(response);
        }
        // same again for email socket
        if ((items [1].revents & ZMQ_POLLIN) && emailSocket.recv(&message, ZMQ_DONTWAIT)) {
            //  Process email request, then send the reply used in the question
            zmq::message_t response(13);
            memcpy (response.data (), "Email Details", 13);
            emailSocket.send(response);
        }
        // back to the top of the loop to wait for another message
    }
    return 0;
}
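
The underlying idea (one listener per port, then ask a poller which listener is ready) is not specific to ZeroMQ. Below is a rough Java sketch of the same dispatch pattern using plain TCP and java.nio's Selector in place of zmq::poll, so it is an analogy and not ZeroMQ code. The class name PortDispatchDemo, the helper names, and the use of OS-chosen loopback ports instead of 6666 and 6661 are all assumptions made to keep the example self-contained.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class PortDispatchDemo {

    /** Opens a non-blocking listener on an OS-chosen loopback port, tagged with a service name. */
    static ServerSocketChannel listen(Selector selector, String service) throws IOException {
        ServerSocketChannel listener = ServerSocketChannel.open();
        listener.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        listener.configureBlocking(false);
        listener.register(selector, SelectionKey.OP_ACCEPT, service); // attachment = service name
        return listener;
    }

    /** Returns the port the listener was actually bound to. */
    static int portOf(ServerSocketChannel listener) throws IOException {
        return ((InetSocketAddress) listener.getLocalAddress()).getPort();
    }

    /** Blocks until one listener has a pending connection, answers it, and returns its service name. */
    static String serveOne(Selector selector) throws IOException {
        while (selector.select() == 0) {
            // woken up with nothing ready: keep waiting
        }
        SelectionKey key = selector.selectedKeys().iterator().next();
        selector.selectedKeys().remove(key);           // mark this readiness event as handled
        String service = (String) key.attachment();    // identifies the port the client used
        try (SocketChannel peer = ((ServerSocketChannel) key.channel()).accept()) {
            peer.write(ByteBuffer.wrap((service + " Details").getBytes(StandardCharsets.UTF_8)));
        }
        return service;
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (Selector selector = Selector.open();
             ServerSocketChannel sms = listen(selector, "SMS");
             ServerSocketChannel email = listen(selector, "EMAIL");
             SocketChannel client = SocketChannel.open(new InetSocketAddress(loopback, portOf(email)))) {
            String service = serveOne(selector);       // only the email listener has a client
            ByteBuffer reply = ByteBuffer.allocate(64);
            client.read(reply);
            reply.flip();
            System.out.println(service + " listener replied: " + StandardCharsets.UTF_8.decode(reply));
        }
    }
}
```

The service name travels as the selection key's attachment, which plays the role of the index into the pollitem_t array in the C++ answer: the code never inspects a port number, it only asks which listener became ready.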