Hot questions for Using ZeroMQ in qt

Question:

When I open my application then the application is waiting for a connection to the server, I have done that by calling a slot run() which waits for a acknowledgement packet from server and when it receives it then it hides "Waiting for connection" string and loads other things. The problem is that when it waits for a packet then the system tray icon is not responding to anything, when the server sends packet and application loads then the system tray icon starts responding (for right-click menu).

I am using ZeroMQ for IPC.

I have something like this:

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    MainWindow w;
    w.show(); 

    //THIS PART
    QTimer::singleShot(2000,&w,SLOT(run()));

    return a.exec();
}

Answer:

You block the event loop. This won't ever work. Unfortunately, ZMQ doesn't offer any platform-specific message loop integration. Thus you have to use it in a separate thread.

This is much easier since it's bad design anyway to put networking code in a widget class.

Create a ZMQ object that encapsulates your networking, and push it to a separate thread. As long as all your communication with that ZMQ instance is over signals/slots or QMetaObject::invokeMethod, you'll be OK.

See this answer for code to Thread.

class ZMQ : public QObject {
  Q_OBJECT
  Q_SLOT void run() {
    ...
    forever {
      socket.send(request,0);
      socket.recv(&response);
      if(response.compare("login") == 0) {
        emit loggedIn();
        socket.close();
        return;
      }
    }
  }
public:
  ZMQ() {}
  Q_SIGNAL void loggedIn();
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    ZMQ zmq;
    Thread thread; // see https://stackoverflow.com/a/25230470/1329652
    MainWindow w;

    w.connect(&zmq, SIGNAL(loggedIn()), SLOT(loggedIn()));
    zmq.moveToThread(&thread);
    thread.start();

    QMetaObject::invokeMethod(&zmq, "run");

    w.show(); 
    return a.exec();
}

Question:

I'm trying to implement both, zmq and a Pyforms GUI which both requires there own event loop. The task is to have a Pyforms GUI with a textfield, that displays the incoming zmq messages. This is the simplified code that I'm trying to get to work.

import pyforms
from   pyforms          import BaseWidget
from   pyforms.controls import ControlTextArea
from   pyforms.controls import ControlButton
import threading
import zmq
from zmq.asyncio import Context
from zmq.eventloop.zmqstream import ZMQStream
from zmq.eventloop import ioloop


class SimpleExample1(BaseWidget):

    def __init__(self):
        super(SimpleExample1,self).__init__('Simple example 1')

        #Definition of the forms fields
        self._controltextarea     = ControlTextArea('textarea to show incoming zmq messages')
        self._button        = ControlButton('Press this button')

        def echo(msg):
            self._controltextarea.__add__(msg) #this should add a line in the Textbox with the message "msg"


        context = Context.instance()
        s = context.socket(zmq.PULL)
        s.connect('tcp://127.0.0.1:5014')
        stream = ZMQStream(s)
        stream.on_recv(echo)  #this calls the function echo from the zmq Ioloop when something is recived

#Execute the application
if __name__ == "__main__":
    #here is where I have tried a lot to make both loops work simultaniously, without success
    guiThread = threading.Thread(target=pyforms.start_app( SimpleExample1 ))
    zmqThread = threading.Thread(target=lambda: ioloop.IOLoop.current().start())
    zmqThread.setDaemon(True)
    guiThread.start()
    zmqThread.start()

This is the ZMQ sender.

import zmq
import time

context = zmq.Context()
publisher = context.socket(zmq.PUSH)
publisher.bind('tcp://127.0.0.1:5014')

while True:
    publisher.send_string('something')
    #print('sended')
    time.sleep(1)

I see 2 possible solutions. First it could work with threads like in the code above. But I haven't found a way to start both event loops. Either one statement blocks the other or I get error messages when I don't use lamda etc. Or it's just not working. - here is a reference I tried to implement for this without success, describing a similar task: github maartenbreddels

The second option is to add the zmq function call of echo() into the eventloop of Pyforms (which is based on QT as far as I know). This could be the most elegant but I don't know how to implement or add something to the GUI's event loop.

I've tried a lot for both solutions without success. The most valuable information I could find is here:

pyzmq readthedocs

zeromq org

pyforms readthedocs

I've not a lot experience and try to understand things like futures, promises and coroutines but also frameworks like asyncio, green in python without success so far. A simple function call for "echo", as soon as a message is received, is what I'm looking for.

Any Ideas how to make it work? Am I doing something silly ?


Answer:

Apologies in advance for a vague answer, but perhaps it can act as a potential starting point.

PyForms looks like, ultimately, it's based on Qt. Qt I think can use a socket (well, a file descriptor) as an input even source. ZeroMQ, at least the C version, exposes a file descriptor that becomes to ready-to-read when a ZMQ message has been received. So in principal, Qt could use this file descriptor to call a callback that reads whatever ZMQ socket has received a message, and handle the message on the Qt event loop's thread (which may have other benefits!).

Whether or not any of this is exposed by PyZMQ and PyForms I'm afraid I don't know.

Question:

We need a lightweight client based messaging solution. We used AMQP, RabbitMQ before, but in C++ we have problems.

We would like to choose ZeroMQ with malamuteserver or MQTT ? Our IoT will publish data ( 45 kb ) almost every 5 min.

We need to deliver this message 100% and do not want to loose any message.

We tried MQTT QoS level 2, but when server disconnected or main server client has a problem we are loosing published messages.

We need exactly RabbitMQ task / worker model. Messages should queued in the server until consumers connected if anything happens.

Any suggestion, direction and examples welcome.

P.S.: This will be production so we want to chose less problematic way :)

Thanks a lot.


Answer:

I think MQTT is overhyped , sure the result I think is due to available open source servers. Zeromq does offer a lot of features to build something that will meet the requirements. The more I look at the options available , the more i am leaning towards zeromq. In our case, we will be receiving data at random intervals from a very large number of devices ( gateway's in a mesh network or End nodes themsevles ). Our finalized message size is very simple , a delimited string , binary encoded , zipped ( <100 Bytes) and sent over the wire. I am decided on zeromq on the server to receive messages. Reasons are not merely based on zeromq as a broker but also how We can utilize its curveZmq features to make provisioning of devices easy and allowing for a simple ZTP (Zero Touch Provisioning system) and key manageability . I am at the preset time debating on using pub/sub vs push /pull patterns where in each end device is a publisher or pusher with a proxy subscriber on the cloud server. while typically pub in a pub/sub there are fewer publishers and more subscribers in a typical large scale IOT deployment there are more publishers and fewer subscribers so it makes me wonder if i should go with pub/sub and there there is the problem of loosing messages due to slow joining subscriber - what if we bring down the server for maintenance , devices in the field would keep publishing messages until HWM is reached. Guess there is always a risk of loosing messages irrespective of anything ex - backhaul network is down and the device Hits the HWM - this is out of control.

Malamute does not have much documentation else i would have explored it a bit more.

So , have you decided on what to use ? If you want to persist messages until they are consumed , i strongly suggest zeromq as a proxy with workers pushing messages into a persistent store .. you can get creative here as well with including a sequence # etc..and allowing clients to request messages given a sequence range etc if they are lost .

Question:

I am using Ubuntu 18.04, Qt 5.12, and libzmq.so.5.1.5. I have 2 very simple Python 3.6 scripts, using pyzmq 18.0.1, one implements ZMQ PUB and one implements ZMQ SUB. These Python scripts work find and data is transmitted from one to the other on the localhost loopback network. I am trying to implement the subscriber in Qt 5.12, and the subscriber always blocks on receive when I send the same data using the same Python publisher script. I've pasted the complete code below - how I can I get ZMQ with Qt 5.12 to actually received data? Thanks.

The output of my C++ app looks successful:

"Server IP Address determined to be: 127.0.0.1"
"Connecting to: tcp://127.0.0.1:5555"
"Attempted Connect: 0"
"SetSockOpt: 0"
"GetSockOpt: 0"
Done setting up socket
Waiting

Python 3 PUB:

#!/usr/bin/python3

import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5555")

# Allow clients to connect before sending data
time.sleep(3)
while True:
    socket.send_pyobj({1:[1,2,3]})
    time.sleep(1)

Python 3 SUB:

#!/usr/bin/python3

import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
# We can connect to several endpoints if we desire, and receive from all.
socket.connect("tcp://127.0.0.1:5555")

# We must declare the socket as of type SUBSCRIBER, and pass a prefix filter.
# Here, the filter is the empty string, wich means we receive all messages.
# We may subscribe to several filters, thus receiving from all.
socket.setsockopt(zmq.SUBSCRIBE, b'')

while True:
    message = socket.recv_pyobj()
    print(message.get(1)[2])

Qt 5.12 Subscriber Code:

#include <QCoreApplication>

// Std includes
#include <stdio.h>
#include <unistd.h>
#include <assert.h>

// Qt
#include <QDebug>
#include <QFile>

// ZeroMQ Includes
#include <zmq.h>

int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);

    //QString m_ServerIP = QString("*");
    QString m_ServerIP = QString("127.0.0.1");
    QString m_ServerPort = QString("5555");

    qDebug() << QString("Server IP Address determined to be: %1").arg(m_ServerIP);

    void* m_Context = zmq_ctx_new();
    assert (m_Context);
    void* m_Subscriber = zmq_socket (m_Context, ZMQ_SUB);
    assert (m_Subscriber);
    int rc = -1;
    unsigned int fd = 0;
    do {

        const char *filter = std::string("").c_str();
        QString ipAndPort = QString("tcp://%1:%2").arg(m_ServerIP).arg(m_ServerPort);

        qDebug() << QString("Connecting to: %1").arg(ipAndPort);

        rc = zmq_connect(m_Subscriber, ipAndPort.toStdString().c_str());

        qDebug() << QString("Attempted Connect: %1").arg(rc);

        rc = zmq_setsockopt(m_Subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));

        qDebug() << QString("SetSockOpt: %1").arg(rc);

        size_t fd_size = sizeof(fd);

        rc = zmq_getsockopt(m_Subscriber,ZMQ_FD,&fd,&fd_size);

        qDebug() << QString("GetSockOpt: %1").arg(rc);

    }
    while ( rc < 0 );

    qDebug() << "Done setting up socket";

    while ( true) {

        zmq_msg_t message;
        zmq_msg_init(&message);
        qDebug() << "Waiting";
        zmq_recvmsg(m_Subscriber, &message, 0);
        size_t size = zmq_msg_size (&message);

        qDebug() << QString("Message Size: %1").arg(size);

        char *string = static_cast<char*>(malloc(size + 1));
        memcpy (string, zmq_msg_data(&message), size);
        zmq_msg_close (&message);
        string [size] = 0;
        if (string) {
            QByteArray frame = QByteArray::fromBase64(QByteArray(string));
            free(string);

            qDebug() << QString("Debug RX Frame Size: %1").arg(frame.size());

            QFile output("/tmp/abcd.jpeg");
            if ( output.open(QIODevice::WriteOnly) ) {
                output.write(frame);
                output.close();
            }

        }

    }

    return a.exec();
}

Answer:

You have the following errors:

  • You do not need QCoreApplication since you are not using signals, slots, events, etc. that need an event-loop.

  • You are using the C api of zmq, there is a compatible api for C++, and to use it use the header <zmq.hpp>.

  • When you use send_pyobj() python it uses pickle to transmit the data, but in C++ there is no method to revert the data. Instead you should send the data with send_json() from python.

Considering the above, the solution is:

*.py

#!/usr/bin/python3

import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5555")

# Allow clients to connect before sending data
time.sleep(3)
while True:
    socket.send_json({1:[1,2,3]})
    time.sleep(1)

main.cpp

#include <QString>
#include <QDebug>
#include <QUrl>
#include <QThread>

#include <zmq.hpp>

int main(void) {

    QString m_ServerIP = QString("127.0.0.1");
    int m_ServerPort = 5555;

    qDebug() << QString("Server IP Address determined to be: %1").arg(m_ServerIP);

    QUrl url;
    url.setScheme("tcp");
    url.setHost(m_ServerIP);
    url.setPort(m_ServerPort);

    zmq::context_t context(1);
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect(url.toString().toStdString());
    subscriber.setsockopt( ZMQ_SUBSCRIBE, "", 0);

    while(true) {
        zmq_msg_t in_msg;
        zmq_msg_init(&in_msg);
        zmq::message_t  message;
        subscriber.recv(&message);
        qDebug() << QString("Message Size: %1").arg(message.size());
        QByteArray ba(static_cast<char*>(message.data()), message.size());
        qDebug()<< "message" << ba;
        QThread::sleep(1);
    }
}

Output:

"Server IP Address determined to be: 127.0.0.1"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"

You can decode the data using QJsonDocument. I also recommend you to review the following examples in C++ of zmq.


If you use socket.send_pyobj({1:[1,2,3]}) you get the following that is the result of pickle:

"Server IP Address determined to be: 127.0.0.1"
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."

Question:

On an Ubuntu 16.04 machine, I am using Python3 with pyzmq to send a base64 ended JPEG through a ZeroMQ PUB/SUB socket to a Qt5 Application. The Qt5 application uses QSocketNotifier to monitor the ZMQ Socket and is supposed to trigger whenever the socket becomes "activated" with more data to read. In my example, the data is completely received on the Qt5 side and the JPEG looks okay - however, the socket notifier only fires once, even though the Python Publisher keeps sending data. If I exit the Python application with ctrl-C, one more instance of the socket notifier activated slot is fired on the Qt5 App side. I want the slot to be fired each time a ZMQ message is sent from Python. I've attached the Python and Qt5 entire apps below. How can I make this work? Thanks.

-------- Header File ------

#ifndef SOCKETRECEIVER_H
#define SOCKETRECEIVER_H

#include <QObject>

// Debug
#include <QDebug>
#include <QDataStream>
#include <QFile>
#include <unistd.h>

// User ZeroMQ Sockets
#include <QSocketNotifier>

// ZeroMQ Includes
#include <zmq.h>

// Debug prints
static void AppDebug(QString message) {
    qDebug() << Q_FUNC_INFO << message;
}

class SocketReceiver : public QObject
{

    Q_OBJECT

public:

    explicit SocketReceiver(QObject *parent = nullptr);
    ~SocketReceiver();

private slots:

    void readZMQData();

private:

    QSocketNotifier *m_SocketNotifier;
    void *m_Context;
    void *m_Subscriber;
    long long int m_RxFrameCounter;
    bool m_DidRXFrame;

signals:

public slots:
};

#endif // SOCKETRECEIVER_H

-------- CCP File ------

#include "socketreceiver.h"

SocketReceiver::SocketReceiver(QObject *parent) : QObject(parent) ,  m_SocketNotifier(nullptr) , m_Context(nullptr) , m_Subscriber(nullptr) , m_RxFrameCounter(0) , m_DidRXFrame(false)
{

    /***** ZMQ *****/

    int major, minor, patch;
    zmq_version (&major, &minor, &patch);

    m_Context = zmq_ctx_new();
    m_Subscriber = zmq_socket (m_Context, ZMQ_SUB);
    int rc = -1;
    unsigned int fd = 0;
    do {

    const char *filter = std::string("").c_str();
    rc = zmq_connect (m_Subscriber, "tcp://localhost:5556");
    rc = zmq_setsockopt (m_Subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));
    size_t fd_size = sizeof(fd);
    rc = zmq_getsockopt(m_Subscriber,ZMQ_FD,&fd,&fd_size);

    m_SocketNotifier = new QSocketNotifier(fd, QSocketNotifier::Read, this);
    connect(m_SocketNotifier, SIGNAL(activated(int)), this, SLOT(readZMQData()), Qt::DirectConnection);

    }
    while ( rc < 0 );

    AppDebug("Done setting up");

}

SocketReceiver::~SocketReceiver()
{

    zmq_close (this->m_Subscriber);
    zmq_ctx_destroy (this->m_Context);

}

void SocketReceiver::readZMQData()
{

    m_SocketNotifier->setEnabled(false);

    AppDebug("Waiting for next frame...");

    QByteArray newFrame;

    int events = 0;
    std::size_t eventsSize = sizeof(events);
    zmq_getsockopt(m_Subscriber,ZMQ_EVENTS, &events, &eventsSize);
    if(events & ZMQ_POLLIN){

    AppDebug("Read Data...");

    // Receive data from socket
    zmq_msg_t message;
    zmq_msg_init(&message);
    zmq_recvmsg(m_Subscriber, &message, 0);
    size_t size = zmq_msg_size (&message);
    AppDebug(QString("Message Size: %1").arg(size));
    char *string = static_cast<char*>(malloc(size + 1));
    memcpy (string, zmq_msg_data(&message), size);
    zmq_msg_close (&message);
    string [size] = 0;

    if ( string != nullptr ) {

        QByteArray newDecodedData = QByteArray::fromBase64(QByteArray(string));
        newFrame.append(newDecodedData);
        free(string);

        if ( !m_DidRXFrame ) {
            m_DidRXFrame = true;
        }

        if ( m_RxFrameCounter == 0 && m_DidRXFrame ) {

            AppDebug(QString("Debug RX Frame Size: %1").arg(newFrame.size()));
            QFile output("/tmp/abcd.jpeg");
            if ( output.open(QIODevice::WriteOnly) ) {
                output.write(newFrame);
                output.close();
                //sleep(86400);
            }

        }

        m_RxFrameCounter++;

    }

    }

    AppDebug("Setting enabled true...");

    m_SocketNotifier->setEnabled(true);

}

------- Python Script ---------

#!/usr/bin/python3

import zmq
import random
import sys
import time
import base64

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

time.sleep(1)
while True:
    f = open("test.jpg",'rb')
    bytes = bytearray(f.read())
    print("Encoded Data Length: %s"%(len(bytes)))
    strng = base64.b64encode(bytes)
    print("Encoded Data Length: %s"%(len(strng)))
    socket.send(strng)
    f.close()
    time.sleep(1)

Answer:

You have to use a while since while reading the data you can get another data:

int SocketReceiver::events(){
    int events = 0;
    std::size_t eventsSize = sizeof(events);
    zmq_getsockopt(m_Subscriber,ZMQ_EVENTS, &events, &eventsSize);
    return events;
}

void SocketReceiver::readZMQData()
{
    m_SocketNotifier->setEnabled(false);
    while (events() & ZMQ_POLLIN) {
        zmq_msg_t message;
        zmq_msg_init(&message);
        zmq_recvmsg(m_Subscriber, &message, 0);
        size_t size = zmq_msg_size (&message);
        AppDebug(QString("Message Size: %1").arg(size));
        char *string = static_cast<char*>(malloc(size + 1));
        memcpy (string, zmq_msg_data(&message), size);
        zmq_msg_close (&message);
        string [size] = 0;
        if (string) {
            QByteArray frame = QByteArray::fromBase64(QByteArray(string));
            free(string);
            AppDebug(QString("Debug RX Frame Size: %1").arg(frame.size()));
            QFile output("/tmp/abcd.jpeg");
            if ( output.open(QIODevice::WriteOnly) ) {
                output.write(frame);
                output.close();
            }
        }
    }
    m_SocketNotifier->setEnabled(true);
}

Question:

I have a small program where I am not receiving a response from the server. This is using python 3.4 and latest zeromq and pyqt5 on Ubuntu 14.04. The client GUI sends a message to the server which gets it and responds but the client does not see the response.

Here is the code: miniServer.py:

import os
import sys
import time
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

# Prepare for client socket
ctx = zmq.Context.instance()
sIn = ctx.socket(zmq.PULL)
urlIn = 'tcp://127.0.0.1:1234'
sIn.bind(urlIn)
# Prepare for sockets to GUI and Web
guiCalled = False
webCalled = False
urlGui = 'tcp://127.0.0.1:2345'
urlWeb = 'tcp://127.0.0.1:3456'
sGUI = None
sWeb = None

def __GetConfig__(sender, data):
    if "GUI" == sender:
        print("Sending back config list to GUI")
        sGUI.send_string("From Server to GUI")
    elif "WEB" == sender:
        sWeb.send_string("From Server to Web")
def __CheckGUICalled__():
    # Used to only connnect once
    global guiCalled
    global sGUI
    if not guiCalled:
        print("Connected to client GUI at port 2345")
        guiCalled = True
        sGUI = ctx.socket(zmq.PUSH)
        sGUI.connect(urlGui)
def __CheckWebCalled__():
    # Used to only connnect once
    global webCalled
    global sWeb
    if not webCalled:
        webCalled = True
        sWeb = ctx.socket(zmq.PUSH)
        sWeb.connect(urlWeb)

actions = {
    "GET_CONFIG": __GetConfig__}
clients = {
    "GUI": __CheckGUICalled__,
    "WEB": __CheckWebCalled__}

def check_msg(msg):
    newStr = msg[0].decode("utf-8", "strict")
    if newStr.count(":") == 2:
        [sender, command, data] = newStr.split(":")
        print("Sender: " + sender + ", Command: " + command + ", Data: " + data)
        # connect if not already done
        clients.get(sender, lambda: None)()
        # execute the command sent from client
        actions.get(command, lambda: None)(sender, data)

# register the check_msg callback to be fired
# whenever there is a message on our socket
stream = ZMQStream(sIn)
stream.on_recv(check_msg)

# Setup callback handling the XMOS
tic = time.time()
def xmos_handler():
    # just testing
    print("Loop time: %.3f" % (time.time() - tic))

pc = ioloop.PeriodicCallback(xmos_handler, 200)
pc.start()

# start the eventloop
ioloop.IOLoop.instance().start()

miniGUI.py:

import os
import sys
import zmq
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
from zmq.eventloop.zmqstream import ZMQStream

# prepare out socket to server
ctx = zmq.Context.instance()
sOut = ctx.socket(zmq.PUSH)
sOut.connect('tcp://127.0.0.1:1234')

# handle inputs from server
def check_msg(msg):
    newStr = msg[0].decode("utf-8", "strict")
    print("Message: " + newStr + " received")
sIn = ctx.socket(zmq.PULL)
sIn.bind('tcp://127.0.0.1:2345')
stream = ZMQStream(sIn)
stream.on_recv(check_msg)

# setup window
class Form(QWidget):
    def __init__(self, parent=None):
        super(Form, self).__init__(parent)
        nameLabel = QLabel("Name:")
        self.nameLine = QLineEdit()
        self.submitButton = QPushButton("Submit")
        buttonLayout1 = QVBoxLayout()
        buttonLayout1.addWidget(nameLabel)
        buttonLayout1.addWidget(self.nameLine)
        buttonLayout1.addWidget(self.submitButton)
        self.submitButton.clicked.connect(self.submitContact)
        mainLayout = QGridLayout()
        mainLayout.addLayout(buttonLayout1, 0, 1)
        self.setLayout(mainLayout)
        self.setWindowTitle("For Test: GUI:GET_CONFIG:0")

    def submitContact(self):
        name = self.nameLine.text()
        sOut.send_string(name)
        print("Message " + name + " sent")

if __name__ == '__main__':
    import sys
    app = QApplication(sys.argv)
    screen = Form()
    screen.show()
    sys.exit(app.exec_())

Start the server first in one treminal:

python3.4 miniServer.py

and then the GUI:

python3.4 miniGUI.py

Writing the string in the edit widget:

GUI:GET_CONFIG:0

and pressing the submit button will print on the server console:

Sender: GUI, Command: GET_CONFIG, Data: 0

Connected to client GUI at port 2345

Sending back config list to GUI

and on the GUI console will only

Message GUI:GET_CONFIG:0 sent

be written whereas the expected result should have been:

Message GUI:GET_CONFIG:0 sent

Message: From Server to GUI received

What am I doing wrong?


Answer:

The solution is to use socket type REQ/REP instead of PUSH/PULL which makes the code also much cleaner.

Question:

I added a new thread to my application developed using Qt through the function Start_zeroMQResponderThread:

// this function adds the new thread
void MainWindow::Start_zeroMQResponderThread()
{

    moveToThread(&zeroMQResponderthread);
    QObject::connect(&zeroMQResponderthread, SIGNAL(started()), this, SLOT(Run_zeroMQResponderThread())); //cant have parameter sorry, when using connect
    zeroMQResponderthread.start();
}

I called this function in the constructor of MainWindow to be sure that the thread is created in the start of the application:

MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow)
{
....

    // This is the sender which do not need to be run in a separate thread
    context = zmq_ctx_new();
    requester = zmq_socket(context, ZMQ_PAIR);
    rc = zmq_connect(requester, "tcp://10.131.7.97:5555"); 
...
    //  This is the starting of the thread that will listens to the network to 
    //  capture received messages using ZeroMQ
    Start_zeroMQResponderThread();
}

And Finally, this is the function Run_zeroMQResponderThread(): It runs in the separate thread. It launches an infinite loop to detect sent messages using ZeroMQ and transforms them to vocal messages using Windows Text To Speech API (SAPI):

void MainWindow::Run_zeroMQResponderThread() {

    ISpVoice * pVoice = NULL;
    if (!FAILED(::CoInitialize(NULL)))
    {
        HRESULT hr = CoCreateInstance(CLSID_SpVoice, NULL, CLSCTX_ALL, IID_ISpVoice, (void **)&pVoice);
    }


    void *context = zmq_ctx_new();
    void *responder = zmq_socket(context, ZMQ_PAIR);
    int rc = zmq_bind(responder, "tcp://*:5555");


    printf("Receiver: Started\n");

    char buffer[128];
    wchar_t wtext[128];
    while (true)
    {
        int num = zmq_recv(responder, buffer, 128, 0);

        if (num > 0)
        {
            buffer[num] = '\0';
            printf("Receiver: Received (%s)\n", buffer);



            mbstowcs(wtext, buffer, strlen(buffer) + 1);//Plus null
            LPWSTR ptr = wtext;
            HRESULT hr;
            if (pVoice)
                hr = pVoice->Speak(ptr, SPF_DEFAULT, NULL);

            if (!SUCCEEDED(hr))
                std::cout << "speak error" << hr << std::endl;
        }
    }

    pVoice->Release();
    pVoice = NULL;
    ::CoUninitialize();

    zmq_close(responder);
    zmq_ctx_destroy(context);


}

Before adding this function, the application runs well. But after adding it freezes at the beginning of the application without even showing the main UI of the application.

What could be the problem?


Answer:

Moving a QWidget to any thread other than the main thread or null thread was not ever meant to work. Period.

You should factor out the controller aspect of your code from the UI anyway. The controller will reside in one or more QObjects. All you have to do is move those to a worker thread, and you're set.

Question:

I'm using pyzmq in my qt app.

I found some past solution in mailinglist in first link. So, here is my code with link.

import zmq
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtWidgets import QApplication, QWidget


class ChatApp(QWidget):
    def __init__(self):
        super(ChatApp, self).__init__()

        self._zmq_context = zmq.Context()
        self._zmq_sock = self._zmq_context.socket(zmq.SUB)
        self._zmq_sock.connect("tcp://localhost:5556")
        self._zmq_sock.setsockopt(zmq.SUBSCRIBE, b"bm_chat")

        self.read_noti = QSocketNotifier(self._zmq_sock.getsockopt(zmq.FD),
                                             QSocketNotifier.Read,
                                             self)
        self.read_noti.activated.connect(self.on_read_msg)

    def on_read_msg(self, _):
        self.read_noti.setEnabled(False)
        flags = self._zmq_sock.getsockopt(zmq.EVENTS)

        if flags & zmq.POLLIN:
            msg = self._zmq_sock.recv_multipart()
            topic = msg[0]
            data = msg[1]
            print(topic, data)
        elif flags & zmq.POLLOUT:
            print("[Socket] zmq.POLLOUT")
        elif flags & zmq.POLLERR:
            print("[Socket] zmq.POLLERR")
        else:
            print("[Socket] FAILURE")
        self.read_noti.setEnabled(True)

if __name__ == '__main__':
    app = QApplication([])
    win = ChatApp()
    win.show()
    app.exec_()

But, as expected after message fire once, and never happend again. This is my message

[Socket] FAILURE
b'bm_trade' b'hello0'

So I searched other solutions here, with read self._zmq_sock.getsockopt(zmq.EVENTS) after qt notification enabled. So I changed my code in last line

def on_read_msg(self, _):
    self.read_noti.setEnabled(False)
    flags = self._zmq_sock.getsockopt(zmq.EVENTS)

    if flags & zmq.POLLIN:
        msg = self._zmq_sock.recv_multipart()
        topic = msg[0]
        data = msg[1]
        print(topic, data)
    elif flags & zmq.POLLOUT:
        print("[Socket] zmq.POLLOUT")
    elif flags & zmq.POLLERR:
        print("[Socket] zmq.POLLERR")
    else:
        print("[Socket] FAILURE")
    self.read_noti.setEnabled(True)
    self._zmq_sock.getsockopt(zmq.EVENTS) // Here is fixed      

It works great, until data rate is low. Here is my PUB server code.

from time import sleep

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5557")

def ms(millisec):
    return millisec / 1000

if __name__ == '__main__':
    count = 0
    while True:
        socket.send_multipart(
            [b'bm_trade', bytes(('hello' + str(count)).encode('utf-8'))])
        count += 1

        sleep(ms(10))

That trick is only work on event time gap over 10 ms. If you change sleep(ms()) under 10, client side also get fired once, and never fire.

Someone would say it's okay, but in my app, latency is very low and all message always should visible in gui.

Why this problem happened, how to solve this?


Answer:

You just have to continue reading while there is data as I show below:

def on_read_msg(self):
    self.read_noti.setEnabled(False)

    if self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLIN:
        while self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLIN:
            topic, data = self._zmq_sock.recv_multipart()
            print(topic, data)
    elif self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLOUT:
        print("[Socket] zmq.POLLOUT")
    elif self._zmq_sock.getsockopt(zmq.EVENTS) & zmq.POLLERR:
        print("[Socket] zmq.POLLERR")

    self.read_noti.setEnabled(True)

Question:

I have a class that inherits from QThread and that handles a ZeroMQ SUB subscribe socket in a while loop ( polling in the thread's run method ) and notifies the GUI thread by emitting signals when receiving messages from the socket.

In this QThread derived class, before the polling loop, I also create a ZeroMQ PUB publish socket and there are methods to use it. These methods are only used in the main thread ( GUI ) to send data to the server.

This solution works without any problem, but it's not perfect. I want only one thread to handle both subscribe and publish sockets operations. The GUI thread ( main ) instead of calling a QThread derived class method, will send a signal to request publishing a data.

Is there a nice pattern, based on Qt tools, to implement that ?


Answer:

Is there a nice pattern, based on Qt tools, to implement that ?

Well,

ZeroMQ part has no problem to handle both PUB and SUB at once

Independently of the primary motivation, the ZeroMQ engine is not the issue here. If looking inside wires and details, ZeroMQ Context() instances are actually pools-of-threads, which provide lot of opportunities for performance tuning and respective prioritisations, mapping socket-instances onto directly-mapped groups of ZeroMQ I/O-threads.

If not working with ZeroMQ day by day, one may enjoy a 5-seconds read into the main conceptual differences, as sketched in brief in the [ ZeroMQ hierarchy in less than a five seconds ] Section.

This said, the main concern is on how one would like to require the Qt-ecosystem to "speak" through the ZeroMQ infrastructure. Given a pure non-blocking, well designed code is put into the piping, there would be no major barrier to let ZeroMQ send and collect signals across the platform.

Efficiency of message passing, (ultra)-low-latency and zero-copy mechanics are handy and available at your fingertips. This means, that bad idea or a nasty code will remain being bad or nasty, where the ZeroMQ framework cannot be blamed for "making troubles" :o)