Hot questions for Using ZeroMQ in inproc

Question:

Q1: What exactly is the difference between using ZeroMQ to send messages to child processes, as compared to the default inter-process communication explained here?

Q2: For direct process-to-child communication, which would be more appropriate (faster)?

Q3: The docs say: Creates an IPC channel, what kind of IPC does it use? TCP? Sockets?


Answer:


A good point to state at the very outset - ZeroMQ is broker-less


A1: The difference between using ZeroMQ to send messages & plain IPC

Well, put this way, ZeroMQ concentrates on much more than just the ability to send messages & scale up ( both of which are helpful ).

ZeroMQ introduces ( well-Scalable ) Formal Communication Patterns

This said, the core application-side focus is directed at which ZeroMQ-library pattern primitives can be used to either directly fulfil the actually needed behaviour model between participating agents ( one PUB + many SUB-s / many PUB-s + many cross-connected SUB-s ), or at how to compose a somewhat more complex, application-specific signalling-plane ( using the available ZeroMQ building blocks: behaviourally-primitive socket archetypes + devices + application logic, providing finite-state-machine or transactional engines for the signalling-plane's added functionality ).

Standard IPC provides a dumb O/S-based service, no behaviour

which is fine, if understood in the pure O/S-context ( i.e. "batteries included" is not the case ).

Nevertheless, any higher-level messaging support and other grand features ( like fair-queueing, round-robin scheduling, mux-ed transport-agnostic service composition over any/all { inproc:// | ipc:// | tcp:// | pgm:// | ... } transport-classes, millisecond-tuned multi-channel pollers, zero-copy message handovers and many other smart features ) are to be designed / implemented on your own ( which is the very reason ZeroMQ was put in the game, so as not to have to do so, wasn't it? many thanks, Martin SUSTRIK & Pieter HINTJENS' team ).
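
As a tiny, hedged illustration of that transport-agnostic composition ( a sketch of mine, not part of the original answer; all endpoint names and the port number are made up ), one and the same ZeroMQ socket may be bound to several transport-classes at once, while the sending code stays identical:

#include <zmq.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();
    void *pub = zmq_socket (ctx, ZMQ_PUB);

    /* one PUB-side AccessPoint, three transport-classes: subscribers may
       arrive from another thread, another local process or another host,
       yet the publishing code below does not change at all               */
    zmq_bind (pub, "inproc://quotes");     /* same process, other threads   */
    zmq_bind (pub, "ipc:///tmp/quotes");   /* other processes on this host  */
    zmq_bind (pub, "tcp://*:5556");        /* other hosts over the network  */

    zmq_send (pub, "demo", 4, 0);          /* fans out to all of them       */

    zmq_close (pub);
    zmq_ctx_term (ctx);
    return 0;
}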


The best next step?

To see a bigger picture on this subject >>> ( with more arguments, a simple signalling-plane picture and a direct link to a must-read book from Pieter HINTJENS ).


A2: Faster? I would worry if anybody grants an easy answer. It depends... A lot...

If interested in ZeroMQ's younger sister, nanomsg, check this even more lightweight framework from Martin SUSTRIK at nanomsg.org >>>.

Fast, Faster, Fastest ...

For inspiration on minimum-overhead ( read: a high potential for speed ) and zero-copy ( read: efficient overhead-avoidance ) designs, read about the inproc:// transport-class for inter-thread messaging.
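
For a first taste of what zero-copy means in practice, here is a minimal, hedged sketch ( my own, not from the original answer; the endpoint name is made up ) using zmq_msg_init_data() over an inproc:// PAIR/PAIR connection - the heap buffer is handed over by pointer and ZeroMQ calls the supplied deallocator once it no longer needs it:

#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* called back by ZeroMQ once the message content is no longer needed */
static void buffer_free (void *data, void *hint)
{
    (void) hint;
    free (data);
}

int main (void)
{
    void *ctx = zmq_ctx_new ();
    void *a   = zmq_socket (ctx, ZMQ_PAIR);
    void *b   = zmq_socket (ctx, ZMQ_PAIR);
    zmq_bind    (a, "inproc://zero-copy-demo");
    zmq_connect (b, "inproc://zero-copy-demo");

    /* hand the heap buffer over without copying its contents */
    size_t len  = 6;
    char  *data = malloc (len);
    memcpy (data, "hello", len);

    zmq_msg_t msg;
    zmq_msg_init_data (&msg, data, len, buffer_free, NULL);
    zmq_msg_send (&msg, a, 0);             /* ownership passes to ZeroMQ */

    zmq_msg_t rcv;
    zmq_msg_init (&rcv);
    zmq_msg_recv (&rcv, b, 0);
    printf ("received: %s\n", (char *) zmq_msg_data (&rcv));
    zmq_msg_close (&rcv);

    zmq_close (b);
    zmq_close (a);
    zmq_ctx_term (ctx);
    return 0;
}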


A3: It uses IPC.

IPC is a transport-class of its own. There is no need to re-wrap / align / assemble / CRC / encapsulate / dispatch / decode / CRC-recheck / demap ... the raw IPC-data into higher-abstraction TCP-packets if it is transported right between localhost processes over an IPC-channel, is there?
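
For illustration, a hedged sketch of ZeroMQ's own ipc:// transport-class ( typically backed by a Unix domain socket; the endpoint path /tmp/demo.ipc is made up ) - the endpoint is a plain filesystem path, with no TCP framing anywhere in between:

#include <zmq.h>
#include <string.h>

/* run once as `./demo server`, then in another terminal as `./demo client` */
int main (int argc, char *argv[])
{
    int   is_server = (argc > 1 && strcmp (argv[1], "server") == 0);
    void *ctx = zmq_ctx_new ();
    void *s   = zmq_socket (ctx, is_server ? ZMQ_REP : ZMQ_REQ);

    /* the endpoint is a filesystem path, not an <ip>:<port> */
    if (is_server)
        zmq_bind (s, "ipc:///tmp/demo.ipc");
    else
        zmq_connect (s, "ipc:///tmp/demo.ipc");

    char buf[16];
    if (is_server) {                       /* a one-shot echo service */
        int n = zmq_recv (s, buf, sizeof buf, 0);
        zmq_send (s, buf, n, 0);
    }
    else {                                 /* the client */
        zmq_send (s, "ping", 4, 0);
        zmq_recv (s, buf, sizeof buf, 0);
    }

    zmq_close (s);
    zmq_ctx_term (ctx);
    return 0;
}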

Question:

Scenario :

We were evaluating ZeroMQ (specifically JeroMQ) for an event-driven mechanism.

The application is distributed: multiple services (both publishers and subscribers are services) can exist either in the same jvm or in distinct nodes, depending on the deployment architecture.

Observation

For playing around, I created a pub/sub pattern with inproc: as the transport, using JeroMQ (version 0.3.5).

  1. The publishing thread is able to publish (it looks like messages are getting published; at least there are no errors)
  2. The subscriber which is in another thread is not receiving anything.

Question

Is using inproc: along with pub/sub feasible?

Tried googling but couldn't find anything specific, any insights?

Code sample for pub/sub with inproc:

A working code sample for inproc pub/sub using JeroMQ (version 0.3.5), which may be useful for someone visiting this post later: one publisher publishing topics A and B, and two subscribers receiving A and B separately.

/**
 * @param args
 */
public static void main(String[] args) {

    // The single ZMQ instance
    final Context context = ZMQ.context(1);

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    //Publisher
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startPublishing(context);
        }
    });
    // Subscriber for topic "B"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startFirstSubscriber(context);
        }
    });
    // Subscriber for topic "A"
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            startSecondSubscriber(context);
        }
    });

}

/**
 * Prepare the publisher and publish
 * 
 * @param context
 */
private static void startPublishing(Context context) {

    Socket publisher = context.socket(ZMQ.PUB);
    publisher.bind("inproc://test");
    while (!Thread.currentThread().isInterrupted()) {
        // Write two messages, each with an envelope and content
        try {
            publisher.sendMore("A");
            publisher.send("We don't want to see this");
            LockSupport.parkNanos(1000);
            publisher.sendMore("B");
            publisher.send("We would like to see this");
        } catch (Throwable e) {

            e.printStackTrace();
        }
    }
    publisher.close();
    context.term();
}

/**
 * Prepare and receive through the subscriber
 * 
 * @param context
 */
private static void startFirstSubscriber(Context context) {

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");

    subscriber.subscribe("B".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber1 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}

/**
 * Prepare and receive through the subscriber
 * 
 * @param context
 */
private static void startSecondSubscriber(Context context) {
    // Prepare our context and subscriber

    Socket subscriber = context.socket(ZMQ.SUB);

    subscriber.connect("inproc://test");
    subscriber.subscribe("A".getBytes());
    while (!Thread.currentThread().isInterrupted()) {
        // Read envelope with address
        String address = subscriber.recvStr();
        // Read message contents
        String contents = subscriber.recvStr();
        System.out.println("Subscriber2 " + address + " : " + contents);
    }
    subscriber.close();
    context.term();

}

Answer:

The ZMQ inproc transport is intended for use within a single process, between different threads. When you say "can exist either in the same jvm or in distinct nodes" (emphasis mine) I assume you mean that you're spinning up multiple processes as distributed services rather than multiple threads within a single process.

If that's the case, then no, what you're trying to do won't work with inproc. PUB-SUB/inproc would work fine within a single process between multiple threads.


Edit to address further questions in the comments:

The reason to use a transport like inproc or ipc is that it's a little more efficient (faster) than the tcp transport when you're in the right context to use them. You could conceivably use a mixture of transports, but you'll always have to bind and connect on the same transport to make it work.

This means that each node would need up to three PUB or SUB sockets - a tcp publisher to talk to nodes on remote hosts, an ipc publisher to talk to nodes on different processes on the same host, and an inproc publisher to talk to nodes in different threads in the same process.

Practically, in most cases you'd just use the tcp transport and only spin up one socket for everything - tcp works everywhere. It could make sense to spin up multiple sockets if each socket is responsible for a particular kind of information.

If there's a reason that you'll always be sending one message type to other threads and a different message type to other hosts, then multiple sockets makes sense, but in your case it sounds like, from the perspective of one node, all other nodes are equal. In that case I would use tcp everywhere and be done with it.
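
As a tiny sketch of that "tcp everywhere" simplification ( illustrated here with the libzmq C API rather than JeroMQ; the port number is arbitrary ), one PUB socket bound to a single tcp endpoint serves subscribers in the same process, on the same host and on remote hosts alike:

#include <zmq.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();

    void *pub = zmq_socket (ctx, ZMQ_PUB);
    zmq_bind (pub, "tcp://*:5556");

    /* a subscriber in the same process simply dials the very same endpoint;
       a subscriber on another host would use tcp://<publisher-host>:5556    */
    void *sub = zmq_socket (ctx, ZMQ_SUB);
    zmq_connect (sub, "tcp://127.0.0.1:5556");
    zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);

    /* ... PUB-side zmq_send() / SUB-side zmq_recv() as usual ... */

    zmq_close (sub);
    zmq_close (pub);
    zmq_ctx_term (ctx);
    return 0;
}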

Question:

In one of my applications I'm using DEALER/ROUTER inproc connections. I set the linger option on the DEALER socket to 0, so that all messages sent on the DEALER socket shall be discarded once the ROUTER socket is closed. Although this works well for TCP connections, it blocks for inproc. Here's a minimal working example:

#include <zmq.h>

#include <windows.h>

int main()
{
    void *context = zmq_ctx_new();

    void *router = zmq_socket(context, ZMQ_ROUTER);
    zmq_bind(router, "inproc://socket");

    void *dealer = zmq_socket(context, ZMQ_DEALER);
    zmq_connect(dealer, "inproc://socket");

    int linger = 0;
    zmq_setsockopt(dealer, ZMQ_LINGER, &linger, sizeof(linger));

    zmq_close(router);

    // sleep for 1 ms
    Sleep(1);

    // this call blocks
    zmq_send(dealer, "message", 7, 0);

    zmq_close(dealer);
    zmq_ctx_destroy(context);

    return 0;
}

Before the DEALER socket can be closed, the zmq_send() call blocks. In this minimal example, I had to add a Sleep(1) call. When this call is omitted, zmq_send() doesn't block. When blocked, the call stack is as follows:

[External Code] 
libzmq.dll!zmq::signaler_t::wait(int timeout_) Line 253 C++
libzmq.dll!zmq::mailbox_t::recv(zmq::command_t * cmd_, int timeout_) Line 80    C++
libzmq.dll!zmq::socket_base_t::process_commands(int timeout_, bool throttle_) Line 1023 C++
libzmq.dll!zmq::socket_base_t::send(zmq::msg_t * msg_, int flags_) Line 869 C++
libzmq.dll!s_sendmsg(zmq::socket_base_t * s_, zmq_msg_t * msg_, int flags_) Line 346    C++
libzmq.dll!zmq_send(void * s_, const void * buf_, unsigned __int64 len_, int flags_) Line 371   C++

I'm using Windows 10 x64, libzmq 4.2.1 (tested it with 4.1.6 as well), and Visual Studio 2015. How can I cleanly shut down the DEALER/ROUTER connection? Is this a bug in libzmq?


Answer:

Would it be feasible to use ZMQ_DONTWAIT in your zmq_send() call and evaluate the error code?

Changing your code to

// this call blocks no more
zmq_send(dealer, "message", 7, ZMQ_DONTWAIT);
int ec = zmq_errno();
printf("code: %d\nstring: %s\n", ec, zmq_strerror(ec));

would result in

code: 11

string: Resource temporarily unavailable

Alternatively, can a socket monitor be used to detect the close event and prevent further sending? This does depend on your architecture.

Question:

In general, a ring-buffer is needed for a realtime Producer thread to have decent performance.

But in my case, I have some latency constraints, so I'd love to get rid of the ring-buffer from Producer and push individual data buffers out as they come along. I'd then ring-buffer on the Consumers (slower) side only. And I'd love to avoid thread locking via critical sections.

Since ZMQ requires no thread-locking, I wonder if an inproc PubSub pattern can be used for this. Knowing that network I/O is not welcome in a realtime thread, I'm still curious whether the inproc protocol could make any difference, i.e., give better performance.

So the question is: Can I achieve low-latency lock-free/blocking-free data delivery in a realtime thread using ZMQ PubSub with inproc protocol?


Answer:

Q : Can I achieve low-latency lock-free/blocking-free data delivery in a realtime thread using ZMQ PubSub with inproc protocol?

Yes.


It is just enough to instantiate a thread-less Context( 0 )-instance :

No I/O threads are involved in passing messages using the inproc transport. Therefore, if you are using a ØMQ context for in-process messaging only, you can initialise the context with zero I/O threads.
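
In the C API that zero-I/O-threads setup is a single zmq_ctx_set() call on the context ( a minimal sketch of mine, assuming an inproc-only use-case; the endpoint name is illustrative ):

#include <zmq.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();

    /* inproc:// needs no I/O threads at all; this must be set
       before the first socket is created on the context        */
    zmq_ctx_set (ctx, ZMQ_IO_THREADS, 0);

    void *pub = zmq_socket (ctx, ZMQ_PUB);
    void *sub = zmq_socket (ctx, ZMQ_SUB);
    zmq_bind    (pub, "inproc://ticks");
    zmq_connect (sub, "inproc://ticks");
    zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);

    /* messages now travel purely via in-memory pipes,
       with no I/O thread in the data path              */

    zmq_close (sub);
    zmq_close (pub);
    zmq_ctx_term (ctx);
    return 0;
}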


Affinity ( wisely planned core-binding ) tricks may shave off a further few hundred [ns], by preventing the R/T processing from bearing the extensive add-on latency costs of core-to-core NUMA non-local memory accesses.

Question:

In the guide for ZeroMQ, there is this:

If you use inproc and socket pairs, you are building a tightly-bound application, i.e., one where your threads are structurally interdependent. Do this when low latency is really vital.

I care a lot about latency for my application.

Question:

  • Is it the "inproc-ness" alone that makes it low-latency?

  • Or is there something special about "inproc + PAIR" that is faster than inproc + "WHATEVER"?


Answer:

In case one has never worked with ZeroMQ, one may enjoy first looking at "ZeroMQ Principles in less than Five Seconds" before diving into further details.

Q : is it the "inproc-ness" alone that makes it low-latency?

Yes, . . . as bazza has already put it in general terms yesterday, let me add a few cents : 1 ) the inproc://-transport-class is a stack-less, protocol-free and pure ( thus fast & almost zero-latency ) RAM memory-region mapping vehicle, and also ( as was asked in the second question )

Q : Or is there something special about "inproc + PAIR" that is faster than inproc + "WHATEVER"?

2 ) the PAIR/PAIR Scalable-Formal-Communication-Pattern archetype adds no extra, archetype-related, multi-(many)-party behavioural handshaking ( compared to some of the other, distributed Finite-State-Automata, which express a Pattern archetype's behaviour states and transitions among all the distributed peers, unlike a PAIR/PAIR exclusive 1-on-1 digital fire-hose ), so nothing gets added here beyond thread-safe local pointer mechanics on either side plus some Context()-instance signalling. Btw. you may have noticed that for a pure-inproc://-transport-class application you may instantiate the Context( 0 ) with Zero-I/O-threads, as in these cases the Context()-signalling does not need them at all, it just manages pointer-tricks over local-RAM memory-regions ( so cute, isn't it? )
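
For completeness, a minimal sketch ( my own, hedged illustration, not code from the Guide; the endpoint name is made up, and both PAIR ends sit in a single thread only to keep the demo short ) of such a PAIR/PAIR inproc:// fire-hose over a Zero-I/O-threads Context():

#include <zmq.h>
#include <stdio.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();
    zmq_ctx_set (ctx, ZMQ_IO_THREADS, 0);          /* inproc-only: no I/O threads */

    void *left  = zmq_socket (ctx, ZMQ_PAIR);
    void *right = zmq_socket (ctx, ZMQ_PAIR);
    zmq_bind    (left,  "inproc://pair-demo");     /* .bind() first ...           */
    zmq_connect (right, "inproc://pair-demo");     /* ... then .connect()         */

    /* a 1-on-1 exclusive channel: no topic filtering, no routing envelope,
       just a pointer handover through an in-memory pipe                     */
    zmq_send (left, "ping", 4, 0);

    char buf[8] = { 0 };
    zmq_recv (right, buf, sizeof buf - 1, 0);
    printf ("right end received: %s\n", buf);

    zmq_close (right);
    zmq_close (left);
    zmq_ctx_term (ctx);
    return 0;
}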

Question:

When launching a zmq server and client, in any random order, communicating over the tcp:// transport-class, they are smart enough to connect/reconnect regardless of the order.

However, when trying to run the same over the inproc:// transport-class, I see that it works only if the client starts after the server. How can we avoid this?


MCVE-code :

Here are some Kotlin MCVE-code examples to reproduce the claim (a modified version of the well-known weather example).

server.kt - run this to run the server standalone

package sandbox.zmq

import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import sandbox.util.Util.sout
import java.util.*

fun main(args: Array<String>) {
   server(
      context = ZMQ.context(1),
//      publishTo = "tcp://localhost:5556"
      publishTo = "tcp://localhost:5557"
   )
}

fun server(context: Context, publishTo: String) {
   val publisher = context.socket(ZMQ.PUB)
   publisher.bind(publishTo)

   //  Initialize random number generator
   val srandom = Random(System.currentTimeMillis())
   while (!Thread.currentThread().isInterrupted) {
      //  Get values that will fool the boss
      val zipcode: Int
      val temperature: Int
      val relhumidity: Int
      zipcode = 10000 + srandom.nextInt(10)
      temperature = srandom.nextInt(215) - 80 + 1
      relhumidity = srandom.nextInt(50) + 10 + 1

      //  Send message to all subscribers
      val update = String.format("%05d %d %d", zipcode, temperature, relhumidity)
      println("server >> $update")
      publisher.send(update, 0)
      Thread.sleep(500)
   }

   publisher.close()
   context.term()
}

client.kt - run this for the client standalone

package sandbox.zmq

import org.zeromq.ZMQ
import org.zeromq.ZMQ.Context
import java.util.*

fun main(args: Array<String>) {
   client(
      context = ZMQ.context(1),
      readFrom = "tcp://localhost:5557"
   )
}

fun client(context: Context, readFrom: String) {
   //  Socket to talk to server
   println("Collecting updates from weather server")
   val subscriber = context.socket(ZMQ.SUB)
   //        subscriber.connect("tcp://localhost:");
   subscriber.connect(readFrom)

   //  Subscribe to zipcode, default is NYC, 10001
   subscriber.subscribe("".toByteArray())

   //  Process 100 updates
   var update_nbr: Int
   var total_temp: Long = 0
   update_nbr = 0
   while (update_nbr < 10000) {
      //  Use trim to remove the trailing '0' character
      val string = subscriber.recvStr(0).trim { it <= ' ' }
      println("client << $string")
      val sscanf = StringTokenizer(string, " ")
      val zipcode = Integer.valueOf(sscanf.nextToken())
      val temperature = Integer.valueOf(sscanf.nextToken())
      val relhumidity = Integer.valueOf(sscanf.nextToken())

      total_temp += temperature.toLong()
      update_nbr++

   }
   subscriber.close()
}

inproc.kt - run this and modify which sample is called for the inproc:// scenarios

package sandbox.zmq

import org.zeromq.ZMQ
import kotlin.concurrent.thread


fun main(args: Array<String>) {
//   clientFirst()
   clientLast()
}

fun println(string: String) {
   System.out.println("${Thread.currentThread().name} : $string")
}

fun clientFirst() {

   val context = ZMQ.context(1)

   val client = thread {
      client(
         context = context,
         readFrom = "inproc://backend"
      )
   }

   // use this to maintain order
   Thread.sleep(10)

   val server = thread {
      server(
         context = context,
         publishTo = "inproc://backend"
      )
   }

   readLine()
   client.interrupt()
   server.interrupt()
}

fun clientLast() {

   val context = ZMQ.context(1)

   val server = thread {
      server(
         context = context,
         publishTo = "inproc://backend"
      )
   }

   // use this to maintain order
   Thread.sleep(10)

   val client = thread {
      client(
         context = context,
         readFrom = "inproc://backend"
      )
   }

   readLine()
   client.interrupt()
   server.interrupt()
}

Answer:

Why does the zmq inproc:// connection order matter, unlike for tcp:// ?

Well, this is a by-design behaviour

Given the native ZeroMQ API warns about this by-design behaviour ( since ever ), the issue is not a problem, but an intended property.

Plus one additional property has to be also met:

The name [ meaning an_endpoint_name in .connect("inproc://<_an_endpoint_name_>") ] must have been previously created by assigning it to at least one socket within the same ØMQ context as the socket being connected.

Newer versions of the native ZeroMQ API ( post 4.0 ), if indeed deployed under one's respective language binding / wrapper, may relax the former of these requirements:

Since version 4.0 the order of zmq_bind() and zmq_connect() does not matter just like for the tcp transport type.


How can we avoid this?

Well, a much harder part ...

If the easy way above is not available ( i.e. no ZeroMQ native API v4.2+ underneath ), one may roll up one's sleeves and either re-factor the pre-4.x language wrapper / binding so as to get the engine there, or maybe test whether Martin SUSTRIK's second lovely child, nanomsg, could fit the scene for achieving this.

Question:

I need to do the following:

  • multiple clients connecting to the SAME remote port
  • each of the clients opens 2 different sockets: one is a PUB/SUB, the other is a ROUTER/DEALER ( the server can occasionally send back to the client heartbeats and different server-related information ).

I am completely lost whether it can be done in ZeroMQ or not. Obviously if I can use 2 remote ports, that is not an issue, but I fail to understand if my setup can be achieved with some kind of envelope usage in ZeroMQ. Can it be done? Thanks,

Update:

To clarify what I wish to achieve.

  • Multiple clients can communicate with the server
  • Clients operate on request-response basis mostly(on one socket)
  • Clients create a session socket, which means that whenever this type of socket is created, a separate worker thread needs to be created, and from that time on the client communicates with this worker thread with regard to request processing; i.e. the server thread must not block the connection of other clients while dealing with the request of one client
  • However clients can receive occasional messages from the worker thread with regards to heartbeats of the worker.

Update2:

Actually I could sort it out. What I did:

  • identify clients obviously, so ROUTER/DEALER is used, e.g. clients are indeed dealers, hence async processing is provided
  • clients send messages to the one and only local port, where the router sits
  • router peeks into messages (kinda the lazy pirate example), checks whether a new client comes in; if yes it offloads to a separate thread, and connects the separate thread with an internal "inproc:" socket
  • router obviously polls for the frontend and all connected clients' backends and sends messages back and forth.

What bugs me is that this is overkill compared with a "regular" socket solution, where I could have connected the client with the worker thread DIRECTLY (i.e. the worker thread could recv from the socket opened by the client directly), and hence I could spare the routing completely. What am I missing?


Answer:

There was a discussion on the ZeroMQ mailing list recently about multiplexing multiple services on one TCP socket. The proposed solution is essentially what you implemented.

The discussion also mentions Malamute and its broker, which essentially provides a ZeroMQ-based framework with the functionality you need. I haven't had the time to look into it myself, but it looks promising.

Question:

I am just starting to learn NetMQ, derived from zeroMQ. I don't know C or C++ too well so I'm finding the zeroMQ tutorials a challenge, and the NetMQ ones seem to skim a lot. All I want is a two way binding between processes.

app1 ----- Request data ----> app2
app1 <---- Receives data ---- app2

The basic example supplied is

using (var context = NetMQContext.Create())
using (var server = context.CreateResponseSocket())
using (var client = context.CreateRequestSocket())
{
    server.Bind("tcp://127.0.0.1:5556");
    client.Connect("tcp://127.0.0.1:5556");

    client.Send("Hello");

    string fromClientMessage = server.ReceiveString();

    Console.WriteLine("From Client: {0}", fromClientMessage);

    server.Send("Hi Back");

    string fromServerMessage = client.ReceiveString();

    Console.WriteLine("From Server: {0}", fromServerMessage);

    Console.ReadLine();
}

And then just change the IPs. No issue there. But from what I understand this will block a thread? What I want is for the client to send a request, then do other stuff. Meanwhile the server receives the message, goes off and gets data or does other processing, and returns the result when it's done. But if I shove a big switch statement in there, do the relevant process and finally return a result, will it block the zeroMQ context/thread? Which leads onto my next question: is zeroMQ context multithreaded?

What I need is the system to work as asynchronously as possible, if I am going completely the wrong direction here I'd appreciate a point in the right one! Many thanks


Answer:

First, yes NetMQContext is thread safe. NetMQSocket is not.

You need to have a dedicated thread per NetMQSocket; one thread can also handle multiple sockets and timers using a Poller. You can use the NetMQScheduler to run tasks on that thread as well. Please read the following:

http://netmq.readthedocs.org/en/latest/poller/
http://somdoron.com/2013/06/netmq-scheduler/

Question:

I am fairly new to pyzmq. I am trying to understand inproc: transport class and have created this sample example to play with.

It looks like a Publisher instance is publishing messages but the Subscriber instances are not receiving any.

In case I move Subscriber instances into a separate process and change inproc: to a tcp: transport class, the example works.

Here is the code:

import logging
import threading
import time

import zmq

logger = logging.getLogger(__name__)

context = zmq.Context.instance()

address = 'inproc://test'


class Publisher(threading.Thread):
    def __init__(self):
        self.socket = context.socket(zmq.PUB)

        self.socket.bind(address)

    def run(self):
        while True:
            message = 'snapshot,current_time_%s' % str(time.time())
            print 'sending message %s' % message
            self.socket.send(message)
            time.sleep(1)


class Subscriber(object):
    def __init__(self, sub_name):
        self.name = sub_name
        self.socket = context.socket(zmq.SUB)
        self.socket.connect(address)

    def listen(self):
        while True:
            try:
                msg = self.socket.recv()
                a, b = msg.split(' ', 1)
                print 'Received message -> %s-%s-%s' % (self.name, a, b)
            except zmq.ZMQError as e:
                logger.exception(e)


if __name__ == '__main__':
    thread_a = []
    for i in range(0, 1):
        subs = Subscriber('subscriber_%s' % str(i))
        th = threading.Thread(target=subs.listen)
        thread_a.append(th)
        th.start()

    pub = Publisher()
    pub_th = threading.Thread(target=pub.run)

    pub_th.start()

Answer:

There is nothing wrong, but

ZeroMQ is a wonderful toolbox. It is full of smart, bright and self-adapting services under the hood, that literally save our poor lives in many ways. Still, it is worth reading and obeying a few rules from the documentation.

The inproc transport class has one such rule: .bind() first, before .connect()-s

[ Page 38, Code Connected, Volume I ] ... inproc is an inter-thread signalling transport ... it is faster than tcp or ipc. This transport has a specific limitation compared to tcp and ipc: the server must issue a bind before any client issues a connect. This is something future versions of ØMQ may fix, but at present this defines how you use inproc sockets.

So, as an example:

if __name__ == '__main__':

    pub    = Publisher()
    pub_th = threading.Thread( target = pub.run )
    pub_th.start()
    # give it a place to start before .connect()-s may take place
    # give it a time  to start before .connect()-s may take place
    time.sleep( 0.5 )

    thread_a = []
    for i in range( 0, 1 ):
        subs = Subscriber( 'subscriber_%s' % str( i ) )
        th   = threading.Thread( target = subs.listen )
        thread_a.append( th )
        th.start()

Question:

I'm adapting a tcp PubSub example to use inproc with multithreading. It ends up hanging forever.

My setup

  • macOS Mojave, Xcode 10.3
  • zmq 4.3.2

The source code reproducing the issue:

#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <thread>
#include "zmq.h"

void hello_pubsub_inproc() {
    void* context = zmq_ctx_new();
    void* publisher = zmq_socket(context, ZMQ_PUB);
    printf("Starting server...\n");
    int pub_conn = zmq_bind(publisher, "inproc://*:4040");

    void* subscriber = zmq_socket(context, ZMQ_SUB);
    printf("Collecting stock information from the server.\n");
    int sub_conn = zmq_connect(subscriber, "inproc://localhost:4040");
    sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);

    std::thread t_pub = std::thread([&]{
        const char* companies[2] = {"Company1", "Company2"};
        int count = 0;
        for(;;) {
            int which_company = count % 2;
            int index = (int)strlen(companies[0]);
            char update[12];
            snprintf(update, sizeof update, "%s",
                     companies[which_company]);
            zmq_msg_t message;
            zmq_msg_init_size(&message, index);
            memcpy(zmq_msg_data(&message), update, index);
            zmq_msg_send(&message, publisher, 0);
            zmq_msg_close(&message);
            count++;
        }
    });

    std::thread t_sub = std::thread([&]{
        int i;
        for(i = 0; i < 10; i++) {
            zmq_msg_t reply;
            zmq_msg_init(&reply);
            zmq_msg_recv(&reply, subscriber, 0);
            int length = (int)zmq_msg_size(&reply);
            char* value = (char*)malloc(length);
            memcpy(value, zmq_msg_data(&reply), length);
            zmq_msg_close(&reply);
            printf("%s\n", value);
            free(value);
        }
    });

    t_pub.join();

    // Give publisher time to set up.
    sleep(1);

    t_sub.join();

    zmq_close(subscriber);
    zmq_close(publisher);
    zmq_ctx_destroy(context);
}

int main (int argc, char const *argv[]) {
    hello_pubsub_inproc();
    return 0;
}

The result

Starting server...
Collecting stock information from the server.

I've also tried adding this before joining threads to no avail:

zmq_proxy(publisher, subscriber, NULL);

The workaround: replacing inproc with tcp fixes it instantly. But shouldn't inproc target in-process use cases?

Quick research tells me that it couldn't have been the order of bind vs. connect, since that problem is fixed in my zmq version.

The example below somehow tells me I don't have a missing shared-context issue, because it uses none:

ZeroMQ Subscribers not receiving message from Publisher over an inproc: transport class

I read from the Guide in the section Signaling Between Threads (PAIR Sockets) that

You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not distribute as PUSH or DEALER do. However, you need to configure the subscriber with an empty subscription, which is annoying.

What does it mean by an empty subscription?

Where am I doing wrong?


Answer:

You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not distribute as PUSH or DEALER do. However, you need to configure the subscriber with an empty subscription, which is annoying.


Q : What does it mean by an empty subscription?

This means to set ( configure ) a subscription, driving a Topic-list message-delivery filtering, using an empty subscription string.

Q : Where am I doing wrong?

Here :

// sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);   // Wrong
   sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "",0);  //  Empty string

There are also doubts here about using proper syntax and naming rules :

// int pub_conn = zmq_bind(publisher, "inproc://*:4040");
   int pub_conn = zmq_bind(publisher, "inproc://<aStringWithNameMax256Chars>");

as the inproc:// transport-class does not use any kind of external stack, but maps the AccessPoint's I/O(s) onto 1+ memory-locations ( a stack-less transport-class, requiring no I/O-threads ).

Given this, there is nothing like an "<address>:<port#>" being interpreted by such a ( here missing ) protocol, so the string-like text gets used as-is for identifying which Memory-location the message-data are going to go into.

So, the "inproc://*:4040" does not get expanded, but used "literally" as a named inproc:// transport-class I/O-Memory-location identified as [*:4040] ( Next, asking a .connect()-method of .connect( "inproc://localhost:4040" ) will, and must do so, lexically miss the prepared Memory-location: ["*:4040"] as the strings do not match

So this ought fail to .connect() - error-handling might be silent, as since the versions +4.x there is not necessary to obey the historical requirement to first .bind() ( creating a "known" named-Memory-Location for inproc:// ) before one may call a .connect() to get it cross-connected with an "already existing" named-Memory-location, so the v4.0+ will most probably not raise any error on calling and creating a different .bind( "inproc://*:4040" ) landing-zone and next asking a non-matching .connect( "inproc://localhost:4040" ) ( which does not have a "previously prepared" landing-zone in an already existing named-Memory-location.
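
Putting the two corrections together, the setup part of the code above might, as a hedged sketch, read like this ( the endpoint name "inproc://stock-feed" is an arbitrary choice of mine ):

#include <zmq.h>
#include <stdio.h>

void hello_pubsub_inproc() {
    void* context = zmq_ctx_new();

    void* publisher = zmq_socket(context, ZMQ_PUB);
    printf("Starting server...\n");
    /* a plain name: inproc:// interprets no "<address>:<port>" at all */
    int pub_conn = zmq_bind(publisher, "inproc://stock-feed");

    void* subscriber = zmq_socket(context, ZMQ_SUB);
    printf("Collecting stock information from the server.\n");
    /* connect to exactly the same name the publisher was bound to */
    int sub_conn = zmq_connect(subscriber, "inproc://stock-feed");
    /* the "empty subscription": an empty string means receive everything */
    sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);

    /* ... the two threads and the clean-up continue as in the question ... */
}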

Question:

My requirements

  • Clients from different threads in the same process
  • Server in a separate thread in the same process
  • Clients produces messages to Server
  • Server consumes messages by printing them out in the send-order by world clock on the source side, transparent to threading and any scheduling.

Answers to questions like

  • zmq: can multiple threads PUSH in a simple PUSH-PULL pattern
  • Pulling requests from multiple clients with ZMQ

give different opinions. So should I simply ask clients to PUSH to a single inproc PULL server created in another thread or use a router-dealer pattern?

And in one of the comments on the second question I came across STREAMER, which seems to exist in pyzmq, but I'm not sure if it's the right solution, or whether it's available in the C API at all.


Answer:

Q : Recommended pattern for inproc clients from multiple threads pushing ordered messages to a server?

Any answer to a question formulated this way depends on a missing piece of information: what is the set of preferences that lets one distinguish between an insufficient, a sufficient, a better and the best solution for the operations described above?

Do you need confirmatory feedback from server to client, given that there is a Zero-Warranty of message delivery?

Do you need to handle a static or a dynamic set of clients?

Do you prefer performance to RAM-footprint?


Without any of these "criteria" expressed, a serious person would never "recommend", as any such statement would be just an opinion.

PUSH/PULL may suffice for unconfirmed delivery ( an optimistic blindness use-case, if an out-of-sight out-of-mind design philosophy is acceptable in production )

PAIR/PAIR may suffice for a fast .poll( ZeroWait, ZMQ_POLLIN ) server-side scanner, with server-side POSACK-responses possible to dispatch to the respective client-threads whose messages were delivered and accepted for server-side processing ( a user-defined message-confirmation handshaking protocol, handling POSACKs / NACK-timeouts / out-of-order escalations etc., goes beyond the scope of this post )

PUB/SUB or XPUB/XSUB may suffice for some more tricky management of topic-based signalling, bidirectional in the X-versions, if that justifies the add-on costs of topic-filtering overheads ( depending on ZeroMQ version, whether distributed over all client-threads, or centralised on the server-thread side )
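
To make the first of these three options concrete, below is a minimal, single-threaded sketch of PUSH/PULL over inproc:// ( my own illustration, with made-up endpoint names; in a real design each PUSH end would live in its own client thread ). The single PULL end fair-queues across the connected pushers, while each individual pusher's messages still arrive in their send order:

#include <zmq.h>
#include <stdio.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();

    /* the "server" end: a single PULL socket collecting from all clients */
    void *sink = zmq_socket (ctx, ZMQ_PULL);
    zmq_bind (sink, "inproc://sink");

    /* two "client" ends: in a real application each would sit in its own thread */
    void *client_a = zmq_socket (ctx, ZMQ_PUSH);
    void *client_b = zmq_socket (ctx, ZMQ_PUSH);
    zmq_connect (client_a, "inproc://sink");
    zmq_connect (client_b, "inproc://sink");

    zmq_send (client_a, "from A", 6, 0);
    zmq_send (client_b, "from B", 6, 0);

    /* the PULL side fair-queues across the connected pushers */
    for (int i = 0; i < 2; ++i) {
        char buf[16] = { 0 };
        zmq_recv (sink, buf, sizeof buf - 1, 0);
        printf ("server received: %s\n", buf);
    }

    zmq_close (client_b);
    zmq_close (client_a);
    zmq_close (sink);
    zmq_ctx_term (ctx);
    return 0;
}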

The decision is yours.

Question:

I'm using inproc and PAIR to achieve inter-thread communication and trying to solve a latency problem due to polling. Correct me if I'm wrong: Polling is inevitable, because a plain recv() call will usually block and cannot take a specific timeout.

In my current case, among N threads, each of the N-1 worker threads has a main while-loop. The N-th thread is a controller thread which will notify all the worker threads to quit at any time. However, worker threads have to use polling with a timeout to get that quit message. This introduces a latency, the latency parameter is usually 1000ms.

Here is an example

while (true) {
    const std::chrono::milliseconds nTimeoutMs(1000);
    std::vector<zmq::poller_event<std::size_t>> events(n);  
    size_t nEvents = m_poller.wait_all(events, nTimeoutMs); 
    bool isToQuit = false;
    for (auto& evt : events) {
        zmq::message_t out_recved;
        try {
            evt.socket.recv(out_recved, zmq::recv_flags::dontwait);
        }
        catch (std::exception& e) {
            trace("{}: Caught exception while polling: {}. Skipped.", GetLogTitle(), e.what());
            continue;
        }
        if (!out_recved.empty()) {
            if (IsToQuit(out_recved)) {
                isToQuit = true;
                break;
            }
        }
    }
    if (isToQuit)
       break;
    //
    // main business
    //
    ...
}

To make things worse, when the main loop has nested loops, the worker threads then need to include more polling code in each layer of the nested loops. Very ugly.

I chose ZMQ for multithread communication because of its elegance and the potential to get rid of thread-locking. But I never realized the polling overhead.

Am I able to achieve the typical latency when using a regular mutex or an std::atomic data operation? Should I understand that the inproc is in fact a network communication pattern in disguise so that some latency is inevitable?


Answer:

An above posted statement ( a hypothesis ):

"...a plain recv() call will usually block and cannot take a specific timeout."

is not correct: a plain .recv( ZMQ_NOBLOCK )-call will never "block", and a plain .recv( ZMQ_NOBLOCK )-call can get decorated so as to mimic "a specific timeout"

An above posted statement ( a hypothesis ):

"...have to use polling with a timeout ... introduces a latency, the latency parameter is usually 1000ms."

is not correct: one need not use polling with a timeout, still less need one set a 1000 ms code-"injected" latency, which is obviously spent only in a no-new-message state

Q : "Am I able to achieve the typical latency when using a regular mutex or an std::atomic data operation?"

Yes.

Q : "Should I understand that the inproc is in fact a network communication pattern in disguise so that some latency is inevitable?"

No. inproc-transport-class is the fastest of all these kinds as it is principally protocol-less / stack-less and has more to do with ultimately fast pointer-mechanics, like in a dual-end ring-buffer pointer-management.


The Best Next Step:

1 ) Re-factor your code so as to always harness only the zero-wait { .poll() | .recv() }-methods, properly decorated for both { event- | no-event- }-specific looping ( a minimal sketch follows below this list ).

2 ) If then willing to shave the last few [us] off the smart-loop-detection turn-around time, one may focus on an improved Context()-instance setting, making it work with a larger amount of nIOthreads > N "under the hood".

optionally 3 ) For almost hard-Real-Time systems' design, one may finally harness a deterministically driven, Context()-threads' and socket-specific mapping of these execution-vehicles onto specific, non-overlapped CPU-cores ( using a carefully-crafted affinity-map ).
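
A hedged sketch of step 1 ), using the plain libzmq C API ( the cppzmq equivalent decorates .recv() with zmq::recv_flags::dontwait in the same spirit; names such as quit_requested() and the endpoint are made up ):

#include <zmq.h>
#include <errno.h>
#include <stdio.h>

/* returns 1 when a control message is pending, 0 otherwise; it never blocks */
static int quit_requested (void *control)
{
    char buf[16];
    int  rc = zmq_recv (control, buf, sizeof buf, ZMQ_DONTWAIT);
    if (rc == -1 && zmq_errno () == EAGAIN)
        return 0;                 /* nothing pending: zero wait, zero added latency */
    return 1;                     /* the controller said something: time to leave   */
}

int main (void)
{
    void *ctx    = zmq_ctx_new ();
    void *ctrl_w = zmq_socket (ctx, ZMQ_PAIR);   /* controller end */
    void *ctrl_r = zmq_socket (ctx, ZMQ_PAIR);   /* worker end     */
    zmq_bind    (ctrl_w, "inproc://quit");
    zmq_connect (ctrl_r, "inproc://quit");

    while (!quit_requested (ctrl_r)) {
        /* ... main business here, with no 1000 ms timeout in the way ... */
        zmq_send (ctrl_w, "quit", 4, 0);         /* demo only: fake the controller */
    }

    printf ("worker loop left without any poller timeout\n");

    zmq_close (ctrl_r);
    zmq_close (ctrl_w);
    zmq_ctx_term (ctx);
    return 0;
}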


Having set 1000 [ms] in code, no one can fairly complain about spending those very 1000 [ms] waiting in a timeout coded by herself / himself. There is no excuse for doing this.

Do not blame ZeroMQ for behaviour, that was coded from the application side of the API.

Never.