Hot questions for Using ZeroMQ in redis

Question:

I have found the following code that implements an asynchronous message queue (actually there is no queue, only files) with ZeroMQ and Node.js

setInterval(function() {
  var value = { id: i++, date: new Date() };
  WriteFile(value.id + ".dat", value);

  client.send(value, function(result) {
    console.log(value, result);
    DeleteFile(value.id + ".dat");
  });
}, 10000); 

The code is from here.

The functions "WriteFile" and "DeleteFile" are defined later in the code, but there is nothing extraordinary there.

The function "client.send" is also defined in another file, where the callback is defined. Clearly there is a provision from ZeroMQ to have a callback when the message transmission is successful.

Now I want to do something like this but with Redis pubsub instead of ZeroMQ for simplicity. As I understand it, there is no callback in the "publish" function from the node_redis module.

My question is, is there a way to implement something like this? I really like the idea of writing files and then deleting them whet the transmission is complete, but I would like it done in Redis. I know I am grasping at straws, but if anyone has any ideas, I will gladly listen.


Answer:

All of the redis module's commands have an optional callback as the last argument.

So doing something like

client.publish('channel', 'message', function(err) {
  if (err) throw err;
});

should work as expected.

Question:

I like to write a PHP script which will run in the background on Linux.

I am trying to figure out what are the solution to send and receive the meta data between PHP Daemon (script) and browsers clients?

I thought I could include http request handling in the daemon itself but a daemon script could receive 500-1000 requests every second. So http request in the daemon itself wouldn't be a good solution.

Is Redis or ZeroMQ are solution to this? Something like this:

(browser clients) <-----> redis <-----> PHP Daemon Script

If PHP Daemon send a message to a client then browser should get meta data immediately via ajax (long polling).


Answer:

If you want the browser to get feedback in realtime via long polling...I don't think Redis is an option. As far as I know, Redis doesn't allow for long poll queries...if the value doesn't exist at the time of query...it will return null.

My suggestion would be to use something like websockets. PHP has a few libraries that work with websockets, one I am familiar with is http://socketo.me/. However, this will mean that the client will interact directly with your PHP script. You can scale this by adding a load balancer infront of it and having multiple daemons on different ports/boxes etc.

If you don't have to use PHP, I would rather suggest something like NodeJS. It's made to do things like this and it does by default what things like Ratchet PHP tries to mimick.

Question:

I want to transfer messages, coming on server( A ZMQ_ROUTER socket, handling multiple clients) to redis server for storage purposes. I have heard that, redis doesn't speak ZMQ. So it cannot be possible without making a bridge. I am open to your suggestion. Where to look upon?

//Load Balanced Multithread-ed Server:

#include "zhelpers.hpp"
#include <queue>
#include "zmq.hpp"
#include <stdio.h>
#include <string>
#include <vector>
#include "datamsg.pb.h"
using namespace google::protobuf::io;
  bool verify(std::string str, std::vector<std::string> &s)
  {
  for(int q=0;q<s.size();q++)
  {    
   if(s.at(q)==str.substr(0,4)){ 
   s.push_back(str.substr(4,str.length()-1));

  return true;
  }
}
return false;
}
 // Basic request-reply client using REQ socket
 static void * worker_thread(void *arg) {
  zmq::context_t context(1);
  zmq::message_t worker_receive;
  datamsg worker_parsed;
  zmq::socket_t worker(context, ZMQ_REQ);
  s_set_id(worker); // Makes tracing easier
  worker.connect("ipc://backend.ipc");
   // Tell backend we're ready for work
   s_send(worker, "READY");

   while (1) {
   // Read and save all frames until we get an empty frame
   worker.recv(&worker_receive);
   worker_parsed.ParseFromArray(worker_receive.data(), worker_receive.size());
  // printing after parsing......... 
   s_sendmore (worker, worker_parsed.destination());
   s_sendmore (worker, "");  
   worker.send(worker_receive);// Here I sent the same structure back
 }
 return (NULL);
}

int main (int argc, char *argv[]) {

// Prepare our context and sockets
zmq::context_t context(1);
zmq::socket_t frontend (context, ZMQ_ROUTER);
zmq::socket_t backend (context, ZMQ_ROUTER);
zmq::socket_t verification (context, ZMQ_REP);
verification.bind("tcp://*:5557");
std::vector<std::string> s;
s.reserve(10);
s.push_back("cli4");
frontend.bind("tcp://*:5559");
backend.bind("ipc://backend.ipc");
zmq::message_t frontend_received; 
zmq::message_t front_get;
int worker_nbr;
 for(worker_nbr = 0; worker_nbr < 3; worker_nbr++) {
 pthread_t worker;
 pthread_create(&worker, NULL, worker_thread, NULL);
 }
 std::queue<std::string> worker_queue;

 while (1) {
  // Initialize poll set
   zmq::pollitem_t items[] = {
  // Always poll for worker activity on backend
  { backend, 0, ZMQ_POLLIN, 0 },
  // Poll front-end only if we have available workers
  { frontend, 0, ZMQ_POLLIN, 0 },
  //Poll for new customer for verification of client refrence ID
  {verification,0,ZMQ_POLLIN,0 }
}; zmq::poll (items, 3, -1);

if (items [0].revents & ZMQ_POLLIN) {   // Handle worker activity on backend
// Queue worker address for LoadBalanced routing
   worker_queue.push(s_recv (backend));

// Second frame is empty
 std::string empty = s_recv (backend);
 assert (empty.size() == 0);

// Third frame is READY or else a client reply address
 std::string client_addr = s_recv (backend);
// If client reply, send rest back to frontend
if(client_addr.compare("READY") != 0) {   
  std::string empty = s_recv (backend);
  assert (empty.size() == 0);
  backend.recv(&frontend_received);
  s_sendmore (frontend, client_addr);
  s_sendmore (frontend, "");
  frontend.send(frontend_received);
  //frontend.close();
  }
}
if (items [1].revents & ZMQ_POLLIN) {
// Client request is [address][request]

 std::string client_addr = s_recv (frontend);

 frontend.recv(&front_get);
 std::string worker_addr = worker_queue.front();
 worker_queue.pop();

  s_sendmore (backend, worker_addr);
  s_sendmore (backend, "");
  backend.send(front_get);

}
if (items [2].revents & ZMQ_POLLIN) {
 std::string refrence=s_recv(verification);
  if(verify(refrence,s)){
   s_send(verification,"OK");
   std::cout<<"ID:"<<refrence.substr(4,(refrence.length()-1))<<" Has been Registered"  <<std::endl;
 }
else s_send(verification,"Verification Failed!");
  }

}
sleep (1);
return 0;
}

Answer:

So, you have an application running a ZMQ ROUTER socket, and you want to archive those messages to redis? Unless you have some constraint that you haven't mentioned, you should connect to redis directly from within your application, rather than trying to pass all communication through ZMQ. ZMQ sockets only ever talk to other ZMQ sockets (without more or less reverse engineering the ZMQ protocol, but this would amount to building a bridge, which you said you don't want).

Redis has no native ZMQ connection option.

More or less what you'll need to accomplish will look like this:

-------Application-------      ------------
|                       |      | External |
|           ZMQ socket-(|<----(|  Source  |
|            v          |      ------------
|            V          |
|     (Process Data)    |      ----------
|            V          |      |  Redis |
|      Redis connector--|)---->| Server |
|                       |      ----------
-------------------------

Hopefully that makes sense. If you provide code in your original question then we can address that directly.