Hot questions for using ZeroMQ in blocking and non-blocking mode

Question:

I am trying to use ZeroMQ. My requirement is very simple: I want to be able to communicate between two peers in a network. I came across this program in the examples in the book.

$ pub_server.py

import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    topic = random.randrange(9999,10005)
    messagedata = random.randrange(1,215) - 80
    print "%d %d" % (topic, messagedata)
    socket.send("%d %d" % (topic, messagedata))
    time.sleep(1)

$ sub_client.py

import sys
import zmq

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
for update_nbr in range (5):
    string = socket.recv()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print ('{} {}'.format(topic, messagedata))

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/update_nbr)))

The problem I have with this model is that

string = socket.recv()

blocks until I receive a message. I don't want this to happen. I want the messages to be queued up on the receive side so that I can get them out of the queue (or something similar to this).

Is there some model in zero-mq that allows this?


Answer:

zmq.Socket.recv will not block if you pass the zmq.NOBLOCK flag parameter.

The docs say:

If NOBLOCK is set, this method will raise a ZMQError with EAGAIN if a message is not ready.

zmq will queue the messages it receives, and one message will be returned for each recv() call until this queue is exhausted, after which ZMQError is raised.

zmq.Again used in the examples below is a wrapper for zmq.EAGAIN.

For example:

while True:
    try:
        #check for a message, this will not block
        message = socket.recv(flags=zmq.NOBLOCK)

        #a message has been received
        print "Message received:", message

    except zmq.Again as e:
        print "No message received yet"

    # perform other important stuff
    time.sleep(10)

The sub_client.py example could perhaps be written to use non-blocking behaviour like this:

import sys, time
import zmq

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
    int(port)

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from weather server..."
socket.connect ("tcp://localhost:%s" % port)

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
received_value_count = 0
do_receive_loop = True
while do_receive_loop:
    try:
        #process all messages waiting on subscribe socket
        while True:
            #check for a message, this will not block
            string = socket.recv(flags=zmq.NOBLOCK)

            #message received, process it
            topic, messagedata = string.split()
            total_value += int(messagedata)
            print ('{} {}'.format(topic, messagedata))

            #check if we have all the messages we want
            received_value_count += 1
            if received_value_count > 4:
                do_receive_loop = False
                break

    except zmq.Again as e:
        #No messages waiting to be processed
        pass

    #Here we can do other stuff while waiting for messages
    #contemplate answer to 'The Last Question'
    time.sleep(15)
    print "INSUFFICIENT DATA FOR MEANINGFUL ANSWER"

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/5)))
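
As an alternative to sleeping between non-blocking recv() attempts, a zmq.Poller with a timeout can wait efficiently while still letting the loop do other work. This is only a minimal sketch, assuming the same SUB socket and topic filter as set up above:

import zmq

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    # wait up to 1000 ms for a message; returns an empty dict on timeout
    events = dict(poller.poll(timeout=1000))
    if socket in events:
        topic, messagedata = socket.recv().split()
        print('{} {}'.format(topic, messagedata))
    else:
        # no message arrived within the timeout; do other work here
        pass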

Question:

I'm using Julia with the package ZMQ.

I've succeeded in sending and receiving messages with the ZMQ pattern Dealer/Router.

Here is how I receive a message:

dataRecv = bytestring(ZMQ.recv(sockRouter))

But it's blocking. What I need is NON-BLOCKING.

In c++ with ZeroMQ, we can do this:

zmq_msg_recv(&message, socket, 0); // blocking
zmq_msg_recv(&message, socket, ZMQ_DONTWAIT); // non-blocking

In Julia, I've found a keyword: ZMQ.ZMQ_DONTWAIT, but I don't know how to use it. I've tried like this:

dataRecv = bytestring(ZMQ.recv(sockRouter, ZMQ.ZMQ_DONTWAIT))

But I got an error:

'recv' has no method matching recv(::Socket, ::Int64)

So is it impossible to use non-blocking mode in Julia?

Issue 1) I've posed a question here.

If I'm right, the answer in this question is about multi-threading + blocking.

I know this could work but I prefer to use mono-threading + NON-blocking.

Issue 2) @Chisholm Thank you for giving me the source of ZMQ.jl. But I've done a test like this:

dataRecv = bytestring(ZMQ.recv(sockRouter))
println("after recv")

If I execute the code above, it will block at recv. In other words, it won't print "after recv" until I send it a message.

So I think it's totally a blocking mode.


Answer:

Looking over the code at ZMQ.jl, it seems the blocking is caused by the wait after the :zmq_msg_recv, so here is another definition of recv, called pollrecv, which can be defined in the Main module (changing ZMQ.jl is unnecessary):

function pollrecv(socket::ZMQ.Socket,zmsg::Message)
    rc = -1
    while true
        rc = ccall((:zmq_msg_recv, ZMQ.zmq), Cint, (Ptr{Message}, Ptr{Void}, Cint),
                    &zmsg, socket.data, ZMQ.ZMQ_DONTWAIT)
        if rc == -1
            if !(ZMQ.zmq_errno() == Base.Libc.EAGAIN)
                throw(ZMQ.StateError(ZMQ.jl_zmq_error_str()))
            end
            return false
        else
            ZMQ.get_events(socket) != 0 && notify(socket)
            break
        end
    end
    return true
end

You can use this function in the following way:

msg = Message()
while !pollrecv(s1,msg)
    sleep(3)
    println("ZZzzzz...")
end
out = convert(IOStream,msg)
println(takebuf_string(out))
close(msg)
ZMQ.send(s1,"response important for next receive")

Of course, the while loop implements blocking by polling, and the sleep should be replaced with other processing.

Question:

In Python, the ZeroMQ .recv()/.send() operations are blocking, which is just perfect for REQ/REP. In Golang, I must pass a zmq.DONTWAIT flag to the .recv() and .send() operations in order to make them work.

But the thing is, the flow needs to be lock step, so:

  1. server.recv()
  2. client.send()
  3. client.recv()
  4. server.send()

And between 3 and 4 the weirdness starts, because they are async.

When the client has sent a message and the server has not received it yet, but the client tries to receive a response, the lockstep is no lockstep any more.

Is there some kind of zmq.DOBLOCK in contrast to zmq.DONTWAIT?

Or did I get something wrong here?


EDIT:

I am using this Go binding for the ZeroMQ C library: https://godoc.org/github.com/pebbe/zmq4#Type

As you can see here, .recv() needs an input flag, which is one of the two listed in the second reference:

Recv: https://godoc.org/github.com/pebbe/zmq4#Socket.Recv

Flags to be passed: https://github.com/pebbe/zmq4/blob/master/zmq4.go#L403

This is the current code I have as a workaround, which feels somewhat ugly:

package connection

import (
  "zmq4"
  "fmt"
  "time"
)


const ERRTMPUNAV="resource temporarily unavailable"


func checkError(e error){
  if e != nil {
    panic(e)
  }
}


func CreateRepNode(address string,onMessage chan<- string,send <-chan string,closeConn <-chan bool){
  stop:=false
  socket,err:=zmq4.NewSocket(zmq4.REP)
  checkError(err)
  err=socket.Bind(address)
  checkError(err)
  go func(socket *zmq4.Socket){
    for {
      msg,err:=socket.Recv(zmq4.DONTWAIT)
      fmt.Println("server message"+msg)
      if stop==true {
        return
      }
      if err != nil {
        rateLimit := time.Tick(100 * time.Millisecond)
        <-rateLimit
        continue
      }
      checkError(err)
      onMessage<-msg
      rep:=<-send
      _,err=socket.Send(rep,zmq4.DONTWAIT)
    }
  }(socket)
  <-closeConn
  stop=true
}


func CreateReqNode(address string,onMessage chan<- string,send <-chan string,closeConn <-chan bool){
  stop:=false
  socket,err:=zmq4.NewSocket(zmq4.REQ)
  checkError(err)
  err=socket.Connect(address)
  checkError(err)
  go func(){
    for {
      msg:=<-send
      if stop==true {
        return
      }
      _,err:=socket.Send(msg,zmq4.DONTWAIT)
      for {
        msg,err=socket.Recv(zmq4.DONTWAIT)
        fmt.Println("client got message "+msg)
        if err!=nil {
          if err.Error()==ERRTMPUNAV {
            w:=time.Tick(100*time.Millisecond)
            <-w
            continue
          }
        }
        break
      }
      onMessage<-msg
    }
  }()

  <-closeConn
  stop=true
}

Answer:

ZeroMQ trivial elementary archetypes are more a set of building blocks than a production-grade solution to any need.

Go is a very powerful, modern language with goroutines and other smart tools for controlled concurrency, so forgive me for stating the following list of recommendations:

  • avoid blocking designs wherever one can ( a non-blocking design leaves one in full control of all things as they come ... not "hanging" in any infinite/uncontrollable waiting loop, the worst case being an already developed deadlock )

  • avoid relying on few-SLOC examples with a single, elementary type of Formal Communication Pattern; one should rather develop a robust survivability-handling strategy for all the cases where something may go wrong ( Loss-of-Signal in the transport network, Loss-of-Message, DDoS-level resource overloads, ... )


Redesign hint - do not use REQ/REP at all. Yes, never...

ZeroMQ Scaleable Formal Communication Pattern REQ/REP is fine for learning ZeroMQ, but is lethal in real production grade deployment. For details, read here.

Next, think about internally unconditional patterns, like PAIR ( though marked experimental, for some use-cases it works great ), XREQ/XREP, PUSH/PULL, or some composite signalling/transport multi-socket custom-designed pattern of your own, as sketched below.
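
For illustration only, here is a minimal sketch of a PUSH/PULL pair, shown in pyzmq rather than Go for brevity; the port 5557 and the payload are placeholders. Unlike REQ/REP, this pipeline has no send/recv lockstep that can be violated:

import zmq

ctx = zmq.Context()

# collecting side: gathers messages without any send/recv ordering constraint
pull = ctx.socket(zmq.PULL)
pull.bind("tcp://*:5557")

# sending side: fires messages and never waits for a paired reply
push = ctx.socket(zmq.PUSH)
push.connect("tcp://localhost:5557")

push.send(b"status update")         # never blocks waiting for a missing reply
if pull.poll(timeout=100):          # non-zero if a message is ready within 100 ms
    print(pull.recv())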

The best next step?

What I can do for your further questions right now is to direct you to a bigger picture on this subject with more arguments, a simple signalling-plane / messaging-plane illustration, and a direct link to a must-read book from Pieter HINTJENS.

The book is worth one's time and effort. If one is serious about distributed systems design, you will love it, together with Pieter's passion for Zero-sharing, Zero-blocking, (almost) Zero-copy et al.

Question:

I'm attempting to learn ZeroMQ for a project at work although my background is in C#, and in the simplest of tests I seem to have an issue where the socket.recv(...) call will block for the first received message, but after this throws an exception because the amount of data received is -1.

Currently my 'server' is:

zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
socket.bind("tcp://127.0.0.1:5555");

while (true)
{
    zmq::message_t message;
    if (socket.recv(&message))
    {
        auto str = std::string(static_cast<char*>(message.data()), message.size());
        printf("Receieved: %s\n", str.c_str());
    }
}

This is basically from following the first example server within the ZeroMq documentation.

I'm pushing a single message from a C# 'client' using this code:

using (var context = new ZContext())
using (var requester = new ZSocket(context, ZSocketType.REQ))
{
    requester.Connect(@"tcp://127.0.0.1:5555");
    requester.Send(new ZFrame(@"hello"));

    requester.Disconnect(@"tcp://127.0.0.1:5555");
}

Now I start the server, then start the client. I correctly receive the first message and I am correctly able to print this. But now when I hit socket.recv(&message) again the code won't block but will instead throw an exception because the underlying zmq_msg_recv(...) returns a value of -1.

I'm unsure why this is occurring; I cannot see why it is expecting another message, as I know that there is nothing else on this port. The only thing I came across is calling zmq_msg_close(...), but this should be called as part of the message_t destructor, which I have confirmed.

Is there anything I'm doing wrong in terms of the socket setup or how I'm using it for the recv(...) call to stop blocking?


Answer:

Your problem is that you cannot receive 2 requests in a row with the REQ-REP pattern.

In the Request-Reply Pattern each request demands a reply. Your client needs to block until it receives a reply to its first request. Also, your server needs to reply to the requests before it services a new request.

Here is a quote referring to your exact issue from the guide.

The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), in a loop (or once if that's all it needs). Doing any other sequence (e.g., sending two messages in a row) will result in a return code of -1 from the send or recv call. Similarly, the service issues zmq_recv() and then zmq_send() in that order, as often as it needs to.
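
To make the lockstep concrete, here is a minimal pyzmq sketch of the only legal ordering on each side; the endpoint and payloads are placeholders, and the two halves run in separate processes:

import zmq

# client process: strictly alternate send -> recv -> send -> ...
ctx = zmq.Context()
req = ctx.socket(zmq.REQ)
req.connect("tcp://127.0.0.1:5555")
req.send(b"hello")       # 1) issue the request
reply = req.recv()       # 2) collect the reply before sending again

# server process (separate program): strictly alternate recv -> send -> ...
ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
rep.bind("tcp://127.0.0.1:5555")
request = rep.recv()     # 1) take a request
rep.send(b"world")       # 2) answer it before accepting another one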

Question:

How to use ZMQ in a non-blocking manner to "serve" the status of a long running job when the status is requested by a client?

The below code illustrates how the long running task could be temporarily "interrupted" to send the current status.

The task is long running because there are many urls to process, and not because each url takes a long time to process. This would mean that the server could respond to the client with the current status almost instantly.

I have been unable to implement this logic in a non-blocking manner as using the flag zmq.NOBLOCK results in Again: Resource temporarily unavailable, and not using the flag means that the server blocks and waits to receive a message.

How to achieve such logic/behaviour? I am open to using either C++ or Python.

Server code:

import zmq

# Socket details
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

# List of many urls
urls = ['http://google.com','http://yahoo.com']

def process(url):
    """Sample function"""
    pass

processed_urls = []
for url in urls:

    # If a message has been received by a client respond to the message
    # The response should be the current status.
    if socket.recv(zmq.NOBLOCK):
        msg = b"Processed the following urls %s" % str(processed_urls).encode()
        socket.send(msg, zmq.NOBLOCK)

    # Continue processing the urls
    process(url)
    processed_urls.append(url)

Answer:

1st of all - NON-BLOCKING is a double-edged sword

There are two worlds, each of which CAN and sometimes DOES block.

1) The GIL-side and/or process-side "blocking" can appear ( numpy example below, but valid for any sync-blocking calls that cannot have any easily achievable non-blocking workaround ) while some external process or a global application architecture may still need to have ( at least ) some responding & hand-shaking behaviour from even such knowingly "blocked" Python code-areas.

2) The second world is your ZeroMQ (potentially)-blocking call. Setting zmq.CONFLATE may additionally help you in PUSH-like URL-reporting from a long-job-running client to the server. Set CONFLATE on both the client and the server side of the reporting socket.

In every place where I can, I advocate for strictly non-blocking designs. Even the school-book examples of ZeroMQ code ought to be realistic and fair enough not to block. We live in the 3rd millennium, and blocking code is a performance- and resource-devastating state, principally outside of one's domain of control in professional-grade distributed-systems design.
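
Regarding the Again: Resource temporarily unavailable error from the question: with zmq.NOBLOCK, recv() simply raises zmq.Again when no request is waiting, so the exception has to be caught and treated as "no client has asked yet". A minimal sketch of the question's loop with that handling added (same socket, urls, process() and processed_urls as in the question):

for url in urls:
    try:
        # a client asked for a status update
        socket.recv(zmq.NOBLOCK)
        msg = b"Processed the following urls %s" % str(processed_urls).encode()
        socket.send(msg, zmq.NOBLOCK)
    except zmq.Again:
        # nobody asked; keep working on the urls
        pass

    # continue processing the urls
    process(url)
    processed_urls.append(url)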


A principal scaffolding:
####################################################################
### NEED TO VIEW aHealthSTATUS FROM AN EXTERNAL_UNIVERSE:
### ( A LIGHTWEIGHT EXCULPATED MONITOR TO OBSERVE THE HEALTH OF THE EXECUTION ENVIRONMENT FROM OUTSIDE OF THE VM-JAIL, FROM AN OUTER HYPERVISOR SPACE )
### ( + using signal.signal() )

import signal, os
#-------------------------------------------------------------------
# .SET  ZeroMQ INFRASTRUCTURE:

#-------------------------------------------------------------------
# .DEF  SIG_handler(s)

def SIG_handler_based_HealthREPORTER( SIGnum, aFrame ):
    print( 'SIG_handler called to report state with signal', SIGnum )
    #---------------------------------------------------------------
    # ZeroMQ .send( .SIG/.MSG )

    pass;   # yes, all the needed magic comes right here

    #-------------------------------------------------------------------
    # FINALLY:

    raise OSError( "Had to send a HealthREPORT" )                   # ??? do we indeed need this circus to be always played around, except in a DEMO-mode?

#-------------------------------------------------------------------
# .ASSOC SIG_handler:

signal.signal( signal.SIGALRM, SIG_handler_based_HealthREPORTER )   # .SET { SIGALRM: <aHandler> }-assoc

#-------------------------------------------------------------------
# .SET 1[sec]-delay + 1[sec]-interval

signal.setitimer( signal.ITIMER_REAL, 1, 1 )                        # .SET REAL-TIME Interval-based WatchDog -- Decrements interval timer in real time, and delivers SIGALRM upon expiration.


# ------------------------------------------------------------------
# FINALLY:


#-------------------------------------------------------------------
# .SET / .DEACTIVATE 
signal.setitimer( signal.ITIMER_REAL, 0 )                           # .SET / DEACTIVATE

#-------------------------------------------------------------------
# .TERM GRACEFULLY ZeroMQ INFRASTRUCTURE


#-------------------------------------------------------------------
# CLEAN EXIT(0)
os._exit(0)

Let me share an approach used for a sort of aHealthMONITOR on indeed long, principally-BLOCKING computation cases.

Let's take one example of a GIL-"blocking" type of computation:

#######
# SETUP
signal.signal(    signal.SIGALRM, SIG_ALRM_handler_A )          # .ASSOC { SIGALRM: thisHandler }
signal.setitimer( signal.ITIMER_REAL, 10, 5 )                   # .SET   @5 [sec] interval, after first run, starting after 10[sec] initial-delay
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1                         # .RESET .INIT()

Mechanics of SIGALRM + ITIMER_REAL deliver a lovely automation to keep external worlds happy with at least some responsiveness ( as frequent as ~ 0.2 [Hz] in this example, but principally {up-|down-}-scalable to any reasonable & yet system-wide stable amount of time -- testing a 0.5 [GHz] handler on a 1.0 [GHz] VM-system is left for a kind ultimate hacker's consideration -- otherwise a common sense for reasonable factors of scale and non-blocking/low-latency designs apply )

DEMO readouts show how the involuntary= context switches demonstrate the blocking-indifferent mechanics ( read the numbers as they grow, while voluntary remains the same throughout the whole GIL-blocking part of the process ), so a similarly def-ed SIG_ALRM_handler_XYZ() can provide a solution for your process-state-independent on-demand reporter.

SIG_ALRM_handler_A(): activated             Wed Oct 19 14:13:14 2016 ------------------------------ pctxsw(voluntary=53151, involuntary=1169)

>>> SIG_ALRM_last_ctx_switch_VOLUNTARY                              53243
>>> SIG_ALRM_last_ctx_switch_FORCED                                  1169

>>> [ np.math.factorial( 2**f ) for f in range(20) ][:5]            # too fast to notice @5[sec]
[1, 2, 24, 40320, 20922789888000]

#########
# COMPUTE
# len(str([np.math.factorial(2**f) for f in range(20)][-1]))    # .RUN   A "FAT"-BLOCKING CHUNK OF A regex/numpy/C/FORTRAN-calculus

>>> len( str( [ np.math.factorial( 2**f ) for f in range(20) ][-1] ) )
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1234)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1257)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1282)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1305)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1330)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1352)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1377)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1400)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1425)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1448)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1473)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1496)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1521)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1543)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1568)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1591)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1616)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1639)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1664)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1687)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1713)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1740)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1767)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1790)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1812)  INSPECT processes ... ev. add a StateFull-self-Introspection
2771010

In this process context, this handler was used:

########################################################################
### SIGALRM_handler_          
###

import psutil, resource, os, time

SIG_ALRM_last_ctx_switch_VOLUNTARY = -1
SIG_ALRM_last_ctx_switch_FORCED    = -1

def SIG_ALRM_handler_A( aSigNUM, aFrame ):                              # SIG_ALRM fired evenly even during [ np.math.factorial( 2**f ) for f in range( 20 ) ] C-based processing =======================================
    #
    # onEntry_ROTATE_SigHandlers() -- MAY set another sub-sampled SIG_ALRM_handler_B() ... { last: 0, 0: handler_A, 1: handler_B, 2: handler_C }
    #
    # onEntry_SEQ of calls of regular, hierarchically timed MONITORS ( just the SNAPSHOT-DATA ACQUISITION Code-SPRINTs, handle later due to possible TimeDOMAIN overlaps )
    # 
    aProcess         =   psutil.Process( os.getpid() )
    aProcessCpuPCT   =         aProcess.cpu_percent( interval = 0 )     # EVENLY-TIME-STEPPED
    aCtxSwitchNUMs   =         aProcess.num_ctx_switches()              # THIS PROCESS ( may inspect other per-incident later ... on anomaly )

    aVolCtxSwitchCNT = aCtxSwitchNUMs.voluntary
    aForcedSwitchCNT = aCtxSwitchNUMs.involuntary

    global SIG_ALRM_last_ctx_switch_VOLUNTARY
    global SIG_ALRM_last_ctx_switch_FORCED

    if (     SIG_ALRM_last_ctx_switch_VOLUNTARY != -1 ):                # .INIT VALUE STILL UNCHANGED
        #----------
        # .ON_TICK: must process delta(s)
        if ( SIG_ALRM_last_ctx_switch_VOLUNTARY == aVolCtxSwitchCNT ):
            #
            # AN INDIRECT INDICATION OF A LONG-RUNNING WORKLOAD OUTSIDE GIL-STEPPING ( regex / C-lib / FORTRAN / numpy-block et al )
            #                                                                                 |||||              vvv
            # SIG_:  Wed Oct 19 12:24:32 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=315)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:37 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=323)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:42 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=331)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:47 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=338)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:52 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=346)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:24:57 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=353)  ~~~  0.0
            # ...                                                                             |||||              ^^^
            # 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000]
            # >>>                                                                             |||||              |||
            #                                                                                 vvvvv              |||
            # SIG_:  Wed Oct 19 12:26:17 2016 ------------------------------ pctxsw(voluntary=49983, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:22 2016 ------------------------------ pctxsw(voluntary=49984, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:27 2016 ------------------------------ pctxsw(voluntary=49985, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:32 2016 ------------------------------ pctxsw(voluntary=49986, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:37 2016 ------------------------------ pctxsw(voluntary=49987, involuntary=502)  ~~~  0.0
            # SIG_:  Wed Oct 19 12:26:42 2016 ------------------------------ pctxsw(voluntary=49988, involuntary=502)  ~~~  0.0                
            print(   "SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: ", time.ctime(), 10 * "-",  aProcess.num_ctx_switches(), "{0:_>60s}".format( str( aProcess.threads() ) ),          " INSPECT processes ... ev. add a StateFull-self-Introspection" )
    else:
        #----------
        # .ON_INIT: may report .INIT()
        print(   "SIG_ALRM_handler_A(): activated            ", time.ctime(), 30 * "-",  aProcess.num_ctx_switches() )

    ##########
    # FINALLY:

    SIG_ALRM_last_ctx_switch_VOLUNTARY = aVolCtxSwitchCNT               # .STO ACTUALs
    SIG_ALRM_last_ctx_switch_FORCED    = aForcedSwitchCNT               # .STO ACTUALs

Question:

There seem to be two main ways to asynchronously get data from your socket in ZeroMQ:

if(zmq_recv(&msg, ZMQ_NOBLOCK) == 0) { // could return EAGAIN or others
    // do stuff
}

...and...

if(zmq_poll(&pollItems[0], num, timeout)) {
    if(zmq_recv(&msg) == 0) {
        // do stuff
    }
}

If I only have one socket that I want to read from, is there a compelling reason I should opt for the version that uses zmq_poll rather than zmq_recv with ZMQ_NOBLOCK?


Answer:

You would rarely do a zmq_recv(&msg, ZMQ_NOBLOCK) without using zmq_poll.

Normally you would read in a loop:

while (!done) {
...
  zmq_recv(&msg, ZMQ_NOBLOCK);
... 
}

This is a busy wait; it uses CPU unnecessarily. zmq_poll() blocks and doesn't consume CPU while it's waiting for something to happen.

If you only have one socket, and don't want to use zmq_poll, you would normally use a blocking zmq_recv, and set a ZMQ_RCVTIMEO so you don't block forever if something has gone bad at the sending side.
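
Both options translate directly to pyzmq as well; the following is only a minimal sketch (the socket type and endpoint are placeholders) showing a poll with a timeout and, alternatively, a blocking recv bounded by RCVTIMEO:

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.PULL)
sock.connect("tcp://127.0.0.1:5555")

# option 1: poll with a timeout, then recv only when data is ready
poller = zmq.Poller()
poller.register(sock, zmq.POLLIN)
if poller.poll(timeout=1000):          # milliseconds; no busy-waiting
    msg = sock.recv()

# option 2: blocking recv with an upper bound on the wait
sock.setsockopt(zmq.RCVTIMEO, 1000)    # recv() raises zmq.Again after 1 s
try:
    msg = sock.recv()
except zmq.Again:
    pass                               # nothing arrived within the timeout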

Question:

Hello all, I want to use the zsock_recv API in a non-blocking mode. Is there a way to do that? I can't find any reliable documentation on the zsock APIs. Could anyone please point me towards some?


Answer:

ZeroMQ zsock API is documented here:

http://api.zeromq.org/czmq3-0:zsock

Question:

I wrote a test for ZeroMQ to convince myself that it manages to map replies to the client independently of processing order, which would prove it thread-safe.

It is a multi-threaded server, which just throws the received messages back at the sender. The client sends some messages from several threads and checks, if it receives the same message back. For multi-threading I use OpenMP.

That test worked fine and I wanted to move on and re-implement it with C++ bindings for ZeroMQ. And now it doesn't work in the same way anymore.

Here's the code with ZMQPP:

#include <gtest/gtest.h>
#include <zmqpp/zmqpp.hpp>
#include <zmqpp/proxy.hpp>

TEST(zmqomp, order) {
    zmqpp::context ctx;

    std::thread proxy([&ctx] {
        zmqpp::socket dealer(ctx, zmqpp::socket_type::xrequest);
        zmqpp::socket router(ctx, zmqpp::socket_type::xreply);
        router.bind("tcp://*:1234");
        dealer.bind("inproc://workers");
        zmqpp::proxy(router, dealer);
    });

    std::thread worker_starter([&ctx] {
#pragma omp parallel
        {
            zmqpp::socket in(ctx, zmqpp::socket_type::reply);
            in.connect("inproc://workers");
#pragma omp for
            for (int i = 0; i < 1000; i++) {
                std::string request;
                in.receive(request);
                in.send(request);
            }
        }
    });

    std::thread client([&ctx] {
#pragma omp parallel
        {
            zmqpp::socket out(ctx, zmqpp::socket_type::request);
            out.connect("tcp://localhost:1234");
#pragma omp for
            for (int i = 0; i < 1000; i++) {
                std::string msg("Request " + std::to_string(i));
                out.send(msg);    
                std::string reply;
                out.receive(reply);

                EXPECT_EQ(reply, msg);
            }
        }
    });

    client.join();
    worker_starter.join();
    ctx.terminate();
    proxy.join();
}

The test blocks and doesn't get executed to the end. I played around with #pragmas a little bit and found out that only one change can "fix" it:

//#pragma omp parallel for
            for (int i = 0; i < 250; i++) {

The code still gets executed in parallel in that case, but I have to divide the number of loop executions by the number of my physical cores.

Does anybody have a clue what's going on here?


Answer:

Prologue: ZeroMQ is by-definition and by-design NOT Thread-Safe.

This normally does not matter as there are some safe-guarding design practices, but the situation here gets even worse once the proposed TEST(){...} design is followed.

Having spent some time with ZeroMQ, I can say your proposal headbangs into violations of several principal things that otherwise help distributed architectures work smarter than a pure SEQ of monolithic code.

ZeroMQ advises in ( almost ) every third paragraph to avoid sharing resources. Zero-sharing is one of ZeroMQ's maxims for fabulous scalable performance and minimised latency, so to say it in short.

So one had better avoid sharing a zmq.Context() instance at all ( unless one knows pretty well why and how the things work under the hood ).

Thus an attempt to fire events 1000 times ( almost ) in parallel ( well, not a true PAR ) onto a shared instance of zmq.Context ( the less so once it was instantiated with default parameters and with no performance-tuning adaptations ) will certainly suffer from doing the very opposite of what is, performance-wise and design-wise, recommended.


What are some of the constraints not to headbang into?

1) Each zmq.Context() instance has a limited amount of I/O-threads, created during the instantiation process. Once a fair design needs some performance tuning, it is possible to increase the number of such I/O-threads, and the data-pumps will work that much better ( sure, no amount of data-pumps will salvage a poor, much less a disastrous, design / architecture of a distributed computing system. This is granted. ).

2) Each zmq.Socket() instance has an { implicit | explicit } mapping onto a respective I/O-thread ( Ref. 1) ). Once a fair design needs increased robustness against sluggish event-loop handling or against other adverse effects arising from data-flow storms ( or load-balancing or you name it ), there are chances to benefit from a divide-and-conquer approach using the .setsockopt( zmq.AFFINITY, ... ) method to directly map each zmq.Socket() instance onto a respective I/O-thread, and thus remain in control of which buffering and internal queues are fighting for which resources during real operations. In any case, where the total amount of threads goes over the localhost number of cores, only just-CONCURRENT scheduling is possible ( so a dream of a true PAR execution is principally and inadvertently lost. This is granted. ).

3) Each zmq.Socket() also has a pair of "Hidden Queue Devastators", called High-Watermarks. These get set either { implicitly | explicitly }, the latter being for sure the wiser manner for performance tuning. Why Devastators? Because these stabilise and protect distributed computing systems from overflows and are permitted to simply discard each and every message above the HWM level(s), so as to protect the system's capability to run forever, even under heavy storms, spurious blasts of crippled packets or DDoS-types of attack. There are many tools for tuning this domain of a ZeroMQ Context()-instance's behaviour, which go beyond the scope of this answer ( Ref.: my other posts on ZeroMQ AFFINITY benefits or the ZeroMQ API specifications used in the .setsockopt() method ).

4) Each tcp:// transport-class based zmq.Socket() instance has also inherited some O/S-dependent heritage. Some O/S demonstrate this risk by extended accumulation of ip-packets ( outside of any ZeroMQ control ) until some threshold gets passed, and thus due design care ought to be taken for such cases to avoid adverse effects on the intended application signalling / messaging dynamics and robustness against such uncontrollable ( exosystem ) buffering habits.

5) Each .recv() and .send() method call is by default blocking, a thing a massively distributed computing system ought never risk entering into. Never ever. Even in a school-book example. Rather use the non-blocking form of these calls. Always. This is granted.

6) Each zmq.Socket() instance ought to undertake a set of careful and graceful termination steps. A preventive .setsockopt( zmq.LINGER, 0 ) + an explicit .close() are fair to be required in every use-case ( and made robust so they get executed irrespective of any exceptions that may appear; a minimal sketch follows below ). Poor { self- | team- }-discipline in this practice is a sure ticket to hanging up the whole application infrastructure, due to just not paying due care to a mandatory resource-management policy. This is a must-have part of any serious distributed computing Project. Even the school-book examples ought to have this. No exceptions. No excuse. This is granted.
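
For point 6), a minimal sketch of that termination discipline, shown in pyzmq for brevity; socket type, endpoint and names are illustrative only:

import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REQ)
try:
    sock.connect("tcp://127.0.0.1:1234")
    # ... application work ...
finally:
    sock.setsockopt(zmq.LINGER, 0)   # do not wait for unsent messages on close
    sock.close()                     # release the socket explicitly
    ctx.term()                       # the context then terminates cleanly, nothing hangs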

Question:

I am bringing back to life a three-year-old project built on ZeroMQ.

The code was working (as far as I know) at the moment when it was written (on Ubuntu 14.04). Now (Ubuntu 16.04 and libzmq.so.5) the code compiles, but something is wrong with the ZeroMQ communication, and this is driving me crazy.

The zeromq part was written by me, so I know the code quite well, and maybe this is why I cannot see the error.

The server side code is quite complex but I try to stick to the relevant part:

WorkerServer::WorkerServer(){
    address="tcp://*:4321";
    justreceived=-1;
    bind();
}

void WorkerServer::bind(){
    actual_socket=server_socket();
    actual_socket->bind(address.c_str());
    std::cout << "I: server listening on " << address.c_str() << std::endl ;
}

static zmq::socket_t* server_socket(){
    static zmq::context_t context(1);
    return new zmq::socket_t(context, ZMQ_REP);
}

After initialization, the server starts an endless loop that calls these lines of code:

int rc=actual_socket->recv(&message);
if(rc!=0){
    std::cout << "E: socket error number " << errno << " (" << zmq_strerror(errno) << ")" << std::endl;
}else{
    std::cout << "I: received message" << std::endl ;
}

When I compiled it for the first time, I started to receive just EAGAIN errors and nothing was working. So, I wrote two simple clients, the first one in C++ and the second one in Python.

The first one (C++) generates on the client this error:

E: connect failed with error 11 (Resource temporarily unavailable)

and the second one (Python) generates on the server this error:

E: socket error number 11 (Resource temporarily unavailable)

but the client actually receives a reply.

This is the python code:

#!/usr/bin/python

import zmq
import sys

port = "4321"    
context = zmq.Context()
print "Connecting to server..."
socket = context.socket(zmq.REQ)
socket.connect ("tcp://localhost:%s" % port)
if len(sys.argv) > 2:
    socket.connect ("tcp://localhost:%s" % port1)

#  Do 10 requests, waiting each time for a response
for request in range (1,10):
    print "Sending request ", request,"..."
    socket.send ("Hello")
    #  Get the reply.
    message = socket.recv()
    print "Received reply ", request, "[", message, "]"

and this is the c++ code:

#include <string>
#include <vector>
#include <iostream>
#include "msgpack.hpp"
#include "unistd.h"
#include "cxxabi.h"
#include "zmq.hpp"

main(){

    std::string server_name("tcp://localhost:4321");

    static zmq::context_t context(1);
    std::cout << "I: connecting to server " << server_name << " with context " << (void*)(context) << std::endl;
    zmq::socket_t * client = new zmq::socket_t (context, ZMQ_REQ);
    std::cout << "I: created client " << (void*)(client) << " with errno " << errno << std::endl;
    sleep(1);
    client->connect (server_name.c_str());
    if(errno!=0){
        std::cout << "E: connect failed with error " << errno << " (" << zmq_strerror (errno) << ")" << std::endl;
        exit(1);
    }
}

Any idea? I do not understand why this does not work and why there is such a difference between Python and C++.

UPDATE:

As pointed out by @James Harvey, this code works:

try{
        std::cout << "Connecting..." << std::endl;
        client->connect (server_name.c_str());

        zmq::message_t request (5);
        memcpy (request.data (), "Hello", 5);
        std::cout << "Sending Hello " << std::endl;
        client->send (request);
}catch(std::exception& e){
        std::cout << "E: connect failed with error " << e.what() << std::endl;    
}

I was thinking that, since the C++ bindings are built upon the C API, testing for errno and catching the exception would be the same. Actually, they are not.


Answer:

In your C++ code, are you using the cppzmq bindings? If so, you should use try/catch on the connect to see if it is failing; the errno is only valid if connect fails.

https://github.com/zeromq/cppzmq/blob/master/zmq.hpp#L603

Question:

I am trying to implement a REQ/REP pattern with Python 3 asyncio and ZeroMQ.

My client async function:

import zmq
import os
from time import time
import asyncio

import zmq.asyncio


print ('Client %i'%os.getpid())

context = zmq.asyncio.Context(1)
loop = zmq.asyncio.ZMQEventLoop()
asyncio.set_event_loop(loop)


async def client():
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://11.111.11.245:5555')
    while True:
        data = zmq.Message(str(os.getpid()).encode('utf8'))
        start = time()
        print('send')
        await socket.send(data)
        print('wait...')
        data = await socket.recv()
        print('recv')
        print(time() - start, data)


loop.run_until_complete(client())

As I understand it, the call to the socket.connect( "tcp://11.111.11.245:5555" ) method is a blocking call.

How to make a non-blocking connection call, in my case?


Answer:

As far as I understand the ZeroMQ API, the call to the .connect() method is not synchronous with building the real connection ( if no blocking is introduced by the wrapper, the underlying API is non-blocking - ref. below ).

The connection will not be performed immediately but as needed by ØMQ. Thus a successful invocation of zmq_connect() does not indicate that a physical connection was or can actually be established.

Ref.: ZeroMQ API - zmq_connect(3)

Question:

I am looking to use ZeroMQ to facilitate IPC in my embedded-systems application; however, I'm not able to find many examples of using multiple 0MQ socket types in the same process.

For example, say I have a process called "antenna_mon" that monitors an antenna. I want to be able to send messages to this process and get responses back - a classic REQ-REP pattern. However, I also have a "cm" process, that publishes configuration changes to subscribers. I want antenna_mon to also subscribe to antenna configuration changes - PUB-SUB.

I found this example of reading from multiple sockets in the same process, but it seems suboptimal, because now you no longer block waiting for messages; you inefficiently check for messages constantly and go back to sleep.

Has anyone encountered this problem before? Am I just thinking about it wrong? Maybe I should have two threads - one for CM changes, one for REQ-REP servicing?

I would love any insights or examples of solving this type of problem.


Answer:

Welcome to the very nature of distributed computing!

Yes, there are new perspectives one has to solve once assembling a Project for a multi-agent domain, where more than one process works and communicates with its respective peers ad-hoc.

A knowledge base acquired from soft Real-Time Systems or embedded-systems design experience will help a lot here. If none such is available, some similarities might also be drawn from GUI design, where the centerpiece is something like a lightweight .mainloop() scheduler, most of the hard work is embedded into round-robin-polled GUI devices, and internal-state changes or external MMI-events are marshalled into event-triggered handlers.

ZeroMQ infrastructure gives one all the tools needed for such non-blocking, controllably poll-able ( scaleable, variable or adaptively ad-hoc adjustable poll-timeouts, not to overcome the given, design defined, round-trip duration of the controller .mainloop() ) and transport-agnostic, asynchronously operated, message dispatcher ( with thread-mapped performance scaling & priority tuning ).
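
In practice, for the antenna_mon case this boils down to registering both sockets with a single poller and dispatching on whichever is readable. A minimal pyzmq sketch (the endpoints, the topic and the reply payload are placeholders, not part of the question):

import zmq

ctx = zmq.Context()

rep = ctx.socket(zmq.REP)                      # classic REQ/REP service endpoint
rep.bind("tcp://*:5555")

sub = ctx.socket(zmq.SUB)                      # configuration updates from "cm"
sub.connect("tcp://localhost:5556")
sub.setsockopt(zmq.SUBSCRIBE, b"antenna")

poller = zmq.Poller()
poller.register(rep, zmq.POLLIN)
poller.register(sub, zmq.POLLIN)

while True:
    events = dict(poller.poll(timeout=100))    # blocks up to 100 ms, no busy spinning
    if rep in events:
        request = rep.recv()
        rep.send(b"antenna status ...")        # reply in lockstep to each request
    if sub in events:
        config_update = sub.recv()             # apply the new configuration here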

What else one may need?

Well, just imagination and a lot of self-discipline to adhere the Zero-Copy, Zero-Sharing and Zero-Blocking design maxims.

The rest is in your hands.

Many "academic" examples may seem trivial and simplified, so as to illustrate just the currently discussed, or a feature demonstrated in some narrow perspective.

Not so in the real-life situations.

As an example, my distributed ML-engine uses a tandem of several PUSH/PULL pipelines for moving state-data updates and prediction forecasts, another PUSH/PULL for a remote keyboard, a reversed .bind()/.connect() on PUB/SUB for easy broadcasting of distributed agents' telemetry to a remote, centrally operated syslog, and some additional PAIR/PAIR pipes, as processing requires.

( nota bene: one shall always bear in mind that robust and error-resilient systems ought to avoid using the default REQ/REP Scaleable Formal Communication Pattern, as there is a non-zero probability of the pairwise-stepped REQ/REP dual-FSA falling into an unsalvageable deadlock. Do not hesitate to read more about this smart tool. )

Question:

I have a system which consists of two applications. Currently, two applications communicate using multiple ZeroMQ PUB/SUB patterns generated for each specific type of transmission. Sockets are programmed in C.

For example, AppX uses a SUB formal-socket archetype for receiving an information struct from AppY and another PUB formal-socket archetype for transmitting raw bit blocks to AppY, and the same applies to AppY. It uses PUB/SUB patterns for transmission and reception.

To be clear AppX and AppY perform the following communications:

AppX -> AppY:

  • raw bit blocks of 1 kbit (continuous)
  • an integer command (not continuous, depends on user)

AppY -> AppX:

  • an information struct of 10 kbit (continuous)


The design target:

a) My goal is to use only one socket on each side for bidirectional communication in non-blocking mode.

b) I want the two applications to process queued received packets without excess delay.

c) I don't want AppX to crash after a crashed AppY.


Q1: Would it be possible with ZeroMQ?

Q2: Can I use ROUTER/DEALER or any other pattern for this job?

I have read the guide but I could not figure out some aspects.

Actually I'm not well experienced with ZeroMQ. I would be pleased to hear about additional tips on this problem.


Answer:

A1: Yes, this is possible with ZeroMQ or nanomsg sorts of tools

Both ZeroMQ and its younger sister nanomsg share the vision of Scaleable ( which you did not emphasise yet ) Formal ( hard-wired formal behaviour ) Communication ( yes, it's about this ) Patterns ( that are wisely carved, ready to re-use and combine as needed )

This said, if you prefer to have just one socket-pattern on each "side", then you have to choose a Formal Pattern that leaves you free of any hard-wired behaviour, so as to meet your goal.

So, a) "...only one" is doable -- by a solo of zmq.PAIR (which some parts of documentation flag as a still an experimental device) or NN.BUS or a pair of PUSH/PULL if you step back from allowing just a single one ( which in fact does eliminate all the cool powers of the sharing of the zmq.Context() instantiated IO-thread(s) for re-using the low-level IO-engine. If you spend a few minutes with examples referred to below, you will soon realise, that the very opposite policy is quite common and beneficial to the design targets, when one uses more, even many, patterns in a system architecture.

The a) "...non-blocking" is doable, by stating proper directives zmq.NOBLOCK for respective .send() / .recv() functions and by using fast, non-blocking .poll() loops in your application design architecture.

On b) "...without ... delay" is related to the very noted remark on application design architecture, as you may loose this just by relying on a poor selection and/or not possible tuning of the event-handler's internal timings and latency penalties. If you shape your design carefully, you might remain in a full control of the delay/latency your system will experience and not bacoming a victim of any framework's black-box event-loop, where you can nothing but wait for it's surprises on heavy system or traffic loads.

On c) "... X crash after a Y crashed" is doable on { ZeroMQ | nanomsg }-grounds, by a carefull combination of non-blocking mode of all functions + by your design beeing able to handle exceptions in the situations it does not receive any POS_ACK from the intended { local | remote }-functionality. In this very respect, it is fair to state, that some of the Formal Communication Patters do not have this very flexibility, due to some sort of a mandatory internal behaviour, that is "hard wired" internally, so a due care is to be taken in selecting a proper FCP-archetype for each such still scaleable but fault-resilient role.


Q2: No.

The best next step:

You might feel interested in other ZeroMQ posts here and also do not miss the link to the book, referred there >>>

Question:

I have two threads. One is a Worker Thread, the other a Communication Thread.

The Worker Thread is reading data off a serial port, doing some processing, and then enqueueing the results to be sent to a server.

The Communication Thread is reading the results off the queue and sending them. The challenge is that connectivity is wireless, and although usually present, it can be spotty (dropping in and out of range for a few minutes), and I don't want to block the Worker Thread if I lose connectivity.

The pattern I have chosen for this, is as follows:

The Worker Thread has an enqueue method which adds the message to a Queue, then sends a signal to inproc://signal using a zmq.PAIR socket.

The Communication Thread uses a zmq.DEALER to communicate with the server (a zmq.ROUTER), but polls the inproc://signal pair in order to register whether there is a new message needing sending or not.

The following is a simplified example of the pattern:

import Queue
import zmq
import time
import threading
import simplejson


class ZmqPattern():
    def __init__(self):
        self.q_out = Queue.Queue()
        self.q_in = Queue.Queue()
        self.signal = None
        self.API_KEY = 'SOMETHINGCOMPLEX'
        self.zmq_comm_thr = None

    def start_zmq_signal(self):
        self.context = zmq.Context()

        # signal socket for waking the zmq thread to send messages to the relay
        self.signal = self.context.socket(zmq.PAIR)
        self.signal.bind("inproc://signal")

    def enqueue(self, msg):
        print("> pre-enqueue")
        self.q_out.put(msg)
        print("< post-enqueue")

        print(") send sig")
        self.signal.send(b"")
        print("( sig sent")

    def communication_thread(self, q_out):
        poll = zmq.Poller()

        self.endpoint_url = 'tcp://' + '127.0.0.1' + ':' + '9001'

        wake = self.context.socket(zmq.PAIR)
        wake.connect("inproc://signal")
        poll.register(wake, zmq.POLLIN)

        self.socket = self.context.socket(zmq.DEALER)
        self.socket.setsockopt(zmq.IDENTITY, self.API_KEY)
        self.socket.connect(self.endpoint_url)
        poll.register(self.socket, zmq.POLLIN)

        while True:
            sockets = dict(poll.poll())

            if self.socket in sockets:
                message = self.socket.recv()
                message = simplejson.loads(message)

                # Incomming messages which need to be handled on the worker thread
                self.q_in.put(message)

            if wake in sockets:
                wake.recv()
                while not q_out.empty():
                    print(">> Popping off Queue")
                    message = q_out.get()
                    print(">>> Popped off Queue")
                    message = simplejson.dumps(message)
                    print("<<< About to be sent")
                    self.socket.send(message)
                    print("<< Sent")

    def start(self):
        self.start_zmq_signal()
        # ZMQ Thread
        self.zmq_comm_thr = threading.Thread(target=self.communication_thread, args=([self.q_out]))
        self.zmq_comm_thr.daemon = True
        self.zmq_comm_thr.name = "ZMQ Thread"
        self.zmq_comm_thr.start()


if __name__ == '__main__':
    test = ZmqPattern()
    test.start()

    print '###############################################'
    print '############## Starting comms #################'
    print "###############################################"

    last_debug = time.time()
    test_msg = {}
    for c in xrange(1000):
        key = 'something{}'.format(c)
        val = 'important{}'.format(c)
        test_msg[key] = val

    while True:
        test.enqueue(test_msg)
        if time.time() - last_debug > 1:
            last_debug = time.time()
            print "Still alive..."

If you run this, you'll see the dealer blocks as there is no router on the other end, and shortly after, the pair blocks as the Communication Thread isn't receiving.

How should I best set up the inproc zmq so it does not block the Worker Thread?

FYI, the most the entire system would need to buffer is in the order of 200k messages, and each message is around 256 bytes.


Answer:

The dealer socket has a limit on the number of messages it will store, called the high water mark. Right below your dealer socket creation, try:

    self.socket = self.context.socket(zmq.DEALER)
    self.socket.setsockopt(zmq.SNDHWM, 200000)

And set that number as high as you dare; the limit is your machine's memory.

EDIT:

Some good discussion of high water marks in this question:

Majordomo broker: handling large number of connections

Question:

I am using ZeroMQ, which is a messaging library (presumably async I/O). If you don't know what it is, you can think of it as similar to the socket library in Python; the sockets used for messaging are usually run within an infinite while loop with a small sleep to keep everything cool.

I have the ZeroMQ code written in a separate file, and I have a GUI, based on the working of that code, written separately; I want to integrate the two.

But the issue I come across is that I cannot possibly put a while True or a blocking socket.recv() inside tkinter's .mainloop().

I want to receive on a socket, which is blocking - BUT I can manage that part of the issue: zmq sockets can either be polled on (checked periodically to see if we have any pending messages to process) or, equivalently, you can use zmq.DONTWAIT, which does the same thing.

The issue remaining, however, is that I need a while True so that the socket is constantly polled, say every millisecond, to see if we have messages.

How do I put a while True inside the tkinter .mainloop() that allows me to check the state of that socket constantly?

I would visualize something like this :

while True:    
    update_gui()       # contains the mainloop and all GUI code
    check_socket()     # listener socket for incoming traffic
    if work:
        #              # do a work, while GUI will hang for a bit.

I have checked the internet and came across a solution on SO which says that you can use the after method of widgets, but I am not sure how that works. If someone could help me out I would be super grateful!

Code for reference :

zmq.DONTWAIT throws an exception if you do not have any pending messages, which makes us move forward in the loop.

while 1:
    if socket_listen and int(share_state):
        try:
            msg =  socket_listen.recv_string(zmq.DONTWAIT)
        except:
            pass

    time.sleep(0.01) 

I would like that I could put this inside the .mainloop() and along with the GUI this also gets checked every iteration.

Additional info : Polling here equates to :

  • check if we have messages on socket1
  • if not then proceed normally
  • else do work.

Answer:

How do I put a while True inside the tkinter .mainloop() that allows me to check the state of that socket constantly?

Do not design that part with an explicit while True loop; better use the tkinter-native tooling: ask .after() to re-submit the call after a specified number of milliseconds. That lets other things happen concurrently, while you still have a reasonable amount of certainty that your requested call will be activated once the delay has passed (no sooner than that, as the notes below explain).


I love the Tkinter architecture of co-existing event processing

So if one keeps the finite-state automaton (a game, or a GUI front-end) cleanly crafted on Tkinter grounds, the ZeroMQ-delivered message data can be coordinated "behind the scenes" by Tkinter-native tools, with no extra imperative code needed. Just let the incoming messages be translated into tkinter-monitored variables if you want a truly smart GUI integration.

aScheduledTaskID = aGuiRelatedOBJECT.after( msecs_to_wait,
                                            aFunc_to_call = None,
                                            *args
                                            )
# -> <_ID_#_>
# ... guarantees a given wait-time + just a one, soloist-call
#     after a delay of at least delay_ms milliseconds.
#     There is no upper limit to how long it will actually take, but
#     your callback-FUN will be called NO SOONER than you requested,
#     and it will be called only once.
#     aFunc_to_call() may "renew" with .after()
#
# .after_cancel( aScheduledTaskID )  # <- <id> CANCELLED from SCHEDULER
#
# .after_idle() ~ SCHEDULE A TASK TO BE CALLED UPON .mainloop() TURNED-IDLE
#
#     aScheduledTaskOnIdleID = aGuiOBJECT.after_idle( aFunc_to_call = None,
#                                                     *args
#                                                     )
# -> <_ID_#_>

Using the ready-to-reuse tkinter-native scheduler infrastructure in such a smart way is rather neat, isn't it?
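
A minimal sketch of that idea (the socket type, endpoint, poll interval and widget names are illustrative assumptions, not code from the question): poll the socket with zmq.NOBLOCK inside a callback that keeps re-scheduling itself via .after(), and push any received payload into a tkinter-monitored variable:

import zmq
import Tkinter as tk          # Python 2; on Python 3 use: import tkinter as tk

POLL_INTERVAL_MS = 10         # how often .after() re-arms the check (assumption)

context = zmq.Context()
socket = context.socket(zmq.SUB)          # illustrative socket type
socket.connect("tcp://localhost:5556")    # placeholder endpoint
socket.setsockopt(zmq.SUBSCRIBE, b"")

root = tk.Tk()
last_msg = tk.StringVar(master=root, value="no message yet")
tk.Label(root, textvariable=last_msg).pack()

def check_socket():
    # Drain whatever is waiting without ever blocking the GUI
    try:
        while True:
            msg = socket.recv(flags=zmq.NOBLOCK)
            last_msg.set(msg)             # GUI updates via the monitored variable
    except zmq.Again:
        pass                              # nothing pending right now
    # Re-submit ourselves; this replaces the explicit "while True" loop
    root.after(POLL_INTERVAL_MS, check_socket)

root.after(POLL_INTERVAL_MS, check_socket)
root.mainloop()

The same pattern works for any socket type; the only requirement is that the callback never blocks, which zmq.NOBLOCK (or a Poller with a zero timeout) guarantees.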


Epilogue:

( Blocking calls? Better never to use blocking calls at all. Has anyone ever said blocking calls here? :o) )


a while True, or a blocking socket.recv() inside tkinter's .mainloop().

Well, one can put such a loop into a component aligned with the tkinter-native scheduler, yet the idea is an antipattern and can wreak havoc - not only in tkinter: in any event-loop framework it is risky to expect a "competing" event loop to tolerate the peaceful co-existence of adjacent intentions. Problems will appear, whether from outright blocking or from one loop simply being too dominant in how it consumes scheduling time and resources.

Question:

I am using ConversationHandler to manage my bot's states. I also have some timers, so I am using JobQueue for that. I also have a web application where the user can perform some tasks; after a task completes I want the site to send a notification to the user from the bot. To interconnect the site and the Telegram bot I decided to use ZeroMQ (following this example: https://gist.github.com/ramn/7061042). However, I need to run the listener asynchronously in my bot's code, otherwise it never even starts to execute because of the bot's start_polling function. I tried doing it with JobQueue and it worked great, but it blocks the whole JobQueue =( How can I manage this interconnection? Should I stop using JobQueue and move to some celery-like library?

Thanks!


Answer:

I am not sure if this would completely help, but python-telegram-bot has a @run_async decorator which runs the decorated function in one of the dispatcher's extra worker threads. You can do from telegram.ext.dispatcher import run_async and then decorate the job function.

More details can be found in the wiki here.
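
A rough sketch of how that might be wired up (the token, chat id, endpoint and socket type are placeholders, and this assumes an older python-telegram-bot release that still ships the run_async decorator):

import zmq
from telegram.ext import Updater
from telegram.ext.dispatcher import run_async

NOTIFY_CHAT_ID = 123456789                   # placeholder chat id
ZMQ_ENDPOINT = "tcp://127.0.0.1:5557"        # placeholder endpoint

@run_async
def zmq_listener(bot):
    # Runs in one of the dispatcher's worker threads, so blocking here
    # does not stall start_polling() or the JobQueue
    context = zmq.Context.instance()
    socket = context.socket(zmq.PULL)
    socket.bind(ZMQ_ENDPOINT)
    while True:
        text = socket.recv_string()
        bot.send_message(chat_id=NOTIFY_CHAT_ID, text=text)

updater = Updater("BOT_TOKEN")               # placeholder token
zmq_listener(updater.bot)                    # dispatched to a worker thread
updater.start_polling()
updater.idle()

Because the decorated function runs on a worker thread, the blocking recv_string() no longer interferes with start_polling() or with JobQueue jobs.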

Question:

I'm writing a small test app using ZeroMQ.

One test scenario I have is when there is no server running to connect to.

So, I'm passing ZMQ_DONTWAIT to zmq_recv() in that scenario, expecting an error of EAGAIN, but instead I'm getting an errno value of 0.

Sample client code below:

#include <zmq.h>
#include <stdio.h>
#include <errno.h>

int main(void)
{
    int rc;

    void *context = zmq_ctx_new();
    void *requester = zmq_socket(context, ZMQ_REQ);
    int nLingerOption = 0;
    rc = zmq_setsockopt(requester, ZMQ_LINGER, &nLingerOption, sizeof(nLingerOption));
    rc = zmq_connect(requester, "tcp://127.0.0.1:5555");

    int nSendLen = zmq_send(requester, "M", 1, 0);

    char buffer[1000];
    /* non-blocking receive: expected to fail with EAGAIN since no server is running */
    int nRecvLen = zmq_recv(requester, buffer, sizeof(buffer)-1, ZMQ_DONTWAIT);

    if( nRecvLen < 0 )
        printf("errno = %d\n", errno);

    return 0;
}

Why would the output be 0 instead of EAGAIN (defined as 11 on my system)?

EDIT: This is running ZeroMQ version 4.1


Answer:

The answer is hiding in your windows tag (thanks for including that). Relevant: http://api.zeromq.org/4-1:zmq-errno

Specifically:

The zmq_errno() function is provided to assist users on non-POSIX systems who are experiencing issues with retrieving the correct value of errno directly. Specifically, users on Win32 systems whose application is using a different C run-time library from the C run-time library in use by ØMQ will need to use zmq_errno() for correct operation.

You should be using zmq_errno() as opposed to accessing errno directly.

Question:

I have a node application handling some ZeroMQ events coming from another application utilizing the Node-ZMQ bindings found here: https://github.com/JustinTulloss/zeromq.node

The issue I am running into is that one of the operations triggered by an event takes a long time to process, and this appears to block any other event from being processed during that time. Although the application is not currently clustered, clustering would only afford a few more threads and doesn't really solve the issue. I am wondering if there is a way to allow these async calls to run without blocking other incoming requests while they process, and how I might go about implementing that.

Here is a highly condensed/contrived code example of what I am doing currently:

var zmq = require('zmq');
var Q = require('q');                      // used by serverRequest() below
var zmqResponder = zmq.socket('rep');
var Client = require('node-rest-client').Client;
var client = new Client();

// Socket setup (bind) and the logging object are omitted in this condensed example.
zmqResponder.on('message', function (msg, data) {
  var parsed = JSON.parse(msg);
  logging.info('ZMQ Request received: ' + parsed.event);
  switch (parsed.event) {
    case 'create':
      // Typically a short-running process, not an issue
      break;
    case 'update':
      // Long-running process; this is the issue
      serverRequest().then(function (response) {
        zmqResponder.send(JSON.stringify(response));
      });
      break;
  }
});

function serverRequest() {
  var deferred = Q.defer();
  // URL argument omitted in this condensed example
  client.get(function (data, response) {
    if (response.statusCode !== 200) {
      deferred.reject(data.data);
    } else {
      deferred.resolve(data.data);
    }
  });
  return deferred.promise;
}

EDIT** Here's a gist of the code: https://gist.github.com/battlecow/cd0c2233e9f197ec0049


Answer:

I think, through the comment thread, I've identified your issue. REQ/REP has a strict synchronous message order guarantee... You must receive-send-receive-send-etc. REQ must start with send and REP must start with receive. So, you're only processing one message at a time because the socket types you've chosen enforce that.

If you were using a different, non-event-driven language, you'd likely get an error telling you what you'd done wrong when you tried to send or receive twice in a row, but node lets you do it and just queues the subsequent messages until it's their turn in the message order.

You want to change REQ/REP to DEALER/ROUTER and it'll work the way you expect. You'll have to change your logic slightly for the ROUTER socket to get it to send appropriately, but everything else should work the same.


Rough example code, using the relevant portions of the posted gist:

var zmqResponder = zmq.socket('router');

// The ROUTER socket prepends the sender's identity; with these bindings each
// frame arrives as a separate argument, so the first argument is the peer id.
zmqResponder.on('message', function (peer_id, msg) {
    var parsed = JSON.parse(msg);
    switch (parsed.event) {
        case 'create':
            // build parsedResponse, then...
            zmqResponder.send([peer_id, JSON.stringify(parsedResponse)]);
            break;
    }
});

zmqResponder.bind('tcp://*:5668', function (err) {
    if (err) {
        logging.error(err);
    } else {
        logging.info("ZMQ awaiting orders on port 5668");
    }
});

... you need to grab the peer_id (or whatever you want to call it; in ZMQ nomenclature it's the identity of the socket you're replying to, think of it as an "address" of sorts) from the first frame of the message you receive, and then send it as the first frame of the message you send back.

By the way, I just noticed in your gist you are both connect()-ing and bind()-ing on the same socket (zmq.js lines 52 & 143, respectively). Don't do that. Inferring from other clues, you just want to bind() on this side of the process.

Question:

I have the following pattern:

Multiple threads send messages to a ConcurrentQueue that is polled by a single-threaded Dealer in order to send messages to a Router.

The following exception is raised when multiple messages are sent:

"SocketException - A non-blocking socket operation could not be completed"

Here is the code of the thread that dequeues messages and sends them to the dealer:

Task.Factory.StartNew((state) =>
        {
            using (NetMQSocket dealerSocket = new DealerSocket(_connectionString))
            using (NetMQPoller poller = new NetMQPoller() { dealerSocket })
            {
                dealerSocket.ReceiveReady += DealerSocketOnReceiveReady;
                poller.RunAsync();

                while (true)
                {
                    Message<T> message;
                    if (!_concurrentQueue.TryDequeue(out message)) continue;

                    _pendingRequests.Add(message.Id, message);
                    var mpm = new NetMQMessage(4);
                    mpm.AppendEmptyFrame();
                    mpm.Append(message.Body);
                    mpm.AppendEmptyFrame();
                    mpm.Append(message.Id.ToString());
                    dealerSocket.SendMultipartMessage(mpm);
                }
            }
        }, TaskCreationOptions.LongRunning, _cancellationTokenSource.Token);

The SocketException occurs when sending the multipart message.

I have tried to increase the SendBuffer size and/or the SendHighWatermark size but I still have the same error.

Do I need to handle this exception and reset the socket, or should I never end up in this situation?


Answer:

The dealerSocket was used in two threads: the main thread and the poller thread. To keep the dealerSocket on a single thread we used a NetMQQueue: the producing threads enqueue messages, the queue is added to the NetMQPoller, and the actual send then happens on the poller thread.

More details here