Hot questions for using ZeroMQ in multi-agent systems

Question:

I'm using ZeroMQ.

I want a node B to subscribe to a node A. Node A will send ( PUB? ) values of some 'volatile variable' (say, the position of some object). 'Volatile' in this case means that node B only cares about the most recent value.

The upshot should be that A can send values to subscribers, but if two values of the variable ever get queued up in the outgoing (or incoming) queues, then the most recent value would replace the earlier values. And another upshot would be: there's no high-water mark.

I can achieve this with PUB/SUB, obviously, but that wouldn't get things like most-recent-value-always-wins. It seems like there'd be some established pattern to achieve this, but I haven't found it.

( I suppose this means I want a ZeroMQ socket pattern to work as if it were pure UDP )


Answer:

Q : How to make a 'shared volatile variable' ( s.t. most-recent-value-always-wins ) with ZeroMQ?

In case your application-level logic is happy with the PUB/SUB Scalable Formal Communication Pattern archetype - one node PUB-lishes by .send()-ing messages, the others SUB-scribe to their respective topic-of-choice so as to start .recv()-ing them - we may fine-tune the configuration so that it meets all the requirements expressed above ( s.t. most-recent-value-always-wins ).

In case one has never worked with ZeroMQ, one may enjoy first looking at "ZeroMQ Principles in less than Five Seconds" before diving into further details.


The proper configuration step :

The trick is to use the .setsockopt( ZMQ_CONFLATE, 1 ) method, which "switches ON" exactly this kind of behaviour. The Context()-engine instance(s) then manage it silently for the user, right "inside" the queue-managers' policy.

If set, a socket shall keep only one message in its inbound/outbound queue, this message being the last message received/the last message to be sent. Ignores ZMQ_RCVHWM and ZMQ_SNDHWM options. Does not support multi-part messages, in particular, only one part of it is kept in the socket internal queue.

Applicable socket types: ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER
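A minimal sketch in C ( using the libzmq API; the tcp:// port number below is illustrative only ) could wire both ends up like this :

#include <zmq.h>
#include <assert.h>

int main ( void )
{
    void *ctx      = zmq_ctx_new ();
    int   conflate = 1;                                          // keep only the most recent message
    int   rc;

 // Node A : PUB-side : the last value to be sent always wins
    void *pub = zmq_socket ( ctx, ZMQ_PUB );
    rc = zmq_setsockopt ( pub, ZMQ_CONFLATE,  &conflate, sizeof ( conflate ) );
    assert ( rc == 0 );
    rc = zmq_bind       ( pub, "tcp://*:5556" );
    assert ( rc == 0 );

 // Node B : SUB-side : the last value received always wins
    void *sub = zmq_socket ( ctx, ZMQ_SUB );
    rc = zmq_setsockopt ( sub, ZMQ_CONFLATE,  &conflate, sizeof ( conflate ) );
    assert ( rc == 0 );
    rc = zmq_setsockopt ( sub, ZMQ_SUBSCRIBE, "", 0 );           // take every topic
    assert ( rc == 0 );
    rc = zmq_connect    ( sub, "tcp://localhost:5556" );
    assert ( rc == 0 );

 // ... PUB-side .send()-s positions, SUB-side .recv()-s only the freshest one ...

    zmq_close    ( sub );
    zmq_close    ( pub );
    zmq_ctx_term ( ctx );
    return 0;
}

Note that the option is best set before the .bind() / .connect() takes place and that, per the documentation quoted above, conflation does not support multi-part messages.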


It was that easy !

All the best with mastering the art of Zen-of-Zero.

Question:

I have seen it mentioned that ZMQ supports a many-to-many PUB/SUB relation.

In this case what I would like is to have multiple subscribers to multiple publishers (this is for a common bus-style application). However, what I am confused about is how to physically implement it, since I have also seen it mentioned that there can only be a single bind and multiple connections to that bound socket.

Thus I am slightly confused as to how to achieve this.

I have seen that pgm may be a way to achieve this (since all members would connect to the same multicast address), but I am not sure how to physically do that...


Answer:

Q : how to physically implement it

In case one has never worked with ZeroMQ, one may enjoy first looking at "ZeroMQ Principles in less than Five Seconds" before diving into further details.

A PUB_A on Computer A : PUB_A.bind()-s, and any SUB may .connect() there, onto A.

A PUB_B on Computer B : PUB_B.bind()-s, and any SUB may .connect() there, onto B.

A rev_PUB_C on any host : rev_PUB_C.connect()-s to a few or many SUB-s, each of which has had a previously successful SUB_xyz.bind() onto its respective local address.
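A minimal sketch in C of the first two arrangements ( the hostnames hostA / hostB and the port numbers are illustrative only ) : one SUB-side socket simply .connect()-s to as many PUB-side .bind()-s as needed.

#include <zmq.h>
#include <assert.h>

int main ( void )
{
    void *ctx = zmq_ctx_new ();
    int   rc;

 // One SUB-side socket may .connect() to any number of PUB-side .bind()-s
    void *sub = zmq_socket ( ctx, ZMQ_SUB );
    rc = zmq_setsockopt ( sub, ZMQ_SUBSCRIBE, "", 0 );           // take every topic
    assert ( rc == 0 );

    rc = zmq_connect ( sub, "tcp://hostA:5557" );                // where PUB_A.bind()-ed
    assert ( rc == 0 );
    rc = zmq_connect ( sub, "tcp://hostB:5558" );                // where PUB_B.bind()-ed
    assert ( rc == 0 );

 // ... messages from both PUB_A and PUB_B now arrive, fair-queued, on this one socket ...

    zmq_close    ( sub );
    zmq_ctx_term ( ctx );
    return 0;
}

Messages from all the connected publishers arrive fair-queued on that single SUB socket. For the pgm / epgm multicast route mentioned in the question, every party would instead use one and the same "epgm://interface;multicast-address:port" endpoint ( e.g. "epgm://eth0;239.192.1.1:5555" ), provided the libzmq build has OpenPGM support.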

And the Merry Go Round goes on, as the distributed-system designer likes it to let the show evolve.

Cute, isn't it?

Welcome to the Zen-of-Zero

Question:

I want to check the existence ( state ) of a server before I send a ZeroMQ request, but I have no idea how to do it.


Answer:

Q : I want to check the existence ( state ) of a server before I send a ZeroMQ request

The solution is to set up and use the services of a zmq_socket_monitor().

#include <zmq.h>
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

// Helper ( a minimal stand-in for the libzmq test-suite's close_zero_linger() ):
// set ZMQ_LINGER to zero, then close the socket, so shutdown never blocks.

static void
close_zero_linger ( void *socket )
{
    int linger = 0;
    int rc     = zmq_setsockopt ( socket, ZMQ_LINGER, &linger, sizeof ( linger ) );
    assert ( rc == 0 );
    zmq_close ( socket );
}

// Read one event off the monitor socket; return value and address
// by reference, if not null, and event number by value. Returns -1
// in case of error.

static int
get_monitor_event ( void  *monitor,
                    int   *value,
                    char **address
                    )
{   
    zmq_msg_t msg;
    zmq_msg_init ( &msg );                                       // First frame in message contains event number and value
    if ( zmq_msg_recv ( &msg, monitor, 0 ) == -1 ) return -1;    // Interrupted, presumably
    assert ( zmq_msg_more ( &msg )             && "REASON: Frame #1 FAILED TO SIG 2nd, EXPECTED, FRAME TO COME" );

    uint8_t  *data  =  ( uint8_t  * ) zmq_msg_data ( &msg );
    uint16_t  event = *( uint16_t * ) ( data );

    if ( value )
        *value = *( uint32_t * ) ( data + 2 );


    zmq_msg_init ( &msg );                                      // Second frame in message contains event address
    if ( zmq_msg_recv ( &msg, monitor, 0 ) == -1 ) return -1;   // Interrupted, presumably
    assert ( !zmq_msg_more ( &msg )            && "REASON: Frame #2 FAILED TO SIG more, NOT EXPECTED, FRAMEs TO COME" );

    if ( address ) {
        uint8_t *data = ( uint8_t * ) zmq_msg_data ( &msg );
        size_t   size =               zmq_msg_size ( &msg );
        *address = ( char * ) malloc ( size + 1 );
        memcpy ( *address, data, size );
        ( *address )[size] = 0;
    }
    return event;
}

int main ( void )
{   
    void    *ctx = zmq_ctx_new ();
    assert ( ctx                               && "REASON: Context FAILED to instantiate" );

    void    *client = zmq_socket ( ctx, ZMQ_DEALER );
    assert ( client                            && "REASON: Socket FAILED to instantiate" );

 // Socket monitoring only works over inproc://
    int      rc = zmq_socket_monitor ( client, "inproc://monitor-client-side", ZMQ_EVENT_ALL );
    assert ( rc == 0                           && "REASON: socket_monitor FAILED to instantiate over INPROC:// transport-class" );

 // Create socket for collecting monitor events
    void    *client_side_mon = zmq_socket ( ctx, ZMQ_PAIR );
    assert ( client_side_mon                   && "REASON: socket_monitor receiving Socket FAILED to instantiate" );

 // Connect these to the inproc endpoints so they'll get events
             rc = zmq_connect ( client_side_mon, "inproc://monitor-client-side" );
    assert ( rc == 0                           && "REASON: .connect()-method FAILED to get connected" );

 // Now do whatever you need
    ...

 // Close client
    close_zero_linger ( client );

 // --------------------------------------------------------------------
 // How to collect and check events from socket_monitor:
    int  event =  get_monitor_event ( client_side_mon, NULL, NULL );

    if ( event == ZMQ_EVENT_CONNECT_DELAYED )
         event =  get_monitor_event ( client_side_mon, NULL, NULL );

    assert ( event == ZMQ_EVENT_CONNECTED      && "REASON: [client]-socket still not in an expected, .connect()-ed, state" );
    ...

    ...
    event = get_monitor_event ( client_side_mon, NULL, NULL );
    assert ( event == ZMQ_EVENT_MONITOR_STOPPED && "REASON: [client]-socket not in an expected, .close()-ed, state" );

 // --------------------------------------------------------------------
 // FINALLY:
 // --------------------------------------------------------------------
 // Close down the sockets
    close_zero_linger ( client_side_mon );

    zmq_ctx_term ( ctx );

    return 0;
}

( zmq_socket_monitor() has been included in the API since v3.2+ )