Hot questions for using ZeroMQ with JSON

Question:

Here is my code with the extraneous stuff stripped out:

coordinator.py

context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    event = poller.poll(1)
    if not event:
        continue
    process_id, val = socket.recv_json()

worker.py

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))

socket.send_json(
    (os.getpid(), True)
)

what happens when I run it:

    process_id, val = socket.recv_json()
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py", line 380, in recv_json
    return jsonapi.loads(msg)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/utils/jsonapi.py", line 71, in loads
    return jsonmod.loads(s, **kwargs)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/__init__.py", line 451, in loads
    return _default_decoder.decode(s)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 406, in decode
    obj, end = self.raw_decode(s)
  File "/Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/simplejson/decoder.py", line 426, in raw_decode
    raise JSONDecodeError("No JSON object could be decoded", s, idx)
JSONDecodeError: No JSON object could be decoded: line 1 column 0 (char 0)

and if I dig in with ipdb:

> /Users/anentropic/.virtualenvs/myproj/lib/python2.7/site-packages/zmq/sugar/socket.py(380)recv_json()
    379             msg = self.recv(flags)
--> 380             return jsonapi.loads(msg)
    381

ipdb> p msg
'\x00\x9f\xd9\x06\xa2'

hmm, that doesn't look like JSON... is this a bug in pyzmq? am I using it wrong?


Answer:

Hmm, ok, found the answer.

There is an annoying asymmetry in the ØMQ interface, so you have to be aware of the type of socket you are using.

In this case my use of the ROUTER/DEALER architecture means that the JSON message sent from the DEALER socket, when I do send_json, gets wrapped in a multipart message envelope. The first part is a client id (I guess this is the '\x00\x9f\xd9\x06\xa2' I got above) and the second part is the JSON string we are interested in.

So in the last line of my coordinator.py I need to do this instead:

id_, msg = socket.recv_multipart()
process_id, val = json.loads(msg)  # requires import json at the top of coordinator.py

IMHO this is bad design on the part of ØMQ/pyzmq; the library should abstract this away and provide send and recv methods that just work.

I got the clue from the question "How can I use send_json with pyzmq PUB SUB", so it looks like the PUB/SUB architecture has the same issue, and no doubt others do too.

This is described in the docs, but it's not very clear: http://zguide.zeromq.org/page:all#The-Asynchronous-Client-Server-Pattern

Update

In fact, I found in my case I could simplify the code further, by making use of the 'client id' part of the message envelope directly. So the worker just does:

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.identity = str(os.getpid())  # or I could omit this and use ØMQ client id
socket.connect('%s:%s' % (ZMQ_ADDRESS, kwargs['zmq_port']))

socket.send_json(True)

It's also worth noting that when you want to send a message in the other direction, from the ROUTER, you have to send it as a multipart message, specifying which client it is destined for, e.g.:

coordinator.py

context = zmq.Context()
socket = context.socket(zmq.ROUTER)
port = socket.bind_to_random_port(ZMQ_ADDRESS)

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

pids = set()
while True:
    event = poller.poll(1)
    if not event:
        continue
    process_id, val = socket.recv_multipart()  # first frame is the worker's identity (its pid)
    pids.add(process_id)

    # need some code in here to decide when to stop listening
    # and break the loop

for pid in pids:
    socket.send_multipart([pid, 'a string message'])
    # ^ do your own json encoding if required

I guess there is probably some ØMQ way of doing a broadcast message rather than sending to each client in a loop as I do above. I wish the docs just had a clear description of each available socket type and how to use them.
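
For broadcast, the usual answer is a separate PUB/SUB pair. Here is a minimal sketch of that idea (my illustration only, not taken from the code above; the port and topic name are made up, and the plain strings are Python 2 style to match the rest of this answer). Note that the topic travels as its own frame, so it's recv_multipart plus json.loads again:

# broadcaster side (e.g. in coordinator.py)
import json
import zmq

context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind('tcp://*:5558')
# one send reaches every connected subscriber; messages sent before a
# subscriber connects are silently dropped (the "slow joiner" problem)
pub.send_multipart(['control', json.dumps({'cmd': 'stop'})])

# subscriber side (e.g. in worker.py)
sub = context.socket(zmq.SUB)
sub.connect('tcp://localhost:5558')
sub.setsockopt(zmq.SUBSCRIBE, 'control')  # prefix filter; '' subscribes to everything
topic, raw = sub.recv_multipart()
payload = json.loads(raw)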

Question:

I am trying to deserialize Google protobuf messages received from ZeroMQ and convert them to JSON, using the piece of code below. But in the final output, the fields defined as bytes are unreadable

(for example, "source_id": "\u0000PV\uff98t\uff9e").

Since it is machine-generated data, we don't have the actual values sent from the source.

InputStream is = new ByteArrayInputStream( message.getBytes() );
Schema.nb_event data = Schema.nb_event.parseFrom( is );
String jsonFormat = JsonFormat.printToString( data );

Output

{ "seq": 6479250, "timestamp": 1488461706,"op": "OP_UPDATE","topic_seq": 595736,"source_id": "\u0000PV\uff98t\uff9e","location": {"sta_eth_mac": {"addr": "xxxxxxx"},"sta_location_x": 879.11456,"sta_location_y": 945.0676,"error_level": 1220,"associated": true,"campus_id": "\uff9f\uff94\uffc7\uffa3\uffa2\b6\uffe3\uff92U\uff9f\uffdcN\'MT","building_id": "\uffee\u0016??X}5\u001a\uffaa\uffc4^\uffa0n\uffa4\ufffb\'","floor_id": "\uffd9/\"uF\uffdd3\uffdd\uff96\u0015\uff83~\u0005\uff8a(\uffd0","hashed_sta_eth_mac": "\u0013h\u0017\uffd0\uffef\uffc8\u001f\u0005V\u0010w?xxxxxx","loc_algorithm": "ALGORITHM_LOW_DENSITY","unit": "FEET"}}

Formatted for readability:

{ "seq":          6479250,
  "timestamp": 1488461706,
  "op":                  "OP_UPDATE",
  "topic_seq":     595736,
  "source_id":           "\u0000PV\uff98t\uff9e",
  "location":          { "sta_eth_mac":          { "addr": "\uffc0\uffcc\ufff8P\uffee." },
                         "sta_location_x":    879.11456,
                         "sta_location_y":    945.0676,
                         "error_level":      1220,
                         "associated":            true,
                         "campus_id":            "\uff9f\uff94\uffc7\uffa3\uffa2\b6\uffe3\uff92U\uff9f\uffdcN\'MT",
                         "building_id":          "\uffee\u0016??X}5\u001a\uffaa\uffc4^\uffa0n\uffa4\ufffb\'",
                         "floor_id":             "\uffd9/\"uF\uffdd3\uffdd\uff96\u0015\uff83~\u0005\uff8a(\uffd0",
                         "hashed_sta_eth_mac":   "\u0013h\u0017\uffd0\uffef\uffc8\u001f\u0005V\u0010w?\uff88\uffa8\uffee\u000fm.\u0015\uffe9",
                         "loc_algorithm":        "ALGORITHM_LOW_DENSITY",
                         "unit":                 "FEET"
                         }
  }

All the unreadable fields are defined as bytes in the .proto file. Is there any additional step required to get these values in a readable form?

    optional bytes building_id        = 10;
    optional bytes floor_id           = 11;
    optional bytes hashed_sta_eth_mac = 12;

Answer:

The JSON formatter com.googlecode.protobuf.format.JsonFormat was emitting the raw bytes, but I was able to get the bytes fields in the required format (Base64 strings) after changing the JSON formatter to com.google.protobuf.util.JsonFormat.
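
For reference, the bytes-to-Base64 behaviour is just the standard protobuf JSON mapping, and the same thing is available from the Python protobuf library. A rough sketch (schema_pb2 stands in for the module protoc would generate from the .proto above, raw_message for the bytes pulled off ZeroMQ, and the exact JSON key depends on your message definition):

import base64
import json
from google.protobuf.json_format import MessageToJson
import schema_pb2  # hypothetical module generated by protoc from the .proto above

event = schema_pb2.nb_event()
event.ParseFromString(raw_message)   # raw_message: the payload received over ZeroMQ

# bytes fields (source_id, building_id, floor_id, ...) come out as Base64 strings
json_text = MessageToJson(event)     # note: field names become lowerCamelCase by default

# to recover the original raw bytes from the JSON document, decode the Base64 value
doc = json.loads(json_text)
building_id = base64.b64decode(doc['location']['buildingId'])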

Question:

I'm trying some simple code:

package main

import (
    "fmt"
    zmq "github.com/alecthomas/gozmq"
)

func main() {
    context, _ := zmq.NewContext()
    defer context.Close()

    //  Socket to receive messages on
    receiver, _ := context.NewSocket(zmq.PULL)
    defer receiver.Close()
    receiver.Connect("tcp://localhost:5557")

    //  Process tasks forever
    for {
        msgbytes, _ := receiver.Recv(0)
        fmt.Println("received")
        fmt.Println(string(msgbytes))
    }
}

In Node.js I send messages like this:

console.log(payload);
sender.send(JSON.stringify(payload));

I can see the JSON in the console, so sender.send() is actually sending things. Also, the output from the .go program for each payload is:

received
[]
received
[]

There's no message content in the output. I've searched the GoDocs for the Recv method and there's no separation like recv_json, recv_message, etc. as in other languages; it's all bytes. So what's happening? I'm sending a string, because it's JSON.stringify'd, right?

UPDATE

As Nehal said below, I changed the import statement to the official repo, and this is the new code:

package main

import (
    "fmt"
    zmq "gopkg.in/zeromq/goczmq.v4"
)

func main() {
    //  Socket to receive messages on
    receiver, _ := zmq.NewPull("tcp://*:5557")
    defer receiver.Destroy()

    //  Process tasks forever
    for {
        request, _ := receiver.RecvMessage()
        fmt.Println("received")
        fmt.Println(request)
    }
}

But this time 'received' isn't even printed; it seems that no message is being received at all.


Answer:

Server in Go:

package main

import (
    "fmt"
    zmq "gopkg.in/zeromq/goczmq.v4"
)

func main() {
    //  Socket to receive messages on
    receiver, err := zmq.NewPull("tcp://*:5557")
    if err != nil {
        panic(err)
    }

    defer receiver.Destroy()

    //  Process tasks forever
    for {
        request, err := receiver.RecvMessage()
        if err != nil {
            panic(err)
        }
        fmt.Printf("Received: '%s'\n", request)
    }
}

Client in Node.js:

var zmq = require('zmq')
  , sock = zmq.socket('push');

sock.connect('tcp://127.0.0.1:5557');
setInterval(function(){
   console.log('Sending data');
   sock.send(JSON.stringify({'msg': 'Hi There!'}));
}, 500);

Server Side:

$ go run a.go
Received: '[{"msg":"Hi There!"}]'
Received: '[{"msg":"Hi There!"}]'
...

Client Side:

$ node a.js 
Sending data
Sending data
...

RecvMessage Documentation: https://godoc.org/github.com/zeromq/goczmq#Sock.RecvMessage

Node.js package: https://github.com/JustinTulloss/zeromq.node

Excellent ZeroMQ examples in Go: https://github.com/booksbyus/zguide/tree/master/examples/Go

Nice getting started tutorial: http://taotetek.github.io/oldschool.systems/post/goczmq1/

Question:

I am wondering how I can send objects via a message queue. My question is twofold:

(1) Are there any message queues, like ZeroMQ, that support Java objects and JSON out of the box?

(2) Are there any message queues that don't require you to serialize/deserialize objects on both ends?


Answer:

Well, at the very least MQTT will accept JSON payloads, because we use that ourselves.

On your second point, it's difficult to see how any inter-platform transport can avoid serialisation, as one end does not know the language or endianness of the other end.
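
To make that concrete, here is a minimal sketch of the JSON-over-MQTT approach in Python with paho-mqtt (the broker address and topic are made up; any MQTT client in any language follows the same publish/subscribe pattern). The point is that serialisation is one explicit json.dumps/json.loads step at each end:

import json
import paho.mqtt.client as mqtt

# publisher side: serialise the object to JSON yourself
pub = mqtt.Client()                   # paho-mqtt 1.x style constructor
pub.connect('localhost', 1883)        # hypothetical local broker
pub.publish('demo/scores', json.dumps({'user': 'alice', 'score': 42}))

# subscriber side: deserialise in the message callback
def on_message(client, userdata, msg):
    obj = json.loads(msg.payload)     # back to a plain dict
    print(obj['user'], obj['score'])

sub = mqtt.Client()
sub.on_message = on_message
sub.connect('localhost', 1883)
sub.subscribe('demo/scores')
sub.loop_forever()                    # blocks, dispatching incoming messages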