Hot questions for using ZeroMQ in Scala

Question:

I want to know if there's a way for a ZeroMQ socket to do only reading or only writing. It seems to me that, even though there are async/multithreading examples, every thread still uses a recv-then-send loop. My problem is that I want a receiveMessage() that reads from a ZeroMQ socket and a sendMessage(msg) that writes to a ZeroMQ socket, but each of those methods would run in a separate thread that is constructed IN ANOTHER class. Here's my code (I'm using jeromq from Scala):

trait ZmqProtocol extends Protocol {

  val context: ZContext = new ZContext(1)
  private val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
  private val backendSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)

  frontendSocket.bind("tcp://*:5555")
  backendSocket.bind("inproc://backend")


  new Thread(() => {

    println("Started receiving messages")
    // Connect backend to frontend via a proxy
    ZMQ.proxy(frontendSocket, backendSocket, null)

  }).start()


  override def receiveMessage(): (String, String) = {

    val inprocReadSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
    inprocReadSocket.connect("inproc://backend")

    //  The DEALER socket gives us the address envelope and message
    val msg = ZMsg.recvMsg(inprocReadSocket)

    // Message from client's REQ socket contains 3 frames: address + empty frame + request content
    // (payload)
    val address = msg.pop
    val emptyFrame = msg.pop
    val request = msg.pop

    assert(request != null)
    msg.destroy()

    println(s"RECEIVED: $request FROM: $address")

    (address.toString, request.toString)
  }

  override def sendMessage(address: String, response: String): Unit = {

    val inprocWriteSocket: ZMQ.Socket = context.createSocket(ZMQ.DEALER)
    inprocWriteSocket.connect("inproc://backend")

    val addressFrame = new ZFrame(address)
    val emptyFrame = new ZFrame("")
    val responseFrame = new ZFrame(response)

    addressFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
    // Sending empty frame because client expects such constructed message
    emptyFrame.send(inprocWriteSocket, ZFrame.REUSE + ZFrame.MORE)
    responseFrame.send(inprocWriteSocket, ZFrame.REUSE)

    addressFrame.destroy()
    emptyFrame.destroy()
    responseFrame.destroy()
  }

}

And here's how I would use it:

class TrafficHandler(val requestQueue: LinkedBlockingQueue[(String, Message)],
                     val responseQueue: LinkedBlockingQueue[(String, String)])
  extends Protocol {

  def startHandlingTraffic(): Unit = {

    // Reader thread: moves incoming requests onto the request queue
    new Thread(() => {

      while (true) {

        val (address, message) = receiveMessage()

        requestQueue.put((address, message))
      }
    }).start()

    // Writer thread: takes finished responses and sends them back
    new Thread(() => {
      while (true) {

        val (address, response) = responseQueue.take()
        sendMessage(address, response)
      }
    }).start()
  }
}

During debugging, I noticed that I received the message and correctly took it from the response queue (a concurrent blocking queue) with the correct destination address, but sending it failed silently. I dug a bit into the jeromq code, and it seems to have something to do with the identity, because outPipe is null. I'm guessing that's because I don't have a correct recv-send loop.

EDIT AFTER @user3666197's response: The code works! (Although if you start the server first, it takes time to bind and connect the PUSH and PULL sockets.) Here is the modified code that uses PUSH and PULL sockets:

trait ZmqProtocol extends Protocol {

  val context: ZContext = new ZContext(1)

  val frontendSocket: ZMQ.Socket = context.createSocket(ZMQ.ROUTER)
  frontendSocket.bind("tcp://*:5555")

  val requestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
  requestQueueSocket.bind("inproc://requestQueueSocket")

  val responseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
  responseQueueSocket.bind("inproc://responseQueueSocket")

  val inprocRequestQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PULL)
  inprocRequestQueueSocket.connect("inproc://requestQueueSocket")

  val inprocResponseQueueSocket: ZMQ.Socket = context.createSocket(ZMQ.PUSH)
  inprocResponseQueueSocket.connect("inproc://responseQueueSocket")

  new Thread(() => {

    println("Started receiving messages")

    while (true) {

      val msg = ZMsg.recvMsg(frontendSocket)

      // Message from client's REQ socket contains 3 frames: address + empty frame + request content
      // (payload)
      val reqAddress = msg.pop
      val emptyFrame = msg.pop
      val reqPayload = msg.pop

      assert(reqPayload != null)
      msg.destroy()

      println(s"RECEIVED: $reqPayload FROM: $reqAddress")

      requestQueueSocket.send(s"$reqAddress;$reqPayload")

      val responseMessage = new String(responseQueueSocket.recv(0))
      val respMessageSplit = responseMessage.split(";")

      val respAddress = respMessageSplit(0)
      val respPayload = respMessageSplit(1)

      val array = new BigInteger(respAddress, 16).toByteArray
      val respAddressFrame = new ZFrame(array)
      val respEmptyFrame = new ZFrame("")
      val respPayloadFrame = new ZFrame(respPayload)

      respAddressFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
      // Sending empty frame because client expects such constructed message
      respEmptyFrame.send(frontendSocket, ZFrame.REUSE + ZFrame.MORE)
      respPayloadFrame.send(frontendSocket, ZFrame.REUSE)

      respAddressFrame.destroy()
      respEmptyFrame.destroy()
      respPayloadFrame.destroy()

    }

  }).start()


  override def receiveMessage(): (String, String) = {

    val message = new String(inprocRequestQueueSocket.recv(0))
    val messageSplit = message.split(";")

    val address = messageSplit(0)
    val payload = messageSplit(1)

    (address, payload)
  }

  override def sendMessage(address: String, response: String): Unit = {

    inprocResponseQueueSocket.send(s"$address;$response")
  }
}
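
A note on the address round trip in the modified code: new BigInteger(respAddress, 16).toByteArray returns the minimal two's-complement encoding, so a leading 0x00 byte of the original address can be dropped, and an extra sign byte can be added when the top bit is set. If the address string is a hex dump of binary identity bytes (an assumption about what the frame's string form contains), a fixed-width hex codec avoids both effects. The sketch below uses hypothetical names (AddressCodec, toHex, fromHex) and only the Scala standard library:

```scala
object AddressCodec {
  // Encode every byte as exactly two hex digits, so leading zero bytes survive.
  def toHex(bytes: Array[Byte]): String =
    bytes.map(b => f"${b & 0xff}%02X").mkString

  // Decode two hex digits at a time back into the original bytes.
  def fromHex(hex: String): Array[Byte] =
    hex.grouped(2).map(pair => Integer.parseInt(pair, 16).toByte).toArray
}
```

The same care applies to the ";" separator: split(";") cuts the payload short if the payload itself contains a semicolon, so sending the address and the payload as separate frames may be the more robust choice.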

Here is the client if needed:

trait ZmqClientProtocol extends ClientProtocol {

  val context: ZMQ.Context = ZMQ.context(1)
  val socket: ZMQ.Socket = context.socket(ZMQ.REQ)

  println("Connecting to server")
  socket.connect("tcp://localhost:5555")

  override protected def send(message: String): String = {

    // Encode the message as bytes (no 0-terminator is appended here)
    val request = message.getBytes()

    // Send the message
    println(s"Sending request $message")
    socket.send(request, 0)

    //  Get the reply.
    val reply = socket.recv(0)

    s"$message=${new String(reply)}"
  }
}

Answer:

Is there a way for a ZeroMQ socket to do only reading or only writing?

Yes, several ways.

a ) use a tandem of simplex archetypes: PUSH/PULL writes and PULL/PUSH reads
b ) use a tandem of simplex archetypes: (X)PUB/(X)SUB writes and (X)SUB/(X)PUB reads


... still uses .recv()-then-.send() loop.

Well, this observation relates more to the actual socket archetype: some archetypes ( REQ and REP, for example ) do require a mandatory two-step sequencing of .recv(), .send(), ... that is hardwired into their internal FSAs.


... but each of those methods would run in separate thread

Well, here the challenge starts: ZeroMQ has, since its inception, been designed as principally zero-sharing, so as to foster performance and independence. Zen-of-Zero is an interesting design principle in distributed-system design.

Yet recent re-design efforts have shown, in API 4.2+, a will to make ZeroMQ socket Access-points thread-safe ( which goes against the initial principle of share-nothing ), so if you experiment in this direction, you may arrive in territories that work, but at the cost of a decline from Zen-of-Zero.

ZeroMQ Socket Access-point(s) should never be shared, even if possible, because of design purity.

If you strive for separation of OOP concerns, better equip such a class with another pair of simplex PUSH/PULL sockets. The head-end(s) of these read-only-specialised and write-only-specialised sockets will, however, have to cope with the fact that the native ZeroMQ socket lives "remotely", beyond the foreign class boundary of abstraction. The "remote" class will have to take care of that socket archetype's FSA, its settings, performance tweaking and error states, and will also have to mediate all message transfers to and from the native ZeroMQ socket, which is principally isolated and hidden from both of the specialised head-end classes.

In any case, doable with due design care.


ZeroMQ resources are not any cheaply composable / disposable trash

An idea of:

...
override def sendMessage( address:  String,
                          response: String
                          ): Unit = {

             val inprocWriteSocket: ZMQ.Socket  = context.createSocket( ZMQ.DEALER )
                 inprocWriteSocket.connect( "inproc://backend" )
                 ...

may seem easy in the source code, but it ignores the actual setup overheads. It must also respect the fact that no socket ( the inproc:// transport class being a special case ) gets RTO ( Ready-To-Operate ) in the very microsecond it was instantiated inside the Context(), let alone fully .connect()-ed and RTO after all the handshaking with the remote counterparty. So it is best to set up the SIG/MSG infrastructure well beforehand and keep it as a semi-persistent communication layer, rather than as something ad hoc, initiated just in time and then disposed of ( Ecology of Resources ).


inproc://-transport-class has one more requirement pre-API 4.x:

Connecting a socket

When connecting a socket to a peer address using zmq_connect() with the inproc:// transport, the endpoint shall be interpreted as an arbitrary string identifying the name to connect to. Before version 4.0 the name must have been previously created by assigning it to at least one socket within the same ØMQ context as the socket being connected. Since version 4.0 the order of zmq_bind() and zmq_connect() does not matter just like for the tcp:// transport type.

So, when your deployment is unsure about the actual localhost API version, take care to enforce the proper order of .bind() / .connect(); otherwise the inproc:// pipes will not work for you.
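
A minimal sketch of the two points above, assuming JeroMQ and the integer socket-type constants used in the question: the PUSH and PULL sockets are created once and reused for every message, and the inproc:// endpoint is bound before it is connected. The object name, the endpoint name and the roundTrip helper are hypothetical; in a real program each socket would be handed to the single thread that uses it.

```scala
import org.zeromq.{ZContext, ZMQ}

object InprocPipeSketch {
  // Hypothetical endpoint name, chosen only for this illustration.
  val Endpoint = "inproc://sketch-pipe"

  def roundTrip(payloads: Seq[String]): Seq[String] = {
    val context = new ZContext()
    try {
      // Bind first, connect second: this order is safe on every ZeroMQ version.
      val writer = context.createSocket(ZMQ.PUSH)
      writer.bind(Endpoint)
      val reader = context.createSocket(ZMQ.PULL)
      reader.connect(Endpoint)

      // Both sockets are set up once and reused for every message.
      payloads.foreach(p => writer.send(p))
      payloads.map(_ => reader.recvStr())
    } finally {
      context.close() // also closes the sockets created through this context
    }
  }
}
```

Calling InprocPipeSketch.roundTrip(Seq("a", "b")) pushes both strings through the pipe and reads them back in order; nothing is created or torn down per message.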

Question:

I'm using the org.zeromq java library for ZMQ and it works great in production deployment and when I run tests inside of IntelliJ, but if I try to run the same test via sbt from the command line on the same machine it fails with:

java.lang.AbstractMethodError: com.sun.jna.Structure.getFieldOrder()Ljava/util/List;
    at com.sun.jna.Structure.fieldOrder(Structure.java:868) ~[jna-4.0.0.jar:4.0.0 (b2)]
    ... (8 lines omitted)
    at org.zeromq.zmq_msg_t.<init>(zmq_msg_t.java:21) ~[zeromq-scala-binding_2.10-0.0.7.jar:0.0.7]
    at org.zeromq.ZMQ$Socket.newZmqMessage(ZMQ.java:453) ~[zeromq-scala-binding_2.10-0.0.7.jar:0.0.7]
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:368) ~[zeromq-scala-binding_2.10-0.0.7.jar:0.0.7]

where the test is simply this:

class ZeroMQSpec extends FlatSpec with Matchers with LazyLogging {

  "zeroMQ" should "be able to just send something" in {
    val context = ZMQ.context(1)
    val socket = context.socket(ZMQ.PAIR)
    socket.bind("inproc://zmqtest")
    logger.debug("start polling")
    try {
      socket.send(Array[Byte](1, 2), 0)
      logger.debug("done polling")
    } catch {
      case e: Throwable => logger.error("ZMQ failed", e)
    }
  }

send is just an example. Same happens for recv or poll.

Since the same test passes when run under IntelliJ on the same machine and ZMQ also works in production code, it doesn't seem to be related to the native ZMQ installed, but rather how JVM binds to it. Unfortunately, I don't know how to further troubleshoot this.

Update

Prompted by @cdshines' question, I looked at what JNA needs in the classpath to work and, going by this, I need platform.jar and jna.jar. In IntelliJ they both come from my ivy cache:

file:/Users/arne/.ivy2/cache/net.java.dev.jna/jna/jars/jna-3.0.9.jar
file:/Users/arne/.ivy2/cache/net.java.dev.jna/jna/jars/jna-4.0.0.jar
file:/Users/arne/.ivy2/cache/net.java.dev.jna/platform/jars/platform-3.4.0.jar

which I figure SBT should resolve as well. Since it is not, is there a way to force SBT to do this?

Update 2

Added

"net.java.dev.jna" % "jna" % "4.0.0" % "test",
"net.java.dev.jna" % "jna-platform" % "4.0.0" % "test",

to my dependencies. No change in behavior.


Answer:

Your version of zeromq-scala-binding depends on JNA version 3.0.9, but you are running it with JNA version 4.0.0 which is not binary compatible.

You need to exclude JNA 4 from whichever other dependency pulls it in. You can use sbt-dependency-graph to find out which one.

Normally I'd be scared about breaking the other dependency by forcing it to use JNA 3, but you say it works in production so you should be fine.
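
As a sketch of what that could look like in build.sbt: exclude and dependencyOverrides are standard sbt features, but the "com.example" coordinates below are a hypothetical stand-in for whichever dependency actually pulls in JNA 4. The explicit jna 4.0.0 test dependencies from "Update 2" would presumably also have to go, since they put JNA 4 on the test classpath themselves.

```scala
// build.sbt (sketch)

// Option 1: exclude JNA from the dependency that drags in version 4.
// "com.example" % "some-library" is a hypothetical placeholder.
libraryDependencies +=
  ("com.example" % "some-library" % "1.0.0").exclude("net.java.dev.jna", "jna")

// Option 2: pin the JNA version that sbt selects for the whole build.
dependencyOverrides += "net.java.dev.jna" % "jna" % "3.0.9"
```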

Question:

As a follow-up to ROUTER to DEALER, I have a single DEALER talking to a single ROUTER:

In short, I create 2 threads for each DEALER. The life-cycle of the DEALER is:

  1. send READY to ROUTER to indicate that ROUTER can trust KILL signals from it
  2. wait for ACK from ROUTER
  3. send 5 messages
  4. send END message so that ROUTER will exit

    package net.async

    import org.zeromq.ZMQ
    import org.zeromq.ZMQ.Socket
    
    import scala.annotation.tailrec
    
    object Client {
      val Empty           = "".getBytes
      def message(x: Int) = s"HELLO_#$x".getBytes
      val Count   = 5
    }
    
    class Client(name: String) extends Runnable {
    
      import Client._
      import AsyncClientServer._
    
      override def run(): Unit = {
        val context = ZMQ.context(1)
        val dealer = context.socket(ZMQ.DEALER)
        dealer.setIdentity(name.getBytes)
        dealer.connect(s"tcp://localhost:$Port")
        initiate(dealer)
      }
    
      private def initiate(dealer: Socket): Unit = {
        dealer.send("".getBytes, ZMQ.SNDMORE)
        dealer.send("READY".getBytes, 0)
        val reply = new String(dealer.recv(0))
        println(s"DEALER: ${new String(dealer.getIdentity)} received $reply")
        if(reply == Ack) {println("DEALER: received ACK!"); runHelper(dealer, Count)}
        else              initiate(dealer)
      }
    
      @tailrec
      private def runHelper(dealer: Socket, count: Int): Unit = {
        val msg = if(count <= 0 ) End.getBytes else message(count)
        dealer.send(msg, 0)
        val id = new String(dealer.getIdentity)
        println(s"DEALER ${id} sent message: ${new String(msg)}.")
        // println(s"Dealer: ${dealer.getIdentity} received message: " + new String(dealer.recv(0)))
        runHelper(dealer, count - 1)
      }
    }
    
    object AsyncClientServer {
    
      val End = "END"
      val Ack = "WORLD"
      val Port = 5555
      val ClientReady = "READY"
      val Empty = "".getBytes
    
      val context = ZMQ.context(1)
      val router  = context.socket(ZMQ.ROUTER)
    
      def main(args: Array[String]): Unit = {
        router.bind(s"tcp://*:$Port")
        new Thread(new Client("JOE")).start()
        //new Thread(new Client("JILL")).start()
        mainHelper(List.empty)
      }
    
      private def mainHelper(activeClients: List[String]): Unit = {
        val identity = new String( router.recv(0) )
        println(s"ROUTER: Received message from $identity.")
        val empty   = router.recv(0)
        println("ROUTER: received empty: " + new String(empty))
        val message = new String( router.recv(0) )
        println(s"ROUTER: received message: $message")
    
        checkMessage(identity, message, activeClients) match {
          case Normal(msg)     => mainHelper(activeClients)
          case Ready(id)       => ackDealer(router, id); mainHelper(id :: activeClients)
          case Kill            => sys.exit(0)
          case UnknownIdentity => mainHelper(activeClients)
        }
      }
    
      private def ackDealer(router: Socket, identity: String): Unit = {
        router.send(identity.getBytes, ZMQ.SNDMORE)
        router.send(Empty,             ZMQ.SNDMORE)
        router.send(Ack.getBytes,      0)
      }
    
      private def checkMessage(identity: String, message: String, activeClients: List[String]): Message = {
        if(message == ClientReady) Ready(identity)
        else {
          activeClients.find(_ == identity) match {
            case Some(_) =>
              if (message == End) Kill
              else                Normal(message)
            case None    =>       UnknownIdentity
          }
        }
      }
      sealed trait Message
      case class Normal(value: String) extends Message
      case class Ready(id: String)     extends Message
      case object Kill                 extends Message
      case object UnknownIdentity extends Message
    
    }
    

However, it appears that the ROUTER is receiving messages out of order:

[info] Running net.async.AsyncClientServer
[info] ROUTER: Received message from JOE.
[info] ROUTER: received empty:
[info] ROUTER: received message: READY
[info] DEALER: JOE received
[info] DEALER: JOE received WORLD
[info] DEALER: received ACK!
[info] ROUTER: Received message from JOE.
[info] ROUTER: received empty:
[info] ROUTER: received message: READY
[info] DEALER JOE sent message: HELLO_#5.
[info] DEALER JOE sent message: HELLO_#4.
[info] DEALER JOE sent message: HELLO_#3.
[info] DEALER JOE sent message: HELLO_#2.
[info] ROUTER: Received message from JOE.
[info] DEALER JOE sent message: HELLO_#1.
[info] ROUTER: received empty: HELLO_#5

The last [info] shows that ROUTER received HELLO_#5 for what should've been an empty. Why is that?


Answer:

In initiate you correctly send an empty frame then the content

dealer.send("".getBytes, ZMQ.SNDMORE)
dealer.send("READY".getBytes, 0)

However, in runHelper you send only the content, so the ROUTER reads the payload where it expects the empty delimiter frame:

dealer.send(msg, 0)
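
One way to keep the two call sites consistent is to route every send through a single helper that always emits the delimiter. The sketch below assumes JeroMQ; sendWithDelimiter, recvAllFrames, demo and the inproc endpoint are hypothetical names introduced for illustration, not part of the original program.

```scala
import org.zeromq.{ZContext, ZMQ}

object DealerEnvelopeSketch {
  // Send one logical message from a DEALER the way a REQ socket would:
  // an empty delimiter frame first, then the payload.
  def sendWithDelimiter(dealer: ZMQ.Socket, payload: String): Unit = {
    dealer.send("".getBytes, ZMQ.SNDMORE)
    dealer.send(payload.getBytes, 0)
  }

  // Read one complete multipart message as a list of strings.
  def recvAllFrames(socket: ZMQ.Socket): List[String] = {
    val first = new String(socket.recv(0))
    if (socket.hasReceiveMore) first :: recvAllFrames(socket) else List(first)
  }

  def demo(): List[List[String]] = {
    val context = new ZContext()
    try {
      val router = context.createSocket(ZMQ.ROUTER)
      router.bind("inproc://envelope-sketch") // hypothetical endpoint
      val dealer = context.createSocket(ZMQ.DEALER)
      dealer.setIdentity("JOE".getBytes) // must be set before connect
      dealer.connect("inproc://envelope-sketch")

      sendWithDelimiter(dealer, "READY")
      sendWithDelimiter(dealer, "HELLO_#5")
      List(recvAllFrames(router), recvAllFrames(router))
    } finally context.close()
  }
}
```

With the helper in place, each message should reach the ROUTER as three frames (identity, empty delimiter, payload), which is the layout mainHelper reads with its three recv calls.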

Question:

Looking at the Scala code for this 5 REQ <--> 1 ROUTER setup:

Worker
 class WorkerTask extends Runnable {
    override def run: Unit = {
      val rand = new Random(System.currentTimeMillis())
      val context = ZMQ.context(1)
      val worker = context.socket(ZMQ.REQ)
      worker.connect("tcp://localhost:5555")
      var total = 0
      var workload = ""

      do {
        worker.send("Ready".getBytes, 0)
        workload = new String(worker.recv(0))
        Thread.sleep (rand.nextInt(1) * 1000)
        total += 1
      } while (workload.equalsIgnoreCase("END") == false)
      printf("Completed: %d tasks\n", total)
    }
  }
main (Router)
 def main(args: Array[String]): Unit = {
    val NBR_WORKERS = 5
    val context = ZMQ.context(1)
    val client = context.socket(ZMQ.ROUTER)

    assert(client.getType > -1)
    client.bind("tcp://*:5555")
    val workers = List.fill(NBR_WORKERS)(new Thread(new WorkerTask))
    workers.foreach (_.start)

    for (i <- 1 to (NBR_WORKERS * 10)) {
      // LRU worker is next waiting in queue
      val address = client.recv(0)
      val empty = client.recv(0)
      val ready = client.recv(0)

      client.send(address, ZMQ.SNDMORE)
      client.send("".getBytes, ZMQ.SNDMORE)
      client.send("This is the workload".getBytes, 0)
    }

    for (i <- 1 to NBR_WORKERS) {
      val address = client.recv(0)
      val empty = client.recv(0)
      val ready = client.recv(0)

      client.send(address, ZMQ.SNDMORE)
      client.send("".getBytes, ZMQ.SNDMORE)
      client.send("END".getBytes, 0)
    }
  }

Running on my machine:

[info] Running net.server.RouterToReq
Completed: 21 tasks
Completed: 1 tasks
Completed: 27 tasks
Completed: 5 tasks
Completed: 1 tasks

As I partially understand the above code, 5 REQ workers accept requests from the ROUTER on port 5555.

Lastly, in the following code:

 for (i <- 1 to NBR_WORKERS) {
      val address = client.recv(0)
      val empty   = client.recv(0)
      val ready   = client.recv(0)

What message is the client, i.e. ROUTER, receiving?


Answer:

The worker sends the message "Ready" repeatedly as a single frame message.

The REQ socket will add a blank delimiter frame on the front. The ROUTER socket will prepend an identifier frame to the front of any received message to identify where it came from.

Thus the single ready message becomes a 3 frame message when you receive it on the router, this is why 3 recv calls are required.

When you send on a router socket the first frame will be removed and used to identify which client to send the message to. The REQ socket will remove all frames until it finds an empty frame and thus you only need a single recv call on the worker side.
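
The frame handling described above can be modelled in a few lines of plain Scala. This is only a model of the envelope rules, not ZeroMQ code: the names (EnvelopeModel, reqSend, routerRecv, routerSend, reqRecv) are hypothetical, and real identities are binary rather than readable strings.

```scala
object EnvelopeModel {
  type Frames = List[String]

  // REQ, sending: prepend the empty delimiter frame.
  def reqSend(payload: String): Frames = List("", payload)

  // ROUTER, receiving: prepend the identity of the sending peer.
  def routerRecv(identity: String, fromReq: Frames): Frames = identity :: fromReq

  // ROUTER, sending: strip the first frame and use it as the destination.
  def routerSend(frames: Frames): (String, Frames) = (frames.head, frames.tail)

  // REQ, receiving: drop everything up to and including the empty delimiter.
  def reqRecv(frames: Frames): Frames = frames.dropWhile(_.nonEmpty).drop(1)
}
```

In this model, routerRecv("worker-1", reqSend("Ready")) yields three frames, matching the three recv calls in the loop, and reqRecv applied to what routerSend leaves on the wire yields the single workload frame that the worker reads.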

Question:

I'm trying to embed a ZMQ subscriber in a Runnable. I'm able to start the Runnable the first time and everything seems okay. The problem is that when I interrupt the Thread and try to start a new Thread, the subscriber does not get any messages. For example:

  1. I have a publisher runnable

    class ZMQPublisherRunnable() extends Runnable {
    
     override def run() {
       val ZMQcontext = ZMQ.context(1)
       val publisher = ZMQcontext.socket(ZMQ.PUB)
       var count = 0
    
       publisher.connect(s"tcp://127.0.0.1:16666")
    
       while (!Thread.currentThread().isInterrupted) {
         try {
           println(s"PUBLISHER -> $count")
           publisher.send(s"PUBLISHER -> $count")
           count += 1
           Thread.sleep(1000)
         }
         catch {
           case e: Exception =>
           println(e.getMessage)
           publisher.disconnect(s"tcp://127.0.0.1:16666")
           ZMQcontext.close()
         }
       }
     }
    }
    
  2. I have a Subscriber Runnable:

    class ZMQSubscriberRunnable1() extends Runnable {
    
      override def run() {
    
        println("STARTING SUBSCRIBER")
    
        val ZMQcontext = ZMQ.context(1)
        val subscriber = ZMQcontext.socket(ZMQ.SUB)
        subscriber.subscribe("".getBytes)
    
       subscriber.bind(s"tcp://127.0.0.1:16666")
    
        while (!Thread.currentThread().isInterrupted) {
          try {
            println("waiting")
            val mesg = new String(subscriber.recv(0))
            println(s"SUBSCRIBER -> $mesg")
          }
          catch {
            case e: Exception =>
              println(e.getMessage)
              subscriber.unbind("tcp://127.0.0.1:16666")
              subscriber.close()
              ZMQcontext.close()
          }
        }
      }
    }
    
  3. My main code looks like this:

    object Application extends App {
      val zmqPUB = new ZMQPublisherRunnable
      val zmqThreadPUB = new Thread(zmqPUB, "MY_PUB")
    
      zmqThreadPUB.setDaemon(true)
      zmqThreadPUB.start()
    
      val zmqRunnable = new ZMQSubscriberRunnable1
      val zmqThread = new Thread(zmqRunnable, "MY_TEST")
    
      zmqThread.setDaemon(true)
      zmqThread.start()
    
      Thread.sleep(10000)
    
      zmqThread.interrupt()
      zmqThread.join()
    
      Thread.sleep(2000)
    
      val zmqRunnable_2 = new ZMQSubscriberRunnable1
      val zmqThread_2 = new Thread(zmqRunnable_2, "MY_TEST_2")
    
      zmqThread_2.setDaemon(true)
      zmqThread_2.start()
    
      Thread.sleep(10000)
    
      zmqThread_2.interrupt()
      zmqThread_2.join()
    }
    

The first time I start the Subscriber, I'm able to receive all messages:

STARTING SUBSCRIBER
PUBLISHER -> 0
waiting
PUBLISHER -> 1
SUBSCRIBER -> PUBLISHER -> 1
waiting
PUBLISHER -> 2
SUBSCRIBER -> PUBLISHER -> 2
waiting
PUBLISHER -> 3
SUBSCRIBER -> PUBLISHER -> 3
waiting
...

Once I interrupt the Thread and start a new one from the same Runnable, I'm not able to read messages anymore. It waits forever:

STARTING SUBSCRIBER
waiting
PUBLISHER -> 13
PUBLISHER -> 14
PUBLISHER -> 15
PUBLISHER -> 16
PUBLISHER -> 17
...

Any insights about what I'm doing wrong?

Thanks


Answer:

JeroMQ is not Thread.interrupt safe.

To work around it, you have to terminate the ZMQ context before you call Thread.interrupt:

  1. Instantiate the ZMQContext outside the Runnable
  2. Pass the ZMQContext as an argument to the ZMQ Runnable (you can also use it as a global variable)
  3. Call zmqContext.term()
  4. Call zmqSubThread.interrupt()
  5. Call zmqSubThread.join()

For more details take a look at: https://github.com/zeromq/jeromq/issues/116

My subscriber Runnable looks like:

class ZMQSubscriberRunnable(zmqContext:ZMQ.Context, port: Int, ip: String, topic: String) extends Runnable {

  override def run() {

    var contextTerminated = false
    val subscriber = zmqContext.socket(ZMQ.SUB)
    subscriber.subscribe(topic.getBytes)

    subscriber.bind(s"tcp://$ip:$port")

    while (!contextTerminated && !Thread.currentThread().isInterrupted) {
      try {
        println(new String(subscriber.recv(0)))
      }
      catch {
        case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
          contextTerminated = true
          subscriber.close()
        case e: Exception =>
          // Close the socket first: term() blocks until every socket is closed
          subscriber.close()
          zmqContext.term()
          contextTerminated = true
      }
    }
  }
}

To interrupt the Thread:

zmqContext.term()
zmqSubThread.interrupt()
zmqSubThread.join()