Hot questions for using ZeroMQ in Haskell

Question:

I have a small piece of code that receives frames on a ZeroMQ Pull socket and displays them in an OpenCV window:

module Main where

import           Control.Monad
import qualified OpenCV as CV
import           System.ZMQ4.Monadic
import           System.Exit

main :: IO()
main = runZMQ $ do
  receiver <- socket Pull
  bind receiver "tcp://*:5554"

  -- do some stuff not relevant

  forever $ do
    buffer <- receive receiver
    let img = CV.imdecode CV.ImreadUnchanged buffer -- simple decoder
    liftIO $ CV.withWindow "Video" $ \window -> do
        CV.imshow window img
        key <- CV.waitKey 10
        when (key == 27) exitSuccess -- <- UGLY!

What I would like to find is a way to break out of the loop that gives me more control. I'm aware of the EitherT solution proposed by Gabriel Gonzalez here (which I like very much), but I have not been able to make it work inside the CV.withWindow callback. For example:

quit :: (Monad m) => e -> EitherT e m r
quit = left

loop :: (Monad m) => EitherT e m a -> m e
loop = fmap (either id id) . runEitherT . forever

main :: IO()
main = runZMQ $ do
  receiver <- socket Pull
  bind receiver "tcp://*:5554"

  loop $ do
    buffer <- receive receiver
    let img = CV.imdecode CV.ImreadUnchanged buffer -- simple decoder
    liftIO $ CV.withWindow "Video" $ \window -> do
        CV.imshow window img
        key <- CV.waitKey 10
        when (key == 27) $ quit ()

But of course quit wraps its argument in a Left inside EitherT, while the CV.withWindow callback runs in plain IO, so this solution doesn't compile.


Answer:

Read and write an IORef (from Data.IORef), and use whileM_ (from Control.Monad.Loops in the monad-loops package).

main = runZMQ $ do
    receiver <- socket Pull
    bind receiver "tcp://*:5554"
    continue <- liftIO $ newIORef True

    whileM_ (liftIO $ readIORef continue) $ do
        buffer <- receive receiver
        let img = CV.imdecode CV.ImreadUnchanged buffer -- simple decoder
        liftIO . CV.withWindow "Video" $ \window -> do
            CV.imshow window img
            key <- CV.waitKey 10
            when (key == 27) $ writeIORef continue False

Or have your loop call itself explicitly as appropriate:

main = runZMQ $ do
    receiver <- socket Pull
    bind receiver "tcp://*:5554"

    let loop = do
            buffer <- receive receiver
            let img = CV.imdecode CV.ImreadUnchanged buffer -- simple decoder
            key <- liftIO . CV.withWindow "Video" $ \window -> do
                CV.imshow window img
                CV.waitKey 10
            when (key /= 27) loop

    loop
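
If you would rather keep the loop/quit helpers from the question, the same idea as the second version applies: let the callback return the key in plain IO, and call quit outside it, back in the transformer. Below is a minimal, self-contained sketch of that shape. It makes several assumptions: it uses ExceptT from the transformers package in place of EitherT, and withFakeWindow and fakeWaitKey are hypothetical stand-ins for CV.withWindow and CV.waitKey so that the example runs without OpenCV or ZeroMQ.

```haskell
module Main where

import Control.Monad (forever, when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Except (ExceptT, runExceptT, throwE)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Leave the enclosing 'loop', handing back a result.
quit :: Monad m => e -> ExceptT e m r
quit = throwE

-- | Repeat the body until it calls 'quit'; return what was passed to 'quit'.
loop :: Monad m => ExceptT e m a -> m e
loop = fmap (either id id) . runExceptT . forever

-- | Hypothetical stand-in for CV.withWindow: the callback runs in plain IO,
-- which is why 'quit' cannot be called from inside it.
withFakeWindow :: String -> (String -> IO a) -> IO a
withFakeWindow name callback = callback name

-- | Hypothetical stand-in for CV.waitKey: reports Esc (27) on the third call.
fakeWaitKey :: IORef Int -> IO Int
fakeWaitKey counter = do
    n <- readIORef counter
    modifyIORef' counter (+ 1)
    pure (if n >= 2 then 27 else 0)

-- | Runs the loop and returns how many "frames" were shown before Esc.
runDemo :: IO Int
runDemo = do
    counter <- newIORef 0
    loop $ do
        -- The callback only *returns* the key; it stays in IO.
        key <- liftIO $ withFakeWindow "Video" $ \_window -> fakeWaitKey counter
        -- The decision to stop is taken out here, in ExceptT.
        when (key == 27) $ do
            shown <- liftIO (readIORef counter)
            quit shown

main :: IO ()
main = runDemo >>= print
```

In the real program the body of loop would start with receive receiver, and the callback would call CV.imshow before returning the result of CV.waitKey.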

Question:

ZeroMQ: Messaging for Many Applications comments on REQ -> ROUTER communication:

If we rewrote our “Hello World” server using ROUTER, we’d be able to process any number of “Hello” requests in parallel.

So I typed out hwclient.hs and have slightly modified the hwserver.hs from the ZeroMQ Guide:

Hello World Client
{-# LANGUAGE OverloadedStrings #-}

-- Hello World client

module Main where

import Control.Monad
import System.ZMQ4.Monadic

main :: IO ()
main = runZMQ $ do
    liftIO $ putStrLn "Connecting to hello world server…"

    requester <- socket Req
    connect requester "tcp://localhost:5555"

    forM_ [1..10] $ \i -> do
        liftIO . putStrLn $ "Sending Hello " ++ show i ++ "…"
        send requester [] "Hello"
        _ <- receive requester
        liftIO . putStrLn $ "Received World " ++ show i

And the Router:

Hello World Router
{-# LANGUAGE OverloadedStrings #-}

-- Hello World server

module Main where

import Control.Concurrent
import Control.Monad
import System.ZMQ4.Monadic

main :: IO ()
main = runZMQ $ do
    -- Socket to talk to clients
    responder <- socket Router
    bind responder "tcp://*:5555"

    forever $ do
        buffer <- receive responder
        liftIO $ do
            putStrLn "Received Hello"
            threadDelay 1000000 -- Do some 'work'
        send responder [] "World"

But when I run it, I see the client send a message and never receive a response.

$stack exec -- client-exe
Connecting to hello world server…
Sending Hello 1…

In my other window, I see the server receives 3 messages and nothing more:

$stack exec -- server-exe
Received Hello
Received Hello
Received Hello

Q1: Why does the client never receive a reply? Q2: And why does the server (ROUTER) receive 3 messages rather than 1?

EDIT

I updated hwserver.hs to print out the messages that it receives rather than simply "Hello".

Posting the diff:

printf "received request: %s\n" . unpack $ buffer

replaced

putStrLn "Received Hello"

And the server (Router) showed:

$stack exec -- server-exe
received request: A§
received request: 
received request: Hello

Lastly, here's the type signature of send:

λ: :t send
send
  :: Sender t =>
     Socket z t
     -> [Flag] -> Data.ByteString.Internal.ByteString -> ZMQ z ()

Answer:

Answering in reverse order...

Your 3 'messages' are the 3 frames of the received message. The first frame is added by the ROUTER socket to identify where the message came from. The second is an empty delimiter frame added by the REQ socket, and the final frame contains the data you sent.

This leads on to why you don't get a response: when a ROUTER socket sends a message, it strips off the first frame and uses it as the address of the client it needs to reply to. As you aren't saving the first frame you received and prepending it to the outgoing message, your reply is simply dropped.

Sadly, I don't know enough about Haskell or its ZeroMQ bindings to give you a code solution, but the approach is this: receive frames into an address buffer until you read an empty frame, then read the data frame into a data buffer. When replying, send the address buffer, an empty frame, and then the reply.
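
A minimal Haskell sketch of that approach, as a variant of the server from the question, could look like the following. It assumes the receiveMulti and sendMulti functions exported by System.ZMQ4.Monadic in zeromq4-haskell, which read and write all frames of one message at once. splitEnvelope is a hypothetical helper written for this example and is not part of the library.

```haskell
{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.List.NonEmpty (NonEmpty (..))
import System.ZMQ4.Monadic

-- | Hypothetical helper: split a multipart message at the first empty frame
-- into (address frames, payload frames). 'Nothing' if there is no delimiter.
splitEnvelope :: [ByteString] -> Maybe ([ByteString], [ByteString])
splitEnvelope frames =
    case break BS.null frames of
        (address, _delimiter : payload) -> Just (address, payload)
        _                               -> Nothing

main :: IO ()
main = runZMQ $ do
    -- Socket to talk to clients
    responder <- socket Router
    bind responder "tcp://*:5555"

    forever $ do
        -- One call returns every frame: identity, empty delimiter, data.
        frames <- receiveMulti responder
        case splitEnvelope frames of
            Just (identity : route, _request) -> do
                liftIO $ do
                    putStrLn "Received Hello"
                    threadDelay 1000000 -- Do some 'work'
                -- Reply with the saved address, an empty frame, then the body.
                sendMulti responder (identity :| (route ++ ["", "World"]))
            _ -> liftIO $ putStrLn "Dropping message without a routing envelope"
```

The REQ client from the question should work unchanged against this server, because its REQ socket strips the empty delimiter and hands only "World" to receive.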

Question:

I am currently in the process of implementing a communication pipeline between several processes using ZeroMQ, all using the Push/Pull mechanism. The pipeline starts with a 'ventilator' that generates tasks, and here is where my problem also starts: ZeroMQ seems to be using 100% CPU load when no workers are connected.

Here is the code in question, which attempts to send just one message:

module Main where

import System.ZMQ4.Monadic
import Data.ByteString.Char8 (pack)

main :: IO ()
main = do
     runZMQ $ do
            publisher <- socket Push
            bind publisher "tcp://*:10150"

            send publisher [] (pack "foo")

            close publisher

As you can see, this code is extremely simple and just attempts to send the message "foo" to any subscriber. I would expect this code to queue this message in the background, but instead it appears to get into a never-ending loop with the send command. Setting a high water mark on the socket has no effect.

There is an example in the zguide that is similar to what I'm trying to achieve: https://github.com/imatix/zguide/blob/master/examples/Haskell/taskvent.hs

In this example, they explicitly request user input to start sending (specifically, 'press enter when workers are ready') -- is this the way they work around this problem?

Can anyone enlighten me as to what I'm doing wrong here, or what the best approach to solving this problem is?

EDIT

To elaborate, the following program (with a connected listener) works perfectly:

module Main where

import System.ZMQ4.Monadic
import Data.ByteString.Char8 (pack, unpack)
import Control.Applicative ((<$>))

main :: IO ()
main = do
     runZMQ $ do
            publisher <- socket Push
            receiver  <- socket Pull

            bind    publisher "tcp://*:10150"
            connect receiver "tcp://127.0.0.1:10150"

            send publisher [] (pack "foo")    
            message <- unpack <$> receive receiver

            liftIO $ putStrLn ("received data: " ++ message)

This prints out the received data as expected.

EDIT 2

Using strace, I was able to decipher that apparently zeromq is in a poll/select infinite loop:

poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
--- SIGVTALRM {si_signo=SIGVTALRM, si_code=SI_TIMER, si_pid=0, si_uid=0, si_value=0} ---
rt_sigreturn()                          = 1
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
poll([{fd=8, events=POLLIN}], 1, 0)     = 0 (Timeout)
select(9, [], [8], NULL, NULL)          = 1 (out [8])
--- SIGVTALRM {si_signo=SIGVTALRM, si_code=SI_TIMER, si_pid=0, si_uid=0, si_value=0} ---
rt_sigreturn()                          = 1

and this pattern repeats itself endlessly.


Answer:

ZeroMQ architecture is not just-another-socket-wrapper

The first thing Pieter Hintjens, co-creator of ZeroMQ, recommends is to forget everything you have learned about handling sockets so far.

There are many reasons for that.

The first is that ZeroMQ creates a pair of new, abstract worlds that you should understand in principle, so that you live in peace with them rather than struggle against them: a micro-COSMOS (the internal mechanics, which you have to respect and work with) and a MACRO-cosmos (a very powerful set of Scalable Formal Communication Patterns, which you can harness and integrate into higher-order distributed processing systems).

So?

Because of the micro-COSMOS, a low-latency, high-performance practice is to create the ZMQ Context instance very early in the code and to set up, bind, and connect the socket instances you need (PUSH/PULL in your case) once, rather than to set up and close resources ad hoc, least of all in an endless loop.

ZeroMQ instances are not disposables but assets of the system. Rethink the architecture, and both the micro-COSMOS and the MACRO-cosmos will do an immense amount of work for you.

Next

Under the MACRO-cosmos rules, the PUSH/PULL (distributed) Formal Communication Pattern assumes that a message remains inside the PUSH side's internal queue until a PULL side (invisibly, and outside the control of the sender's code) handshakes and retrieves it.

That also means that if your code attempts to

main :: IO ()
main = do
     runZMQ $ do
            -- PUSH side
            -- ^... beware the ZMQ uses a keyword PUB ( publish )
            --      BUT in a different pattern, very different pattern
            ...
            close publisher

the code bumps into a micro-COSMOS internal detail: the default value of the socket's ZMQ_LINGER option (default == -1) makes the attempt to close wait indefinitely, until some other process retrieves all of the unconsumed messages already enqueued inside the PUSH side.

That alone explains both the initial complaint and the observation in the first EDIT that, with a connected PULL side, the endless ZMQ_LINGER wait does not appear. EDIT 2 just visualises what the absence of such a peer causes in the low-level {select|poll} looping. Q.E.D.
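
As a hedged illustration of the ZMQ_LINGER point, the sketch below bounds how long a close may wait for queued messages. It assumes setLinger from System.ZMQ4.Monadic and restrict from Data.Restricted in zeromq4-haskell; the 1000 ms value is an arbitrary choice for the example. Note one deliberate change from the question: the PUSH socket here connects instead of binding, so the message is queued at once even with no peer listening. This matters because, according to the ZeroMQ documentation for ZMQ_PUSH, send blocks while there is no downstream node at all, so in the bind-only program a linger setting on its own may never get the chance to take effect.

```haskell
{-# LANGUAGE OverloadedStrings #-}

module Main where

import Data.Restricted (restrict)
import System.ZMQ4.Monadic

main :: IO ()
main = runZMQ $ do
    ventilator <- socket Push

    -- Assumption for this sketch: drop undelivered messages 1000 ms after
    -- the socket is closed, instead of the default -1 (wait forever).
    setLinger (restrict (1000 :: Int)) ventilator

    -- Connecting (rather than binding) lets the message be queued right
    -- away, even though no PULL socket is listening on this endpoint yet.
    connect ventilator "tcp://127.0.0.1:10150"

    send ventilator [] "foo"
    liftIO $ putStrLn "Message queued, shutting down"
    -- runZMQ closes the socket and terminates the context on exit; with a
    -- finite linger that shutdown no longer waits indefinitely.
```

Whether dropping undelivered messages is acceptable depends on the pipeline. For a ventilator that must not lose tasks, waiting for workers, as the zguide example does, is the safer design.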

Final Note

It is definitely worth a few days of time and effort to read through both of Pieter Hintjens' books on ZeroMQ.

They are the ultimate heap of gems and best practices.

That is much better than striking out a few one-liners in a trial-and-error loop.

(As you have driven the code into a dead end in principle, the "problem" is in your code rather than in the ZMQ Haskell binding.)