Hot questions for using ZeroMQ in Elixir

Question:

In Python I have the option of using a "poller" object, which polls blocking sockets for waiting messages and returns after a specified timeout in milliseconds (1000 in the while True loop below):

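import random
import socket
from datetime import datetime, timedelta

import zmq

# myip, args (from argparse) and the helpers ansi, handlechange, unpacker,
# packer and validate_request are assumed to be defined elsewhere (not shown)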

# now open up all the sockets
context = zmq.Context()
outsub = context.socket(zmq.SUB)
outsub.bind("tcp://" + myip + ":" + str(args.outsubport))
outsub.setsockopt(zmq.SUBSCRIBE, b"")
inreq = context.socket(zmq.ROUTER)  
inreq.bind("tcp://" + myip + ":" + str(args.inreqport))
outref = context.socket(zmq.ROUTER)  
outref.bind("tcp://" + myip + ":" + str(args.outrefport))
req = context.socket(zmq.ROUTER)  
req.bind("tcp://" + myip + ":" + str(args.reqport))
repub = context.socket(zmq.PUB)  
repub.bind("tcp://" + myip + ":" + str(args.repubport))

# sort out the poller
poller = zmq.Poller() 
poller.register(inreq, zmq.POLLIN)
poller.register(outsub, zmq.POLLIN)
poller.register(outref, zmq.POLLIN)
poller.register(req, zmq.POLLIN)

# UDP socket setup for broadcasting this server's address 
cs = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
cs.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

# housekeeping variables
pulsecheck = datetime.utcnow() + timedelta(seconds = 1)
alivelist = dict()
pulsetimeout = 5

while True: 
    polls = dict(poller.poll(1000))
    if inreq in polls:
        msg = inreq.recv_multipart()
        if msg[1] == b"pulse":           # handle pulse
            ansi("cyan", False, textout = " pulse" + "-" + msg[0].decode())
            if msg[0] not in alivelist:
                handlechange(msg[0])
            alivelist[msg[0]] = datetime.utcnow() + timedelta(seconds = pulsetimeout)
    if outsub in polls:
        msgin = outsub.recv_multipart()[0]
        repub.send(msgin) # republish
        msg = unpacker(msgin)
        if isinstance(msg, dict):
            valu = msg.get("value")
            print(".", end = "", flush = True)
        else:
            ansi("green", False, textout = msg)

    if req in polls:
        msg = req.recv_multipart()
        valmsg = validate_request(msg)
        if not valmsg[0]:
            ansi("red", True); print(valmsg[1]); ansi()
        elif len(alivelist) > 0:
            targetnode = random.choice(list(alivelist.keys()))
            inreq.send_multipart([targetnode, packer(valmsg[1])])
            ansi("blue", True, textout = "sent to " + targetnode.decode())
        else:
            ansi("red", True, textout = "NO CONNECTED NODES TO SEND REQUEST TO")
    if outref in polls:
        msg = outref.recv_multipart()
        destinataire, correlid = msg[1].split(b"/")
        req.send_multipart([destinataire, correlid, msg[2]])

I want to implement something analogous in Elixir (or Erlang) but my preferred native library, chumak, doesn't seem to implement polling. How do I implement non-blocking receives in Erlang/Elixir, preferably using Chumak, but I'll move to another Erlang zeroMQ library if necessary? My socket pattern preference is router sends, dealer receives.

EDIT

My use case is the following. I have a third party financial service which serves data based on requests, with answers coming asynchronously. So you can send multiple requests, and you'll get responses back after an unspecified period of time, and not necessarily in the same order you sent them.

So I need to connect this service into Erlang (actually Elixir) and ZeroMQ seems like a good fit. Multiple users connected (via Phoenix) to Erlang/Elixir will send requests, and I need to pass these on to this service.

The problem comes if there is an error in one of the requests, or the third-party service has some kind of problem. I will be blocked waiting for a response, and then unable to service new requests from Phoenix.

Basically I want to listen constantly for new requests and send them over, but if one request doesn't produce a response, I will have one fewer response than requests, and that will lead to an eternal wait.

I understand that if I send requests separately, then the good ones will produce responses so I don't need to worry about blocking even if, over time, I get quite a big numerical difference between requests sent and responses received. Maybe the design idea is that I shouldn't worry about this? Or should I try to track one-for-one responses to requests and timeout the non-responses somehow? Is this a valid design pattern?


Answer:

Is your system constantly connected to the asynchronous query resource, or are you making a new connection with each query?

Each situation has its own natural model in Erlang.

The case of: A single (or pool of) long-term connection(s)

Long-term connections that maintain a session with the resource (the way a connection with a database would work) are most naturally modelled as processes within your system that have the sole job of representing that external resource.

The requirements of that process are:

  • Translate the external resource's messages into internally meaningful messages (don't just pass junk through; don't let raw, external data invade your system unless it is totally opaque to you)
  • Keep track of timed-out requests (this may require something sort of like polling, but can be done more precisely with erlang:send_after/3)
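A minimal sketch of the timeout bookkeeping from the second bullet, written in Python with asyncio rather than Erlang, so it is only an analog: one long-lived owner tags each request with a correlation id and arms a one-shot timer for it. Here loop.call_later stands in for erlang:send_after/3, and the names RequestTracker, start and reply are hypothetical. No ZeroMQ socket is involved; in a real system, reply would be called by whatever reads responses off the connection.

```python
import asyncio
import itertools


class RequestTracker:
    """Analog of a long-lived process that owns a connection: it tags each
    request with a correlation id and arms a one-shot timer per request
    (the job erlang:send_after/3 would do in Erlang)."""

    def __init__(self, timeout):
        self.timeout = timeout          # seconds allowed per request
        self.pending = {}               # correlation id -> (future, timer handle)
        self._ids = itertools.count(1)

    def start(self):
        """Register a new outgoing request; return its id and a future."""
        loop = asyncio.get_running_loop()
        cid = next(self._ids)
        fut = loop.create_future()
        timer = loop.call_later(self.timeout, self._expire, cid)
        self.pending[cid] = (fut, timer)
        return cid, fut

    def reply(self, cid, payload):
        """Deliver a response; return False if it is late or unknown."""
        entry = self.pending.pop(cid, None)
        if entry is None:
            return False                # already timed out: drop the stray reply
        fut, timer = entry
        timer.cancel()                  # the deadline no longer applies
        if not fut.done():
            fut.set_result(payload)
        return True

    def _expire(self, cid):
        """Timer callback: fail the request if it is still outstanding."""
        entry = self.pending.pop(cid, None)
        if entry is not None:
            fut, _ = entry
            if not fut.done():
                fut.set_exception(TimeoutError(f"request {cid} timed out"))


async def demo():
    tracker = RequestTracker(timeout=0.05)
    id1, f1 = tracker.start()
    id2, f2 = tracker.start()
    tracker.reply(id2, b"quote-2")      # replies may arrive out of order
    # id1 never gets a reply, so its timer fires and fails f1
    results = await asyncio.gather(f1, f2, return_exceptions=True)
    late = tracker.reply(id1, b"too late")
    return results, late, len(tracker.pending)


results, late, still_pending = asyncio.run(demo())
print(results, late, still_pending)
```

A reply that arrives after its timer has fired finds no pending entry and is dropped, so the table of outstanding requests cannot grow without bound even when the service never answers some of them.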

This implies, of course, that the module that implements this process will need to speak the protocol of that resource. But once that is done, there really isn't any need for a message broker like an MQ application.

This lets that process be reactive and block on receive while the rest of your program goes off to do whatever it's doing, without some arbitrary polling loop that will surely run you into the Evil Black Swamp of Scheduling Issues.

The case of: A new connection per query

If each query to the resource requires a new connection, the model is similar, but here you spawn a new process per query, and it represents the query itself within your system. It blocks waiting for the response (with a timeout), and nothing else matters to it.

That is actually the easier model, because then you don't have to scrub a list of past, possibly timed-out requests that will never return, you don't have to interact with a set of staged timeout messages sent via erlang:send_after/3, and you move your abstraction one step closer to the actual model of your problem.

You don't know when these queries will return, and that causes some potential confusion, so modeling each actual query as a living thing is an optimal way to cut through the logical clutter.
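A minimal sketch of the per-query model, again as a Python asyncio analog rather than Erlang: each query gets its own task that waits for exactly one answer under its own deadline, with asyncio.wait_for playing the role of an Erlang receive ... after clause. hypothetical_service is a made-up stand-in for the third-party service, with artificial delays.

```python
import asyncio


async def hypothetical_service(query, delay):
    """Made-up stand-in for the third-party service: answers after `delay` seconds."""
    await asyncio.sleep(delay)
    return f"answer to {query}"


async def run_query(query, delay, timeout):
    """One task per query (like one Erlang process per query): it waits for
    its own answer, under its own deadline, and nothing else."""
    try:
        return await asyncio.wait_for(hypothetical_service(query, delay), timeout)
    except asyncio.TimeoutError:
        return f"{query} timed out"


async def main():
    tasks = [
        asyncio.create_task(run_query("q1", 0.03, 0.2)),
        asyncio.create_task(run_query("q2", 0.01, 0.2)),   # answers first
        asyncio.create_task(run_query("q3", 1.0, 0.05)),   # never answers in time
    ]
    # gather returns results in task order, whatever order they finished in
    return await asyncio.gather(*tasks)


results = asyncio.run(main())
print(results)
```

Nothing tracks outstanding requests centrally: a query that never gets an answer simply ends with its own timeout, and the other tasks are unaffected.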

Either way, model the problem naturally: as a concurrent, asynchronous system

In no case, however, do you want to actually poll the way you would in Python or C or whatever. This is a concurrent problem, so modelling it as such will give you a lot more logical freedom and is more likely to result in a correct solution without odd corner cases.

Question:

I want to communicate between Elixir and Python. I don't want to use NIFs and the like; I prefer loose coupling via ZeroMQ, as this will allow me to use languages other than Python later. I am using the chumak library, which is a native implementation of ZeroMQ in Erlang and seems well maintained. I have used it successfully in the past for pub-sub.

Apart from pub-sub, I'm finding that req-rep and req-router sockets work fine. However, dealer-router does not. This is really important, because only dealer and router give you true async in ZeroMQ.

Here is the python code for the router side:

import zmq
context = zmq.Context()
rout = context.socket(zmq.ROUTER)
rout.bind("tcp://192.168.1.192:8760")

Here is the Elixir req code which works fine...

iex(1)> {ok, sock1} = :chumak.socket(:req, 'reqid')
{:ok, #PID<0.162.0>}
iex(2)> {ok, _peer} = :chumak.connect(sock1, :tcp, '192.168.1.192', 8760)
{:ok, #PID<0.164.0>}
iex(3)> :chumak.send(sock1, 'hello from req socket')
:ok

...because I get it on the Python side:

In [5]: xx = rout.recv_multipart()
In [6]: xx
Out[6]: ['reqid', '', 'hello from req socket']

However, here is what I get if I try a dealer socket on the Elixir side:

iex(4)> {ok, sock2} = :chumak.socket(:dealer, 'dealid')                  
{:ok, #PID<0.170.0>}
iex(5)> {ok, _peer} = :chumak.connect(sock2, :tcp, '192.168.1.192', 8760)
{:ok, #PID<0.172.0>}
iex(6)> :chumak.send(sock2, 'hello from dealer socket')
{:error, :not_implemented_yet}
iex(7)> :chumak.send_multipart(sock2, ['a', 'b', 'hello from dealer socket'])

22:13:38.705 [error] GenServer #PID<0.172.0> terminating
** (FunctionClauseError) no function clause matching in :chumak_protocol.encode_more_message/3
    (chumak) /home/tbrowne/code/elixir/chutest/deps/chumak/src/chumak_protocol.erl:676: :chumak_protocol.encode_more_message('a', :null, %{})
    (stdlib) lists.erl:1354: :lists.mapfoldl/3
    (chumak) /home/tbrowne/code/elixir/chutest/deps/chumak/src/chumak_protocol.erl:664: :chumak_protocol.encode_message_multipart/3
    (chumak) /home/tbrowne/code/elixir/chutest/deps/chumak/src/chumak_peer.erl:159: :chumak_peer.handle_cast/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:send, ['a', 'b', 'hello from dealer socket'], {#PID<0.160.0>, #Reference<0.79795089.2401763329.172383>}}}
State: {:state, :ready, '192.168.1.192', 8760, :client, [], :dealer, 'dealid', [], {3, 0}, #Port<0.4968>, {:decoder, :ready, 0, nil, nil, {:some, 3}, {:some, 0}, %{}, :null, false}, #PID<0.170.0>, {[], []}, [], false, false, false, :null, %{}}

22:13:38.710 [info]  [:unhandled_handle_info, {:module, :chumak_socket}, {:msg, {:EXIT, #PID<0.172.0>, {:function_clause, [{:chumak_protocol, :encode_more_message, ['a', :null, %{}], [file: '/home/tbrowne/code/elixir/chutest/deps/chumak/src/chumak_protocol.erl', line: 676]}, {:lists, :mapfoldl, 3, [file: 'lists.erl', line: 1354]}, {:chumak_protocol, :encode_message_multipart, 3, [file: '/home/tbrowne/code/elixir/chutest/deps/chumak/src/chumak_protocol.erl', line: 664]}, {:chumak_peer, :handle_cast, 2, [file: '/home/tbrowne/code/elixir/chutest/deps/chumak/src/chumak_peer.erl', line: 159]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 616]}, {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 686]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}}]

As you can see, I get this huge error on :chumak.send_multipart, while :chumak.send doesn't work at all. What's going on here?

The dealer socket works fine by the way from the Python side:

import zmq
context = zmq.Context()
deal = context.socket(zmq.DEALER)
deal.setsockopt_string(zmq.IDENTITY, u"Thomas")
deal.connect("tcp://192.168.1.192:8760")
deal.send(b"hello from python deal")

Now on router side:

In [5]: xx = rout.recv_multipart()
In [6]: xx
Out[6]: ['reqid', '', 'hello from req socket']
In [7]: dd = rout.recv_multipart()
In [8]: dd
Out[8]: ['Thomas', 'hello from python deal']

So I'm wondering whether I have a syntax or type error in my Elixir chumak dealer socket, or whether it's simply a bug. I have tried this on both amd64 and armv7l architectures, and the problem is identical.

All the elixir code is based on the Erlang version in the chumak example for dealer-router.

My mix.exs deps looks like this:

[
  {:chumak, "~> 1.2"},
  {:msgpack, "~> 0.7.0"}
]

Answer:

The only obvious thing I see is your use of send_multipart. Its signature in the source:

-spec send_multipart(SocketPid::pid(), [Data::binary()]) -> ok.

You are doing this:

:chumak.send_multipart(sock2, ['a', 'b', 'hello from dealer socket'])

Neither of those elements is a binary:
iex(2)> is_binary('a')
false
iex(3)> is_binary('hello from dealer socket')
false

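Otherwise, I cannot see much difference between your code and the example code in chumak's repo. In Elixir, single-quoted literals are charlists (lists of integers), while double-quoted literals are binaries, so pass binaries instead:

:chumak.send_multipart(sock2, ["a", "b", "hello from dealer socket"])

Plain :chumak.send/2 is not an option here either: as your transcript shows, the dealer socket answers it with {:error, :not_implemented_yet}, so send_multipart is the call to use.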

Question:

I'm trying to install IElixir Jupyter notebook kernel on Xubuntu 16.04. I've successfully done it before on another machine with Xubuntu 16.04.

I have the latest Erlang/OTP 19.3 and Elixir 1.4.4 (installed using kiex).

After executing mix test, I get an error about an undefined :erlzmq_nif.context/1 function:

** (Mix) Could not start application ielixir: exited in: IElixir.start(:normal, [])
    ** (EXIT) an exception was raised:
        ** (UndefinedFunctionError) function :erlzmq_nif.context/1 is undefined (module :erlzmq_nif is not available)
            :erlzmq_nif.context(1)
            (ielixir) lib/ielixir.ex:13: IElixir.start/2
            (kernel) application_master.erl:273: :application_master.start_it_old/4

The thing is, all the dependencies are successfully installed, and I am able to use :erlzmq_nif.context/1 in iex:

iex(1)> :erlzmq_nif.context 1
{:ok, ""}
iex(2)> 

I've copied my build of erlzmq to /usr/lib/erlang/lib so that I can use it in erlang too:

1> erlzmq_nif:context(3).
{ok,<<>>}
2> 

Why is this lib not visible to elixir when using mix?


Answer:

I haven't solved the problem, but I've managed to get IElixir to work.

What I did was replace the :erlzmq dependency in mix.exs with github: "zeromq/erlzmq" (note: no 2 in the repo name). It might not perform as well as erlzmq2, since it's not NIF-based, but at least it works without problems.