Hot questions for Using ZeroMQ in reactphp

Question:

I have a small websocket server, running on top of a set of libraries:

  • ReactPHP,
  • Ratchet, and
  • ZeroMQ, using a php-zmq wrapper.

The code is basically the same as in the tutorials.

The event loop starts correctly, users can connect to the server, and they receive the correct messages when the other side pushes something. But after a while, usually a few days (depending on usage), the messages stop arriving.

The usage is not overwhelming at all - only one or two frontend developers connect at the moment, as this is a development stage.

The loop is running, it returns HTTP 101 Switching protocols on connect correctly, but does not broadcast messages that were correctly broadcast before. No errors anywhere. Restarting the event loop helps.

My questions are:

1) What can cause this? Has someone encountered similar behaviour?

2) Can you recommend a way to debug this in a long-running event loop process?

Currently, I have to stop the loop, change the code (add logging calls), restart the loop, and wait for it to go wrong again, which is tedious to say the least.

Any help greatly appreciated.


Answer:

Well, I guess the ZMQ was the culprit.

When there were multiple applications using ZMQ on the same machine, messages sometimes ended up in the wrong consumer - even though every application had a different port specified for connection to ZMQ sockets.

So users were sometimes getting websocket frames from a completely different application, and when there was no corresponding user for the message, the frame vanished on the way. So websockets didn't stop broadcasting, messages were just routed incorrectly.

I don't know ZMQ well enough to say whether this is documented or otherwise known behaviour.

I solved the problem by rewriting the backend to RabbitMQ with a separate vhost and channel for every application. The problems are gone now, every frame ends up where it should.

Question:

I want to do asynchronous router-to-dealer messaging with React, but it isn't working. The code in http://zguide.zeromq.org/php:rtdealer works, but I can't identify what I'm doing differently. I'm using libzmq 4.0.5.

Here is my code:

$context = new React\ZMQ\Context($loop);

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);
$worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
$worker->connect('tcp://127.0.0.1:5556');
$worker->send('END');

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$i = 0;
$loop->addPeriodicTimer(1, function (React\EventLoop\Timer\Timer $timer) use (&$i, $router) {
    echo 'Time to send!'. PHP_EOL;
    $i++;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

The problem is that only the dealer's first message, "END", gets through. After that, the router tries to send messages but the dealer does not receive them.

Also, this function seems to be only called once:

// \React\ZMQ\SocketWrapper
public function handleReadEvent()
{
    $messages = $this->socket->recvmulti(ZMQ::MODE_NOBLOCK);
    echo 'Receiving...';    // Added
    var_dump($messages);    // Added

    if ($messages !== false) {
        if (count($messages) === 1) {
            $this->emit('message', array($messages[0]));
        }

        $this->emit('messages', array($messages));
    }
}

The output is:

Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Time to send!
Time to send!
Time to send!
Time to send!
Time to send!
...

Edit:

Changed the code to bind the router before the dealer connects to it, the problem is still happening:

$loop = React\EventLoop\Factory::create();

$context = new React\ZMQ\Context($loop);

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$loop->addPeriodicTimer(10, function (React\EventLoop\Timer\Timer $timer) use ($router) {
    echo 'Router sending messages with an interval of 10 seconds'. PHP_EOL;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);

$loop->addPeriodicTimer(5, function (React\EventLoop\Timer\Timer $timer) use ($worker) {
    echo 'After 5 seconds from router binding, connect the dealer and send something'. PHP_EOL;
    $worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
    $worker->connect('tcp://127.0.0.1:5556');
    $worker->send('END');
    $timer->getLoop()->cancelTimer($timer);     // Cancel the timer after connecting
});

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

This is the terminal output:

After 5 seconds from router binding, connect the dealer and send something
Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds

Answer:

I've finally solved the problem. Helped by @Jason's comment ("Not having used PHP/React, are you certain your namespacing is correct?"), I looked at the send method being called. This method looks like this:

// \React\ZMQ\SocketWrapper

public function send($message)
{
    echo 'Inside send, sending message'. PHP_EOL; //This line was added by myself
    $this->buffer->send($message);
}

Then, I thought: well, if I'm calling this method this way:

$router->send('A', \ZMQ::MODE_SNDMORE);

The second argument is never passed.

Then I saw that the sendmulti method was not implemented in SocketWrapper, and that the __call method was implemented in this SocketWrapper class to call the method implemented in the ZMQ API for PHP directly. So I tried calling sendmulti, wrapping these two parameters in an array. It worked, so I tried calling the first send method with the same array and, I don't know why, it worked too. So the trick was calling:

$router->send(array('A', \ZMQ::MODE_SNDMORE));
$router->send(array('END'));

Instead of:

$router->send('A', \ZMQ::MODE_SNDMORE);
$router->send('END');

Also, be careful with the ZMQ PHP API stub located in the GitHub repository; it's wrong. PhpStorm somehow downloaded another ZMQ PHP API stub from somewhere else that seems to be correct. In that one, the sendmulti method prototype looks like this:

public function sendmulti(array $message, $mode = 0)

Surprisingly, the following calls to this method seem to give the same results:

$router->send(array('A', \ZMQ::MODE_SNDMORE));
$router->send(array('A'), \ZMQ::MODE_SNDMORE);
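
To make the dropped second argument concrete, here is a minimal sketch in plain PHP. RecordingSocket and SingleArgWrapper are hypothetical stand-ins, not the real React\ZMQ classes; they only mimic the one-parameter send() quoted above, and the (array) cast inside send() is an assumption about how a scalar message might be turned into frames.

```php
<?php
// Hypothetical stand-ins, NOT the real React\ZMQ classes: they only mimic
// the single-parameter send() quoted above so the argument handling is visible.

final class RecordingSocket
{
    /** Every multipart message that was "sent", as a list of frame lists. */
    public $sent = array();

    // Same parameter shape as the sendmulti(array $message, $mode = 0) prototype.
    public function sendmulti(array $message, $mode = 0)
    {
        // Frames travel as strings, so cast each part.
        $this->sent[] = array_map('strval', $message);
        return $this;
    }
}

final class SingleArgWrapper
{
    private $socket;

    public function __construct(RecordingSocket $socket)
    {
        $this->socket = $socket;
    }

    // Only one parameter: PHP silently ignores anything passed after $message.
    public function send($message)
    {
        // Assumption: a scalar is wrapped into a one-frame multipart message.
        $this->socket->sendmulti((array) $message);
    }
}

$socket = new RecordingSocket();
$router = new SingleArgWrapper($socket);

// 2 is a stand-in for a mode flag such as \ZMQ::MODE_SNDMORE.
$router->send('A', 2);              // the flag never reaches the socket
$router->send(array('A', 'END'));   // identity frame and payload in one message

var_export($socket->sent);
```

If the real wrapper behaves like this sketch, passing the identity and the payload together, as in array('A', 'END'), would be the multipart form to try. That is an inference from the code quoted above, not something confirmed here.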

Question:

I'm trying to use Ratchet for the first time and am following the push tutorial.

I have the following code in push-server.php:

namespace app\ratchet;
require_once(__DIR__ . '/../common_functions.php'); // my autoloader
require __DIR__ . '/../../vendor/autoload.php'; // composer autoloader

use app\ratchet\Pusher;

use Ratchet\Server\IoServer;
use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer;

$loop   = \React\EventLoop\Factory::create();
$pusher = new Pusher;

// Listen for the web server to make a ZeroMQ push after an ajax request
$context = new \React\ZMQ\Context($loop);
$pull = $context->getSocket(\ZMQ::SOCKET_PULL);
$pull->bind('tcp://127.0.0.1:8184'); // Binding to 127.0.0.1 means the only client that can connect is itself
$pull->on('message', array($pusher, 'onBlogEntry'));

// Set up our WebSocket server for clients wanting real-time updates
$webSock = new \React\Socket\Server($loop);
$webSock->listen(8185, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
$webServer = new \Ratchet\Server\IoServer(
    new \Ratchet\Http\HttpServer(
        new \Ratchet\WebSocket\WsServer(
            new \Ratchet\Wamp\WampServer(
                $pusher
            )
        )
    ),
    $webSock
);

$loop->run();

When I run the file I get the following warning

Warning: stream_select(): cannot represent a stream of type ZMQ_FD as a select()able descriptor in [...]\vendor\react\event-loop\StreamSelectLoop.php on line 255

Call Stack:
    0.0000     237608   1. {main}() [...]\app\ratchet\push-server.php:0
    0.0130    1400696   2. React\EventLoop\StreamSelectLoop->run() [...]\app\ratchet\push-server.php:36
    0.0130    1401688   3. React\EventLoop\StreamSelectLoop->waitForStreamActivity() [...]\vendor\react\event-loop\StreamSelectLoop.php:201
    0.0130    1402240   4. React\EventLoop\StreamSelectLoop->streamSelect() [...]\vendor\react\event-loop\StreamSelectLoop.php:221
    0.0130    1402384   5. stream_select() [...]\vendor\react\event-loop\StreamSelectLoop.php:255

The referenced line in \vendor\react\event-loop\StreamSelectLoop.php is:

return stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);

I have never used Ratchet or ZeroMQ before and am struggling to understand what the problem is. I can't find anything helpful on Google when searching for this.

I'm running WampServer x 64 on Windows 7 x 64. I followed this guide on how to install ZMQ.

What's causing this problem and how do I fix it?


Answer:

After about five hours of trying to figure out the solution to this problem, I tried installing the 32-bit version of WampServer based on a comment I read somewhere and the problem is fixed. So it appears the stream_select() issue is caused by a problem with WampServer.

I'll leave this question here because I'm sure someone will run into the same problem at some point and hopefully this will save them a lot of time.

Question:

I'm using reactphp/zmq.

How can I have multiple push workers together with multiple pull workers? Is that possible?

I can only have a single pull and multiple pushes, as in the README's example:

$push->connect()
$pull->bind()

Or a single push and multiple pulls:

$push->bind()
$pull->connect()

When I try to set both to connect, the pulls don't receive the messages.

And when I try to start more than one process with bind, it throws:

ZMQSocketException: Failed to bind the ZMQ: Address in use

Should I have a middleware? 🤔

(5555)                      (5556)
push -|  (5555) > (5556)   |- pull
push -|-> pull  &  push <- |- pull
push -|   bind  /  bind    |- pull
connect                    connect

Answer:

Aha! Thank you very much @Mjh for clarifying things.

Turns out I do need something in the middle, a message queue broker, and ZMQ has something for that. Calling zmq_proxy internally runs a loop like the guide's rrbroker example. Unfortunately, this isn't abstracted in reactphp/zmq, but it is available in the PHP ZMQ extension through ZMQDevice.

Source: http://zguide.zeromq.org/php:chapter2#ZeroMQ-s-Built-In-Proxy-Function
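
For reference, a minimal sketch of such a middle process, assuming the php-zmq extension is installed. The ports 5555 and 5556 come from the diagram in the question; the runForwarder function and the forwarder.php file name are hypothetical. It runs as its own blocking process, separate from any ReactPHP loop.

```php
<?php
// Sketch of a standalone forwarder process built on the php-zmq extension.
// Ports 5555/5556 are taken from the diagram in the question.

function runForwarder($frontendDsn, $backendDsn)
{
    $context = new ZMQContext();

    // Push workers connect here; the device pulls their messages in.
    $frontend = new ZMQSocket($context, ZMQ::SOCKET_PULL);
    $frontend->bind($frontendDsn);

    // Pull workers connect here; the device pushes messages out to them.
    $backend = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
    $backend->bind($backendDsn);

    // run() blocks and shuttles messages from the frontend to the backend.
    $device = new ZMQDevice($frontend, $backend);
    $device->run();
}

// Start it with: php forwarder.php run   (forwarder.php is a hypothetical name)
if (isset($argv[1]) && $argv[1] === 'run') {
    if (!extension_loaded('zmq')) {
        fwrite(STDERR, "The zmq extension is not loaded.\n");
        exit(1);
    }
    runForwarder('tcp://127.0.0.1:5555', 'tcp://127.0.0.1:5556');
}
```

With this running, every push worker would connect to port 5555 and every pull worker to port 5556, so only the forwarder binds and the "Address in use" error should no longer come up.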