Hot questions for Using ZeroMQ in concurrency

Question:

Following is my Celluloid code.

  1. client1.rb: the first of the two clients (I named it client 1)

  2. client2.rb: the second of the two clients (named client 2)

Note:

the only difference between the two clients is the text passed to the server, i.e. 'client-1' and 'client-2' respectively.

On testing these 2 clients (running them side by side) against the following 2 servers (one at a time), I found very strange results.

  1. server1.rb (a basic example taken from the README.md of celluloid-zmq)

    Using this as the server for the 2 clients above resulted in parallel execution of tasks.

OUTPUT

ruby server1.rb

Received at 04:59:39 PM and message is client-1
Going to sleep now
Received at 04:59:52 PM and message is client-2
Note:

the client2.rb message was processed while the client1.rb request was sleeping (a mark of parallelism).

  2. server2.rb

    Using this as the server for the 2 clients above did not result in parallel execution of tasks.

OUTPUT

ruby server2.rb

Received at 04:55:52 PM and message is client-1
Going to sleep now
Received at 04:56:52 PM and message is client-2
Note:

client-2 was made to wait 60 seconds because client-1 was sleeping (a 60-second sleep).

I ran the above test multiple times; all runs showed the same behaviour.

Can anyone explain these results?

Question: Why does Celluloid wait 60 seconds before it can process the other request, as seen in the server2.rb case?

Ruby version

ruby -v

ruby 2.1.2p95 (2014-05-08 revision 45877) [x86_64-darwin13.0]


Answer:

Using your gists, I verified this issue can be reproduced in MRI 2.2.1 as well as JRuby 1.7.21 and Rubinius 2.5.8 ... The difference between server1.rb and server2.rb is the use of the DisplayMessage class and its message class method in the latter.


Use of sleep in DisplayMessage is out of Celluloid scope.

When sleep is called in server1.rb it actually uses Celluloid.sleep, but when called in server2.rb it uses Kernel.sleep ... which locks up the mailbox for Server until the 60 seconds have passed. This prevents further method calls on that actor from being processed until the mailbox is processing messages (method calls on the actor) again.

There are three ways to resolve this:
  • Use a defer {} or future {} block.

  • Explicitly invoke Celluloid.sleep rather than sleep (if not invoked explicitly as Celluloid.sleep, sleep ends up calling Kernel.sleep, since DisplayMessage does not include Celluloid the way Server does).

  • Bring the contents of DisplayMessage.message into handle_message as in server1.rb; or at least into Server, which is in Celluloid scope, and will use the correct sleep.


The defer {} approach:
def handle_message(message)
  defer {
    DisplayMessage.message(message)
  }
end
The Celluloid.sleep approach:
class DisplayMessage
  def self.message(message)
    # ...
    Celluloid.sleep 60
  end
end

Not truly a scope issue; it's about asynchrony.

To reiterate, the deeper issue is not the scope of sleep ... that's why defer and future are my best recommendations. But to post something here that came out in my comments:

Using defer or future pushes a task that would otherwise tie up an actor into another thread. If you use future, you can get the return value once the task is done; if you use defer, you can fire and forget.

But better yet, create another actor for tasks that tend to get tied up, and even pool that other actor... if defer or future don't work for you.

I'd be more than happy to answer follow-up questions brought up by this one; we have a very active mailing list and IRC channel. Your generous bounties are commendable, but plenty of us would help purely to help you.

Question:

I started using ZeroMQ for IPC, made a simple echo client/server, and I'm surprised by one thing. Here is the C++ code (using zmq.hpp and zmq_addon.hpp).

Server:

zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REP);
socket.bind("ipc:///tmp/machine-1");
while (1) {
    zmq::multipart_t m;
    m.recv(socket);
    int i = m.poptyp<int>();
    i++;
    m.addtyp<int>(i);
    m.send(socket);
}

Client:

zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_REQ);

socket.connect("ipc:///tmp/machine-1");

int i = 0;
while (1) {
    int save = i;
    zmq::multipart_t m;
    m.addtyp<int>(i);
    m.send(socket);
    m.recv(socket);

    i = m.poptyp<int>();

    if (i != (save + 1))
        break;

    if ((i % 100000) == 0)
        std::cerr << "i : " << i<< "\n";
}

It works as expected. The client sends an int, the server adds one and sends it back.

Now the magic I don't understand: I realized that I can run several clients in parallel and it continues to work correctly for each client.

The check comparing save+1 to i is always OK.

How does ZMQ handle the concurrency problem on the server side? How does it know which client the response has to be sent back to?

There is this question on SO, but it doesn't answer my question: ZeroMQ REQ/REP on ipc:// and concurrency


Answer:

Per the ZeroMQ docs, when you call REP.recv() on the server, it returns a message from an enqueued REQ (client) socket. If multiple clients are connected, it uses a fair-queue policy to choose one. When you call REP.send() to reply, the REP socket always sends the response to the corresponding REQ client.

That is the "magic" - the REP socket takes care of sending the response to the correct client. If the client has disconnected it just drops the reply message.

The docs may be clearer than my explanation:

ZMQ_REP: A socket of type ZMQ_REP is used by a service to receive requests from and send replies to a client. This socket type allows only an alternating sequence of zmq_recv(request) and subsequent zmq_send(reply) calls. Each request received is fair-queued from among all clients, and each reply sent is routed to the client that issued the last request. If the original requester does not exist any more the reply is silently discarded.
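
Conceptually, a REP socket behaves like a ROUTER socket that stores the request's routing envelope and reuses it for the reply. Below is a minimal sketch of an equivalent echo server using ZMQ_ROUTER so the envelope is visible; it reuses the question's endpoint and the older cppzmq send/recv API, and simply echoes the payload back rather than incrementing it:

#include <zmq.hpp>

int main() {
    zmq::context_t context(1);
    zmq::socket_t socket(context, ZMQ_ROUTER);
    socket.bind("ipc:///tmp/machine-1");

    while (true) {
        // A request from a REQ client arrives as three frames:
        // [client identity][empty delimiter][payload]
        zmq::message_t identity, delimiter, payload;
        socket.recv(&identity);
        socket.recv(&delimiter);
        socket.recv(&payload);

        // Sending the identity frame first is what routes the reply back
        // to the client that issued this particular request; this is
        // what REP does for you behind the scenes.
        socket.send(identity, ZMQ_SNDMORE);
        socket.send(delimiter, ZMQ_SNDMORE);
        socket.send(payload);
    }
}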

Question:

You can find the program here

I am building a program with the message-passing framework 0MQ, trying to implement what I posted here.

The program compiles with g++ -std=c++11 test.cpp -o test -lzmq -lpthread.

To run the program, pass one parameter: the number of threads you would like. That parameter is assigned to the variable worker_num.

In the main thread, I set up the threads with:

  vector<thread> pool;
  for(int i = 0; i < worker_num; i++)
  {
    cout << "main() : creating thread, " << i << endl;
    pool.push_back(thread(task1, (void *)&context, i));
  }

I would like to make sure all worker threads have successfully connected to the main thread before it distributes jobs to them.

  while(true)
  {
    if(sync_done)
    {
      cout << "sync done in main thread" << endl;
      break;
    }

    zmq::message_t sync_msg(4);
    memcpy((void *)sync_msg.data(), SYNC_MSG, SYNC_MSGLEN);
    for(int i = 0; i < worker_num; i++)
      distask_socket.send(sync_msg);

    for(int i = 0; i < worker_num; i++)
    {
      if(sync_done)
        break;
      if(i != 0)
        this_thread::sleep_for(chrono::milliseconds(500));

      zmq::message_t res_msg;
      // cppzmq's recv() returns false when ZMQ_DONTWAIT finds no message
      bool ret = getres_socket.recv(&res_msg, ZMQ_DONTWAIT);

      if(!ret)   // EAGAIN: no confirmation yet
        continue;

      int threadID = stoi(string((char *)res_msg.data()));
      sync_done = if_sync_done(threadID, sync_array, worker_num);
    }
  }

So what the main thread does is: each round it pushes worker_num sync messages through its PUSH endpoint to the worker threads, then reads confirmation messages from its PULL endpoint. If the main thread retrieves worker_num confirmation messages, the sync is done. The format of a worker's sync message is the worker thread's ID as a string, so thread 0 would pass a "0" string back to the main thread.
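
For reference, the worker side of the handshake (task1) is only in the linked program; the sketch below merely mirrors the protocol described above, and the inproc endpoint names are guesses of mine, not the program's actual names:

// Hypothetical sketch of a worker's side of the sync handshake.
void task1(void *arg, int id) {
    zmq::context_t &context = *static_cast<zmq::context_t *>(arg);

    // Mirrors of the main thread's PUSH/PULL endpoints (names assumed).
    zmq::socket_t gettask_socket(context, ZMQ_PULL);
    gettask_socket.connect("inproc://distask");
    zmq::socket_t sendres_socket(context, ZMQ_PUSH);
    sendres_socket.connect("inproc://getres");

    // Receive one sync message from the main thread ...
    zmq::message_t sync_msg;
    gettask_socket.recv(&sync_msg);

    // ... and confirm with this thread's ID as a string.
    std::string payload = std::to_string(id);
    zmq::message_t res_msg(payload.size());
    memcpy(res_msg.data(), payload.data(), payload.size());
    sendres_socket.send(res_msg);
}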

But running the program, I get:

$ ./test 1
main() : creating thread, 0
thread id:0
thread 0 receives: sync
thread 0 sends: 0
thread 0 sync done
main thread receives sync msg from thread 1 # you may get many copies of this msg
terminate called after throwing an instance of 'std::invalid_argument'
  what():  stoi
Aborted

main thread receives sync msg from thread 1 suggests there are 2 threads created: thread 0 and thread 1. Any idea why? I did pass 1 as the parameter. Note that if you run the program yourself you may get different output.

UPDATE:

Program updated: here.

Finally I figured out what's wrong.

Expected output; you see thread 0 pass a 0 to the main thread to signal that sync is done:

$ ./test 1
input parameter is: 1
main() : creating thread, 0
thread 0 receives: sync
to_string 0
thread 0 sends: 0, with size: 1
thread 0 sync done
pass 0 to if_sync_done
main thread receives sync msg from thread 0
sync done in main thread

Unexpected output; you see an unprintable char passed to stoi():

$ ./test 1
input parameter is: 1
main() : creating thread, 0
thread 0 receives: sync
to_string 0
thread 0 sends: 0, with size: 1
thread 0 sync done
pass  to if_sync_done  # !!!!!
terminate called after throwing an instance of 'std::invalid_argument'
  what():  stoi
Aborted

So it seems I was using message_t incorrectly: I need to ensure that the buffer still exists when the main thread passes its content to stoi().

I will add an answer myself.


Answer:

zmq::message_t msg_back((void *)to_string(id).c_str(), to_string(id).size() + 1, NULL);

The zmq::message_t constructor you use does not make a copy of the buffer, if [1] and [2] are to be believed. Instead, it takes ownership of the buffer.

However, you are passing a buffer managed by a temporary string; that buffer is destroyed as soon as the full expression ends, so msg_back is left holding a dangling pointer. Any attempt to use that pointer, such as reading the message on the receiving end, exhibits undefined behavior.
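
A straightforward fix (my sketch, reusing the names from the quoted line) is to let message_t allocate its own buffer and copy the string into it, so the message owns its storage:

// Safe: the message allocates and owns its own buffer, independent of
// the lifetime of the temporary string.
std::string payload = to_string(id);
zmq::message_t msg_back(payload.size());
memcpy(msg_back.data(), payload.data(), payload.size());

Since no trailing NUL byte is sent this way, the receiving side should construct the string with an explicit length, e.g. string((char *)res_msg.data(), res_msg.size()), before calling stoi().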

Question:

The documentation of ØMQ mentions:

Individual ØMQ sockets are not thread safe except in the case where full memory barriers are issued when migrating a socket from one thread to another.

What exactly is meant by "full memory barriers?" Can I have multiple threads send over the same ØMQ socket if I synchronize this with mutexes?


Answer:

As Ulrich has said, yes, you can synchronise access to a single socket from multiple threads using mutexes, but really, why would you want to do that?
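
For completeness, the mutex approach would look roughly like this (a sketch; socket_mutex and send_locked are illustrative names of mine, not part of the zmq API):

#include <zmq.hpp>
#include <cstring>
#include <mutex>
#include <string>

std::mutex socket_mutex;

// Every thread that sends must take the lock; the lock provides the
// full memory barrier the documentation requires.
void send_locked(zmq::socket_t &socket, const std::string &text) {
    std::lock_guard<std::mutex> lock(socket_mutex);
    zmq::message_t msg(text.size());
    memcpy(msg.data(), text.data(), text.size());
    socket.send(msg);
}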

It's normally considered good practice to only access a socket from a single thread, and synchronise between threads using messages. Something like this:

Worker thread 1
                \
Worker thread 2 - >  Control thread -> msg out
                /
Worker thread 3

where only the control thread can send messages directly over the socket. Messages from the worker threads would be sent to the control thread over an inproc zmq socket that you would create. The control thread would process just one message at a time, which avoids the need for mutexes, provided the workers have no shared state.
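
A minimal sketch of that layout (the inproc endpoint name and the fixed worker count are my own choices; here the control thread just prints each message rather than forwarding it over an outbound socket):

#include <zmq.hpp>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

int main() {
    zmq::context_t context(1);

    // The control thread's collector; bind before any worker connects,
    // as inproc transports require.
    zmq::socket_t collector(context, ZMQ_PULL);
    collector.bind("inproc://workers");

    std::vector<std::thread> workers;
    for (int i = 0; i < 3; i++) {
        workers.emplace_back([&context, i]() {
            // Each worker owns its own socket; sockets are never shared.
            zmq::socket_t push(context, ZMQ_PUSH);
            push.connect("inproc://workers");

            std::string text = "from worker " + std::to_string(i);
            zmq::message_t msg(text.size());
            memcpy(msg.data(), text.data(), text.size());
            push.send(msg);
        });
    }

    // Only the control thread reads; it handles one message at a time,
    // so no mutex is needed anywhere.
    for (int received = 0; received < 3; received++) {
        zmq::message_t msg;
        collector.recv(&msg);
        std::cout << std::string((char *)msg.data(), msg.size()) << "\n";
    }

    for (auto &w : workers) w.join();
}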

Message based designs are easier to implement and debug, and much easier to maintain than designs using mutexes. If you can change the design to do that, I'd advise doing so.