Hot questions for using ZeroMQ with the PGM protocol

Question:

I'm trying to use the weather example from the ZeroMQ Guide with the epgm:// transport class. Here is my code:

// Publisher

#include "zhelpers.h"

int main (void) {
    //  Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket ( context, ZMQ_PUB );
    // int rc = zmq_bind ( publisher, "tcp://*:5556" );
    int rc = zmq_bind ( publisher, "epgm://192.168.1.4;224.0.0.251:5556" );
    assert ( rc == 0 );

    //  Initialize random number generator
    srandom ( (unsigned) time (NULL) );
    while (1) {
        //  Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        //  Send a message to all subscribers
        char update [20];
        sprintf ( update, "%05d %d %d", zipcode, temperature, relhumidity );
        s_send ( publisher, update );
    }
    zmq_close ( publisher );
    zmq_ctx_destroy ( context );
    return 0;
}

    // Subscriber
    #include "zhelpers.h"

    int main ( int argc, char *argv [] )
    {
        //  Socket to talk to server
        printf ( "Collecting updates from weather server...\n" );
        void *context = zmq_ctx_new ();
        void *subscriber = zmq_socket ( context, ZMQ_SUB );
        // int rc = zmq_connect ( subscriber, "tcp://192.168.1.4:5556" );
        int rc = zmq_connect ( subscriber, "epgm://192.168.1.9;224.0.0.251:5556" );
        assert ( rc == 0 );

        //  Subscribe to zipcode, default is NYC, 10001
        char *filter = ( argc > 1 )? argv [1]: "10001 ";
        rc = zmq_setsockopt ( subscriber, ZMQ_SUBSCRIBE,
                              filter, strlen (filter) );
        assert ( rc == 0 );

        //  Process 100 updates
        int update_nbr;
        long total_temp = 0;
        for ( update_nbr = 0; update_nbr < 100; update_nbr++ ) {
            char *string = s_recv ( subscriber );

            int zipcode, temperature, relhumidity;
            sscanf ( string, "%d %d %d",
                    &zipcode, &temperature, &relhumidity );
            total_temp += temperature;
            free ( string );
        }
        printf ( "Average temperature for zipcode '%s' was %dF\n",
            filter, (int) ( total_temp / update_nbr ) );

        zmq_close ( subscriber );
        zmq_ctx_destroy ( context );
        return 0;
    }
  1. I've tried this code with tcp:// on the same hosts (see the comments) and it works fine.
  2. I tested the multicast address with this example from Openbook Linux (using my multicast address), and I got the messages.
  3. I installed ZeroMQ with pgm:// support (before that I was getting assertion failures).
  4. Both of my hosts are Ubuntu 16.04 systems.
  5. I'm using ZeroMQ 4.

My problem is that I don't get any messages with epgm://. Both programs were running, but no messages were received. What am I missing? I don't understand what could be wrong.


Answer:

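For later visitors of the question: the code is fine and running. The subscriber only gets updates that begin with its filter ("10001 " when started without an argument), and since the publisher picks zipcodes with randof (100000), only about one update in 100,000 matches, so collecting 100 of them over epgm:// can take a very long time. To see whether it works on your system, change the for-loop to a single iteration and make the publisher always send zipcode 10001, then start the subscriber without an argument.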

Question:

I have compiled libzmq with OpenPGM, without any changes, under Windows. The code here is taken from the ZeroMQ Guide ("weather publisher" server/client). But if I change "tcp" to "epgm" it doesn't work any more (data is not received, but the connection is established).

void test_serv()
{    
    //  Prepare our context and publisher
    void *context = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);
    int rc = zmq_bind(publisher, "epgm://127.0.0.1:5556");
    assert(rc == 0);

    //  Initialize random number generator
    srandom((unsigned)time(NULL));
    while (!stop_server)
    {
        //  Get values that will fool the boss
        int zipcode, temperature, relhumidity;
        zipcode = randof(1000) + 600;
        temperature = randof(215) - 80;
        relhumidity = randof(50) + 10;

        //  Send message to all subscribers
        char update[20];
        sprintf(update, "%d %d %d", zipcode, temperature, relhumidity);
        s_send(publisher, update);
    }
    LOG("END Server shutdown");
    Sleep(500);
    zmq_close(publisher);
    zmq_ctx_destroy(context);
}

void test_sock()
{    
    //  Socket to talk to server
    LOG("Collecting updates from weather server...");
    void *context = zmq_ctx_new();
    void *subscriber = zmq_socket(context, ZMQ_SUB);
    int rc = zmq_connect(subscriber, "epgm://127.0.0.1:5556");
    assert(rc == 0);

    //  Subscribe to zipcode 1001
    const char *filter = "1001 ";
    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE,
        filter, strlen(filter));
    assert(rc == 0);

    //  Process 10 updates
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 10; update_nbr++) {
        char *string = s_recv(subscriber);

        int zipcode, temperature, relhumidity;
        sscanf(string, "%d %d %d",
            &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        LOG(">> " << string);
        free(string);
    }
    LOG("Average temperature for zipcode "<< filter << "was " << (int)(total_temp / update_nbr) << 'F');

    zmq_close(subscriber);
    zmq_ctx_destroy(context);
}

I run the two functions in different threads; with tcp everything works as expected.

I have tried running "route print 0.0.0.0" in cmd.exe and using the interface IP (192.168.137.64) as the prefix instead of "eth0", as shown in the RFC: epgm://192.168.137.64;127.0.0.1:5556 on connect and/or bind, but this breaks my socket and raises an error.

Also, "PGM" requires administrator rights, so I cannot test it right now.

The error is NOT "protocol not supported": errno is set to B (11), and I don't understand what it means (I found no documentation on it).


Answer:

EPGM is a bit finicky. According to this list post, if you're using EPGM, your publisher and subscriber must be on separate hosts. More details here; it looks like this was a deliberate choice by the ZMQ team.

So, try it by spinning up your PUB and SUB on separate machines (changing the network addresses accordingly, of course).

Question:

I'm looking into ZeroMQ for its PGM support. I'm running on Windows (in a VirtualBox VM with macOS as the host, in case that matters), using the NetMQ library.

The test I want to do is very simple: send messages from A to B as fast as possible...

First I used TCP as the transport; this easily reached more than 150 000 messages per second, with two receivers keeping pace. Then I wanted to test PGM; all I did was replace the address "tcp://*:5556" with "pgm://239.0.0.1:5557" on both sides.

Now, the PGM tests give very strange results: the sender easily reaches more than 200 000 messages/s; the receiver, though, manages to process only about 500 messages/s!?

So, I don't understand what is happening. After slowing down the sender (sleeping 10 ms after each message, since otherwise it's practically impossible to investigate the flow), it appears that the receiver tries to keep up: it initially sees every message passing by, then chokes, misses a range of messages, then tries to keep up again... I played with the HWM and Recovery Interval settings, but that didn't seem to make much difference (?!).

Can anyone explain what's going on?

Many thanks, Frederik

Note: not sure if it matters, but as far as I understand I don't use OpenPGM; I just downloaded the ZeroMQ setup and enabled 'Multicasting Support' in Windows.

This is the Sender code:

class MassSender
{
    private const string TOPIC_PREFIX = "Hello:";

    private static int messageCounter = 0;
    private static int timerCounter = 0;

    public static void Main(string[] args)
    {
        Timer timer = new Timer(1000);
        timer.Elapsed += timer_Elapsed;

        SendMessages_0MQ_NetMQ(timer);
    }

    private static void SendMessages_0MQ_NetMQ(Timer timer)
    {
        using (NetMQContext context = NetMQContext.Create())
        {
            using (NetMQSocket publisher = context.CreateSocket(ZmqSocketType.Pub))
            {
                //publisher.Bind("tcp://*:5556");
                publisher.Bind("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.

                timer.Start();
                while (true)
                {
                    string message = GetMessage();

                    byte[] body = Encoding.UTF8.GetBytes(message);
                    publisher.Send(body);
                }
            }
        }
    }

    private static string GetMessage()
    {
        return TOPIC_PREFIX + "Message " + (++messageCounter).ToString();
    }
    static void timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        Console.WriteLine("=== SENT {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s ===", messageCounter, messageCounter / ++timerCounter);
    }
}

and the Receiver:

class MassReceiver
{
    private const string TOPIC_PREFIX = "Hello:";

    private static int messageCounter = 0;
    private static int timerCounter = 0;
    private static string lastMessage = String.Empty;

    static void Main(string[] args)
    {
        // Assume that sender and receiver are started simultaneously.
        Timer timer = new Timer(1000);
        timer.Elapsed += timer_Elapsed;

        ReceiveMessages_0MQ_NetMQ(timer);
    }

    private static void ReceiveMessages_0MQ_NetMQ(Timer timer)
    {
        using (NetMQContext context = NetMQContext.Create())
        {
            using (NetMQSocket subscriber = context.CreateSocket(ZmqSocketType.Sub))
            {
                subscriber.Subscribe(""); // Subscribe to everything

                //subscriber.Connect("tcp://localhost:5556");
                subscriber.Connect("pgm://239.0.0.1:5557"); // IP of interface is not specified so use default interface.

                timer.Start();
                while (true)
                {
                    messageCounter++;

                    byte[] body = subscriber.Receive();

                    string message = Encoding.UTF8.GetString(body);                        
                    lastMessage = message; // Only show message when timer elapses, otherwise throughput drops dramatically.  
                }
            }
        }
    }

    static void timer_Elapsed(object sender, ElapsedEventArgs e)
    {
        Console.WriteLine("=== RECEIVED {0} MESSAGES SO FAR - TOTAL AVERAGE IS {1}/s === (Last: {2})", messageCounter, messageCounter / ++timerCounter, lastMessage);
    }
}

Answer:

What is the size of each message?

You are not using OpenPGM, you are using what is called ms-pgm (Microsoft implementation of PGM).

Anyway, you might have to change the socket's MulticastRate (it defaults to 100 kbit/s).

Also what kind of network are you using?