Hot questions for using ZeroMQ with Apache

Question:

I am using Spark Streaming to receive data from a ZeroMQ queue at a specific interval, enrich it, and save it as Parquet files. I want to compare data from one streaming window to another (later in time, using the Parquet files).

How can I find the timestamp of a specific streaming window, which I can add as another field during enrichment to facilitate my comparisons?

JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(duration));
inputStream = javaStreamingContext.receiverStream(new StreamReceiver( hostName, port, StorageLevel.MEMORY_AND_DISK_SER()));
JavaDStream<myPojoFormat> enrichedData = inputStream.map(new Enricher());

In a nutshell, I want the timestamp of each streaming window (not at record level but at batch level).


Answer:

You can use the transform method of JavaDStream, which takes a Function2 as its parameter. The Function2 receives an RDD and a Time object and returns a new RDD. The overall result is a new JavaDStream in which each RDD has been transformed according to the logic you have chosen.
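For example, a sketch of that approach applied to the question's stream. The `transform` overload taking a `Function2<JavaRDD<T>, Time, JavaRDD<U>>` is the real JavaDStream API; `myPojoFormat` is the asker's POJO and its `setBatchTime` setter is a hypothetical name for the extra field:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

// Tag every record with the timestamp of the batch (streaming window) it came from.
JavaDStream<myPojoFormat> stamped = enrichedData.transform(
    new Function2<JavaRDD<myPojoFormat>, Time, JavaRDD<myPojoFormat>>() {
        @Override
        public JavaRDD<myPojoFormat> call(JavaRDD<myPojoFormat> rdd, Time batchTime) {
            final long ts = batchTime.milliseconds();      // one timestamp per batch
            return rdd.map(new Function<myPojoFormat, myPojoFormat>() {
                @Override
                public myPojoFormat call(myPojoFormat record) {
                    record.setBatchTime(ts);               // hypothetical setter on the POJO
                    return record;
                }
            });
        }
    });
```

The field is then persisted with the rest of the record into the Parquet files and serves as the batch-level comparison key.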

Question:

First of all, I checked all the related questions, but in my case it must be something else (I tried the solutions from there).

I installed zeromq following these instructions http://zeromq.org/bindings:php and everything works just fine when I run my PHP script from the CLI.

The problem is when running under the Apache server; I get

Fatal error: Class 'ZMQContext' not found in /var/www/i.php on line 19

line 19 is

$context = new ZMQContext();

What I tried:

1) I ran phpinfo() and found out where my php.ini files are:

 /etc/php5/apache2/php.ini
 /etc/php5/apache2/conf.d/10-php_pdo_mysql.ini, 
 /etc/php5/apache2/conf.d/10-zmq.ini,
 and so on...

So I opened these files and added the line "extension=zmq.so".

Did not help.

2) I added the following lines to the beginning of my script:

use \ZMQContext;
use \ZMQ;

Again, it did not help.

3) I checked the Apache error logs and found this:

PHP Warning:  PHP Startup: Unable to load dynamic library '/usr/lib/php5/20090626
/zmq.so' - /usr/lib/php5/20090626/zmq.so:
 cannot open shared object file: No such file or directory in Unknown on line 0

PHP Warning:  PHP Startup: Unable to load dynamic library '/usr/lib/php5/20090626
/zmq.so' - /usr/lib/php5/20090626/zmq.so: cannot open shared object file: No such 
file or directory in Unknown on line 0

4) So I checked where zmq.so actually is on my machine:

user@wb:~$ sudo find / -name zmq.so
/home/user/php-zmq/modules/zmq.so
/home/user/php-zmq/.libs/zmq.so
/var/www/push/php-zmq/modules/zmq.so
/var/www/push/php-zmq/.libs/zmq.so
/usr/lib/php5/20100525/zmq.so
/usr/local/lib/php/extensions/no-debug-non-zts-20121212/zmq.so

So, do you have any suggestions about what else might be wrong? Thank you.


Answer:

I have to take a few guesses, but let's give it a try:

You appear to have different versions of PHP installed, or you have updated/downgraded your PHP.

You have the folder /usr/lib/php5/20100525/ on your system which belongs to PHP 5.4 on a Debian-type machine (I think). But your PHP looks for the file in /usr/lib/php5/20090626/, which I think belongs to PHP 5.3.

Your PHP CLI seems to run with PHP 5.4, while the mod_php of your Apache appears to run 5.3.

If this is true, you can try copying zmq.so into /usr/lib/php5/20090626/ and restarting Apache. But it could fail due to binary incompatibility if the extension was built against PHP 5.4.

In this case, I'd recommend upgrading your mod_php to PHP 5.4 (again?).
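A sketch of how one might verify and fix the mismatch from the shell (paths taken from the question; the copy only helps if the two PHP builds are ABI-compatible, otherwise the extension must be rebuilt with the phpize of the Apache PHP):

```shell
# Which extension_dir does the CLI PHP use?
php -i | grep extension_dir

# Which directory is mod_php complaining about? (from the Apache error log)
grep zmq.so /var/log/apache2/error.log | tail -n 2

# If they differ, put the module where mod_php looks and restart Apache.
sudo cp /usr/lib/php5/20100525/zmq.so /usr/lib/php5/20090626/zmq.so
sudo service apache2 restart
```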

Question:

I tried to install ZMQ for PHP recently and ran into the following problem:

apache2: Syntax error on line 137 of /etc/apache2/httpd.conf: Cannot load modules/extra_zmq.so into server: /usr/lib64/apache2/modules/extra_zmq.so: undefined symbol: zend_new_interned_string

make test on php-zmq shows that my PHP version is used (7.1.11, along with Zend 3.1.0) and fails for these two tests (I don't know if it's related; I couldn't find more information about them):

FAILED TEST SUMMARY
---------------------------------------------------------------------
Test adding / removing items [tests/007-addremovepoll.phpt]
Test callback edge-cases [tests/019-callbackinvalidsignature.phpt]

I tried downloading from both the GitHub and PECL repositories without any luck.


Answer:

Well, I found the problem: the extension is meant to be loaded by PHP, not by Apache. I was trying to use LoadModule (a2enmod) in httpd.conf, but this extension must be loaded by PHP, with the proper directive: extension=zmq.so.
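For reference, a sketch of the correct, PHP-side directive (the file path is a typical conf.d-style location and an assumption for illustration):

```ini
; Loaded by PHP as a Zend extension module,
; e.g. in php.ini or a conf.d snippet such as /etc/php.d/zmq.ini
extension=zmq.so
```

The Apache-side LoadModule line in httpd.conf must be removed; Apache modules and PHP extensions use different ABIs, which is why Apache fails with an undefined Zend symbol such as zend_new_interned_string.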

Question:

We're evaluating whether to use Spark to run our grid calculations, and we're having some trouble with a particular use case. Wondering if the community has any good ideas.

We have an extensive library of C++ functions in a DLL that we need to use all over the grid. The C++ code base is large, not particularly stable (it falls over frequently), and holds state (it is thread-unsafe). For this reason the DLL needs to live out of process.

To handle this we've built a thin ZeroMQ wrapper around the DLL and a thin Scala ZeroMQ client to handle requests to the DLL. We can distribute the ZeroMQ-wrapped DLL to all the nodes in the grid.

My question is: is it possible to use Spark to handle this situation? How can I make calls to the ZeroMQ Scala client inside an RDD.map() call? How does this get handled on the grid? Do we need to serialize the ZeroMQ client?

Any ideas much appreciated :)


Answer:

You probably want to run an instance of your custom server on each node. Then you can create a new client per partition. Instead of RDD.map, use RDD.mapPartitions, something like this:

rdd.mapPartitions { it =>
  val client = new MyClient("localhost:2345")
  it.map(x => client.calculate(x))
}

Question:

I'm testing ZeroMQ for PHP. My goal is to send messages to a Python script. Everything works fine if I launch my transmission script from the PHP CLI

php /path/to/myscript.php

while it fails if it's a web request. I've tried executing the server script from the PHP CLI as above (which seems the more logical way) and as a web request.

I've got a CentOS 7 server with PHP 7.2 and ZeroMQ 1.1.3 installed through PECL.

I even tried launching the above command with shell_exec/exec inside the client script, but it doesn't work. The connection works fine, but it neither sends nor receives.

Client code:

$context = new ZMQContext();

//  Socket to talk to server
echo "Connecting to hello world server...\n";
$requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
$currentObject = $requester->connect("tcp://localhost:5555");


for ($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
    printf ("Sending request %d...\n", $request_nbr);
    $risSend = $requester->send("Hello", ZMQ::MODE_NOBLOCK);
    print_r($risSend);
    $reply = $requester->recv();
    printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
}

Server Code:

$context = new ZMQContext(1);

//  Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {
    //  Wait for next request from client
    $request = $responder->recv();

    printf ("Received request: [%s]\n", $request);

    //  Send reply back to client
    $responder->send("World");
}

The browser gets stuck without any error. Even using a timeout, it reaches the limit and exits, but I can't get any error message.


Answer:

OBSERVATION: The browser gets stuck, without any error.

This is a perfectly legal state. For it to happen, it is enough to "miss" the arrival of the first request that the REQ side has already dispatched. Because the REQ/REP pattern depends on a distributed finite-state automaton (dFSA), the pair then falls into an unsalvageable deadlock: the REQ side waits for an answer that will never arrive, and the REP side waits for a request that will never arrive (the REQ side is already waiting), and this state remains forever.


A best next step:

In case one has never worked with ZeroMQ, or has never met the concept of the art of the Zen-of-Zero, one may enjoy first looking at "ZeroMQ Principles in less than Five Seconds" before diving into further details.


Start with unconditionally working archetypes: a pair of PUSH/PULL simplex channels, which do not require the dFSA two-step of REQ-REP-REQ-REP-...-{deadlock}, a principally unavoidable terminal state; one is never sure when it will happen, but it will, at some later time :o)

Next, one may increase the robustness of the message flow by using zmq_setsockopt( ZMQ_IMMEDIATE, 1 ), which avoids queuing messages onto incomplete connections between peers.

Always prefer the non-blocking forms of the .recv() method, best with a pre-test for message presence via the .poll() method. The Poller class, while available in many language bindings, is not always as handy and as flexible as calling the .poll() method directly on a Socket instance.

Also feel free to read more about fine-tuning the ZeroMQ tools and other implications of the Art of the Zen-of-Zero here.


A server-side mock-up, as a { PASS | FAIL } proof that the .send()/.recv() delivery chain works:

<?php                                      /* Create new PUSH-ing end */
$aCTX   = new ZMQContext();
try {                                      /* Try: things may wreak havoc */

      $PUSHer = $aCTX->getSocket( ZMQ::SOCKET_PUSH );
      echo "POSACK'd: .getSocket() was made\n";
      }
catch ( ZMQSocketException $e ){
      echo "  NACK'd: I told you ...\n";   /* Handle with care ... */
      if ( $e->getCode() === ZMQ::ERR_... ) {
            echo " - Got ERR_..., read ZeroMQ API documentation for details\n";
        } else {
            die( " - Get ERR: " . $e->getMessage() );
        }
      }
try {                                      /* Try: things may wreak havoc */
      $PUSHer->bind( "tcp://A.B.C.D:NNN" ); /* IP address to .connect() */
      echo "POSACK'd: .bind() was made\n";
      }
catch ( ZMQSocketException $e ){
      echo "  NACK'd: I told you ...\n";   /* Handle with care ... */
      if ( $e->getCode() === ZMQ::ERR_... ) {
            echo " - Got ERR_..., read ZeroMQ API documentation for details\n";
        } else {
            die( " - Get ERR: " . $e->getMessage() );
        }
      }

$retries = 1234567;

do {                                       /* Start a loop */
    try {                                  /* Try: to PUSH.send() */
            echo "Trying to send a message #" . ( 1234568 - $retries ) . "\n";
            $PUSHer->send( "This is a message", ZMQ::MODE_DONTWAIT );
            echo "POSACK'd: PUSHer.send() was made\n";
        }
    catch ( ZMQSocketException $e ) {
        echo "  NACK'd: I told you ...\n"; /* Handle with care ... */
        if ( $e->getCode() === ZMQ::ERR_... ) {
            echo " - Got ERR_..., read ZeroMQ API documentation for details\n";
        } else {                           /* For all ERR_... states */
            die( " - Got ERR_...: " . $e->getMessage() );
        }
    }
 /* --------------------------------------------------------------------
    Here one may add an attempt to .recv( $PULLer, ZMQ::MODE_DONTWAIT );
             and test for a non-empty string returned
    -------------------------------------------------------------------- */
    usleep( 1 );                           /* Sleep a bit between operations */
} while ( --$retries );
?>

A client-side mock-up, to test that the PUSH-er lives and .send()-s:

import time, datetime, zmq; print( "Thissssss Sssssssssssssssssssssssssssssssssssssssnake uses ZeroMQ ver:{0:}".format( zmq.__version__ ) )

aCtx = zmq.Context()
aPull= aCtx.socket( zmq.PULL )
aPull.setsockopt(   zmq.LINGER, 0 )         # always ... be explicit
aPull_address2c = "tcp://A.B.C.D:NNN"

M0 = "{0:} try a .connect( {1:} ), if it gets to PUSH-er side"
M1 = "{0:} try a .recv(), if it gets any message"
M2 = "{0:} got a .recv()->[[[ {1:} ]]]"
M3 = "{0:} EXC'd           will gracefully release resources and terminate..."
M4 = "{0:} did"

try:
    print( M0.format( datetime.datetime.isoformat( datetime.datetime.now() ),
                      aPull_address2c
                      )
           )
    aPull.connect( aPull_address2c );

    while True:
        print( M1.format( datetime.datetime.isoformat( datetime.datetime.now() ) ) )
        try:
            m = aPull.recv( zmq.NOBLOCK )   # always ... avoid blocking waits
        except zmq.Again:
            m = None                        # no message ready yet, do not block
        if m:
             print( M2.format( datetime.datetime.isoformat( datetime.datetime.now() ),
                               str( m )     # always ... fused to str()
                               )
                    )
             time.sleep( 5 )
        else:
             time.sleep( 1 )

        pass

        ################################################################
        # Here one may add an attempt to aPush.send( M4, zmq.NOBLOCK )
        #          and test if the reverse path aPush->$PULLer goes well
        ################################################################

except:
    print( M3.format( datetime.datetime.isoformat( datetime.datetime.now() ) ) )

finally:
    aPull.close()                           # always ... be explicit
    aCtx.term()                             # always ... be explicit

    print( M4.format( datetime.datetime.isoformat( datetime.datetime.now() ) ) )

Question:

I have a basic Maven Java app that I created, and it depends on JeroMQ, which is a full Java implementation of ZeroMQ. Since I also need to wrap this Java app as a Windows service, I chose to use Apache Commons Daemon and specifically followed this excellent example: http://web.archive.org/web/20090228071059/http://blog.platinumsolutions.com/node/234 Here's what the Java code looks like:

package com.org.SubscriberACD;

import java.nio.charset.Charset;

import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ Subscriber for Apache Commons Daemon
 *
 */
public class Subscriber 
{
    /**
    * Single static instance of the service class
    */
    private static Subscriber subscriber_service = new Subscriber();

    /**
     * Static method called by prunsrv to start/stop
     * the service.  Pass the argument "start"
     * to start the service, and pass "stop" to
     * stop the service.
     */
    public static void windowsService(String args[]) {
       String cmd = "start";
       if(args.length > 0) {
          cmd = args[0];
       }

       if("start".equals(cmd)) {
          subscriber_service.start();
       }
       else {
          subscriber_service.stop();
       }
    }

    /**
     * Flag to know if this service
     * instance has been stopped.
     */
    private boolean stopped = false;


    /**
     * Start this service instance
     */
    public void start() {

       stopped = false;

       System.out.println("My Service Started "
                          + new java.util.Date());

       ZContext context = new ZContext();

       Socket subscriber = context.createSocket(ZMQ.SUB);
       subscriber.connect("tcp://localhost:5556");
       String subscription = "MySub";
       subscriber.subscribe(subscription.getBytes(Charset.forName("UTF-8")));

       while(!stopped) {
          System.out.println("My Service Executing "
                              + new java.util.Date());

          String topic = subscriber.recvStr();
          if (topic == null)
              break;
          String data = subscriber.recvStr();
          assert(topic.equals(subscription));
          System.out.println(data);

          synchronized(this) {
             try {
                this.wait(60000);  // wait 1 minute
             }
             catch(InterruptedException ie){}
          }
       }

       subscriber.close();
       context.close();
       context.destroy();

       System.out.println("My Service Finished "
                           + new java.util.Date());
    }

    /**
     * Stop this service instance
     */
    public void stop() {
       stopped = true;
       synchronized(this) {
          this.notify();
       }
    }
 }

Then I created the following folder structure just like the tutorial suggested:

E:\SubscriberACD
   \bin
        \subscriberACD.exe
        \subscriberACDw.exe
   \classes
        \com\org\SubscriberACD\Subscriber.class
   \logs

I then navigated to the bin directory and issued the following command to install the service:

subscriberACD.exe //IS//SubscriberACD --Install=E:\SubscriberACD\bin\subscriberACD.exe --Descriptio
n="Subscriber using Apache Commons Daemon" --Jvm=c:\glassfish4\jdk7\jre
\bin\server\jvm.dll --Classpath=E:\SubscriberACD\classes --StartMode=jvm
 --StartClass=com.org.SubscriberACD.Subscriber --StartMethod=windowsSer
vice --StartParams=start --StopMode=jvm --StopClass=com.org.SubscriberA
CD.Subscriber --StopMethod=windowsService --StopParams=stop --LogPath=E:\SubscriberACD\logs --StdOutput=auto --StdError=auto

The install works fine since I can see it in Windows Services. However, when I try to start it from there, I get an error saying "Windows cannot start the SubscriberACD on Local Computer".

I checked the error logs and see the following entry:

2016-04-14 14:38:40 Commons Daemon procrun stderr initialized
Exception in thread "main" java.lang.NoClassDefFoundError: org/zeromq/ZContext
    at com.org.SubscriberACD.Subscriber.start(Subscriber.java:57)
    at com.org.SubscriberACD.Subscriber.windowsService(Subscriber.java:33)
Caused by: java.lang.ClassNotFoundException: org.zeromq.ZContext
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
    ... 2 more

It's worth noting that JeroMQ is currently a jar under my Maven Dependencies. I configured it from my POM.xml file.

I think the problem might be that my service doesn't have access to the JeroMQ jar under my Maven Dependencies. My assumption is that the class file doesn't contain the dependencies. So what I tried was exporting my entire project as a jar and placing it under E:\SubscriberACD\classes\. So my structure now looks like this:

E:\SubscriberACD
   \bin
        \subscriberACD.exe
        \subscriberACDw.exe
   \classes
        \com\org\SubscriberACD\
             \Subscriber.class
        \Subscriber.jar
   \logs

However, that didn't fix the issue. Can anyone shed some light on this?


Answer:

Change your --Classpath argument to:

--Classpath=E:\SubscriberACD\classes\your-jar-filename.jar 

You almost certainly have other jar files you'll need, so just append them to the end of the --Classpath using ; (semicolon) delimiters:

--Classpath=E:\SubscriberACD\classes\your-jar-filename.jar;e:\other-dir\classes\some-other.jar;etc...

Question:

I'm trying to set up a ZeroMQ data stream to Spark. Basically I took the ZeroMQWordCount.scala app and tried to recompile and run it.

I have zeromq 2.1 installed and Spark 1.2.1. Here is my Scala code:

package org.apache.spark.examples.streaming

import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
import akka.zeromq._
import akka.zeromq.Subscribe
import akka.util.ByteString

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.zeromq._

import scala.language.implicitConversions
import org.apache.spark.SparkConf

object ZmqBenchmark {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: ZmqBenchmark <zeroMQurl> <topic>")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()
    val Seq(url, topic) = args.toSeq
    val sparkConf = new SparkConf().setAppName("ZmqBenchmark")
    // Create the context and set the batch size
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator

    // For this stream, a zeroMQ publisher should be running.
    val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

and this is my .sbt file for dependencies:

name := "ZmqBenchmark"

version := "1.0"

scalaVersion := "2.10.4"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

resolvers += "Sonatype (releases)" at "https://oss.sonatype.org/content/repositories/releases/"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.2.1"

libraryDependencies += "org.apache.spark"  %% "spark-streaming" % "1.2.1"

libraryDependencies += "org.apache.spark" % "spark-streaming-zeromq_2.10" % "1.2.1"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.2.0"

libraryDependencies += "org.zeromq" %% "zeromq-scala-binding" % "0.0.6"

libraryDependencies += "com.typesafe.akka" % "akka-zeromq_2.10.0-RC5" % "2.1.0-RC6"

libraryDependencies += "org.apache.spark" % "spark-examples_2.10" % "1.1.1"

libraryDependencies += "org.spark-project.zeromq" % "zeromq-scala-binding_2.11" % "0.0.7-spark"

The application compiles without any errors using sbt package; however, when I run the application with spark-submit, I get an error:

zaid@zaid-VirtualBox:~/spark-1.2.1$ ./bin/spark-submit --master local[*] ./zeromqsub/example/target/scala-2.10/zmqbenchmark_2.10-1.0.jar tcp://127.0.0.1:5553 hello
15/03/06 10:21:11 WARN Utils: Your hostname, zaid-VirtualBox resolves to a loopback address: 127.0.1.1; using 192.168.220.175 instead (on interface eth0)
15/03/06 10:21:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/zeromq/ZeroMQUtils$
    at ZmqBenchmark$.main(ZmqBenchmark.scala:78)
    at ZmqBenchmark.main(ZmqBenchmark.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.zeromq.ZeroMQUtils$
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 9 more

Any ideas why this happens? I know the app should work, because when I run the same example using the run-example script and point it at the ZeroMQWordCount app from Spark, it runs without the exception. My guess is that the sbt file is incorrect; what else do I need to have in the sbt file?

Thanks


Answer:

You are using ZeroMQUtils.createStream but the line

Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.zeromq.ZeroMQUtils

shows that the bytecode for ZeroMQUtils was not located. When the Spark examples are run, they are run against a jar file (like spark-1.2.1/examples/target/scala-2.10/spark-examples-1.2.1-hadoop1.0.4.jar) that includes the ZeroMQUtils class. A solution is to use the --jars flag so that the spark-submit command can find the bytecode. In your case, this could be something like

spark-submit --jars /opt/spark/spark-1.2.1/examples/target/scala-2.10/spark-examples-1.2.1-hadoop1.0.4.jar --master local[*] ./zeromqsub/example/target/scala-2.10/zmqbenchmark_2.10-1.0.jar tcp://127.0.0.1:5553 hello

assuming that you have installed spark-1.2.1 in /opt/spark.

Question:

As the Spark docs say, Kafka is supported as a streaming data source, but I use ZeroMQ, and there is no ZeroMQUtils. How can I use it? And, generally, what about other MQs? I am totally new to Spark and Spark Streaming, so I am sorry if the question is stupid. Could anyone give me a solution? Thanks. BTW, I use Python.

Update: I finally did it in Java with a custom receiver. Below is my solution:

import java.util.List;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.zeromq.ZMQ;

public class ZeroMQReceiver<T> extends Receiver<T> {

    private static final ObjectMapper mapper = new ObjectMapper();

    public ZeroMQReceiver() {

        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
        // There is nothing much to do, as the thread calling receive()
        // is designed to stop by itself once isStopped() returns true
    }

    /** Create a socket connection and receive data until receiver is stopped */
    private void receive() {
        String message = null;

        try {

            ZMQ.Context context = ZMQ.context(1); 
            ZMQ.Socket subscriber = context.socket(ZMQ.SUB);     
            subscriber.connect("tcp://ip:port");    
            subscriber.subscribe("".getBytes());  

            // Until stopped or connection broken continue reading
            while (!isStopped() && (message = subscriber.recvStr()) != null) {
                List<T> results = mapper.readValue(message,
                        new TypeReference<List<T>>(){} );
                for (T item : results) {
                    store(item);
                }
            }
            // Restart in an attempt to connect again when server is active again
            restart("Trying to connect again");
        } catch(Throwable t) {
            // restart if there is any other error
            restart("Error receiving data", t);
        }
    }
}

Answer:

I assume you are talking about Structured Streaming.

I am not familiar with ZeroMQ, but an important point in Spark Structured Streaming sources is replayability (in order to ensure fault tolerance), which, if I understand correctly, ZeroMQ doesn't deliver out-of-the-box.

A practical approach would be to buffer the data either in Kafka, using the KafkaSource, or as files in a (local FS/NFS, HDFS, S3) directory, using the FileSource for reading. Cf. the Spark docs. If you use the FileSource, make sure not to append anything to an existing file in the FileSource's input directory; instead, move files into the directory atomically.
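The "move atomically" point matters because the FileSource lists the directory and treats any visible file as complete. A minimal stdlib sketch of the write-then-rename pattern (the function and directory names here are illustrative, not part of any Spark API):

```python
import os
import tempfile

def publish_atomically(payload: bytes, input_dir: str) -> str:
    """Write payload outside the watched directory, then move it in with an
    atomic rename, so the streaming job never lists a half-written file."""
    staging_dir = input_dir + ".staging"     # sibling dir, same filesystem as input_dir
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(input_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=staging_dir, suffix=".json")
    with os.fdopen(fd, "wb") as f:
        f.write(payload)                     # file is complete before it moves
    final_path = os.path.join(input_dir, os.path.basename(tmp_path))
    os.replace(tmp_path, final_path)         # atomic when both paths share a filesystem
    return final_path
```

A ZeroMQ receiver process could use such a helper to drain messages into files that the Spark FileSource then picks up.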

Question:

I'm trying to run Spark application that uses ZeroMQ on a YARN cluster.

The executor's log contains the following messages:

16/05/24 11:42:09 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 67.2 KB, free 530.2 MB)
16/05/24 11:42:09 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1464090129200
16/05/24 11:42:09 INFO receiver.BlockGenerator: Started BlockGenerator
16/05/24 11:42:09 INFO receiver.BlockGenerator: Started block pushing thread
16/05/24 11:42:09 INFO receiver.ReceiverSupervisorImpl: Starting receiver
16/05/24 11:42:09 INFO receiver.ActorReceiver: Supervision tree for receivers initialized at:akka://sparkExecutor/user/Supervisor0
16/05/24 11:42:09 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
16/05/24 11:42:09 INFO receiver.ActorReceiver: Started receiver worker at:akka://sparkExecutor/user/Supervisor0/ZeroMQReceiver
16/05/24 11:42:09 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped
16/05/24 11:42:09 ERROR actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
    at akka.actor.ActorCell.create(ActorCell.scala:596)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.zeromq'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
    at com.typesafe.config.impl.SimpleConfig.getDuration(SimpleConfig.java:260)
    at com.typesafe.config.impl.SimpleConfig.getMilliseconds(SimpleConfig.java:249)
    at akka.zeromq.ZeroMQExtension.<init>(ZeroMQExtension.scala:48)
    at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:35)
    at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:32)
    at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:713)
    at akka.actor.ExtensionId$class.apply(Extension.scala:79)
    at akka.zeromq.ZeroMQExtension$.apply(ZeroMQExtension.scala:32)
    at org.apache.spark.streaming.zeromq.ZeroMQReceiver.preStart(ZeroMQReceiver.scala:39)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
    at org.apache.spark.streaming.zeromq.ZeroMQReceiver.aroundPreStart(ZeroMQReceiver.scala:32)
    at akka.actor.ActorCell.create(ActorCell.scala:580)
    ... 9 more
16/05/24 11:42:09 ERROR actor.ActorCell: changing Recreate into Create after akka.actor.ActorInitializationException: exception during creation
16/05/24 11:42:09 ERROR actor.OneForOneStrategy: No configuration setting found for key 'akka.zeromq'
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
    at akka.actor.ActorCell.create(ActorCell.scala:596)
    at akka.actor.dungeon.FaultHandling$class.finishCreate(FaultHandling.scala:136)
    at akka.actor.dungeon.FaultHandling$class.faultCreate(FaultHandling.scala:130)
    at akka.actor.ActorCell.faultCreate(ActorCell.scala:369)
    at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:59)
    at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.zeromq'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:147)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
    at com.typesafe.config.impl.SimpleConfig.getDuration(SimpleConfig.java:260)
    at com.typesafe.config.impl.SimpleConfig.getMilliseconds(SimpleConfig.java:249)
    at akka.zeromq.ZeroMQExtension.<init>(ZeroMQExtension.scala:48)
    at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:35)
    at akka.zeromq.ZeroMQExtension$.createExtension(ZeroMQExtension.scala:32)
    at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:713)
    at akka.actor.ExtensionId$class.apply(Extension.scala:79)
    at akka.zeromq.ZeroMQExtension$.apply(ZeroMQExtension.scala:32)
    at org.apache.spark.streaming.zeromq.ZeroMQReceiver.preStart(ZeroMQReceiver.scala:39)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
    at org.apache.spark.streaming.zeromq.ZeroMQReceiver.aroundPreStart(ZeroMQReceiver.scala:32)
    at akka.actor.ActorCell.create(ActorCell.scala:580)
    ... 14 more

My application.conf file contains the akka.zeromq section, but the executors do not seem to see these parameters (their respective configuration settings).

The Driver app has an access to the application.conf file.

This problem reproduces with a 'Words Count' example app.

I've tried the following commands to run this app:

spark-submit 
--verbose 
--class app.ZeroMQWordCount 
--master yarn-cluster 
app-allinone.jar "tcp://127.0.1.1:1234" "foo"

spark-submit 
--class app.ZeroMQWordCount  
--master yarn-cluster 
--files hdfs://namenode:8020/app/application.conf 
--conf "spark.executor.extraClassPath=application.conf" 
app-allinone.jar "tcp://127.0.1.1:1234" "foo"

spark-submit 
--class app.ZeroMQWordCount  
--master yarn-cluster 
--files hdfs://namenode:8020/app/application.conf 
--conf "spark.executor.extraClassPath=./" 
app-allinone.jar "tcp://127.0.1.1:1234" "foo"

Answer:

I see two ways of solving my problem:

1) Set the options via the SparkConf object (e.g. sparkConf.set("akka.zeromq.new-socket-timeout", "5"))

2) Set the options via CLI args (e.g. --conf "spark.executor.extraJavaOptions=-Dakka.zeromq.poll-timeout=100ms -Dakka.zeromq.new-socket-timeout=5s")

Also, you should initialize all the akka.zeromq.* options:

sparkConf.set("akka.zeromq.socket-dispatcher.executor", "thread-pool-executor")

sparkConf.set("akka.zeromq.new-socket-timeout", "5")

sparkConf.set("akka.zeromq.poll-timeout", "100")

sparkConf.set("akka.zeromq.socket-dispatcher.thread-pool-executor.allow-core-timeout", "off")

sparkConf.set("akka.zeromq.socket-dispatcher.type", "PinnedDispatcher")
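Equivalently, the same settings can be expressed as an application.conf fragment, if you can get the file onto the executors' classpath. This is a sketch mirroring only the values set above, not a complete akka.zeromq reference:

```hocon
akka.zeromq {
  poll-timeout       = 100ms
  new-socket-timeout = 5s
  socket-dispatcher {
    type     = PinnedDispatcher
    executor = thread-pool-executor
    thread-pool-executor {
      allow-core-timeout = off
    }
  }
}
```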