How to write unit tests in Spark 2.0+?

I've been trying to find a reasonable way to test SparkSession with the JUnit testing framework. While there seem to be good examples for SparkContext, I couldn't figure out how to get a corresponding example working for SparkSession, even though it is used in several places internally in spark-testing-base. I'd be happy to try a solution that doesn't use spark-testing-base as well if it isn't really the right way to go here.

Simple test case (complete MWE project with build.sbt):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

The result of running this with JUnit is an NPE at the load line:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Note that it shouldn't matter whether the file being loaded exists or not; with a properly configured SparkSession, a more sensible error would be thrown.
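
For illustration, here's a minimal hand-rolled session (no spark-testing-base) showing the kind of failure I'd expect instead; the specific exception type (an AnalysisException for a missing path) is my assumption and may vary across Spark versions:

import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

class ManualSessionTest extends FunSuite {

  test("reading a missing file fails with a sensible error, not an NPE") {
    // Build a local session by hand instead of relying on a framework to inject it.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("manual session test")
      .getOrCreate()
    try {
      // With an initialized session, a bad path should surface as a path-related
      // error rather than a NullPointerException.
      intercept[org.apache.spark.sql.AnalysisException] {
        spark.read.format("libsvm").load("/no/such/sample_linear_regression_data.txt")
      }
    } finally {
      spark.stop()
    }
  }
}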

There is no native library for unit testing in Spark as of yet. After researching this topic for a while, I feel the best option is to use two libraries: ScalaTest and spark-testing-base.

A little bit about ScalaTest: for Scala users, it is the most familiar unit testing framework (you can also use it for testing Java code, and soon for JavaScript).

You can write a simple test with FunSuite and BeforeAndAfterEach like the one below:

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterEach, FunSuite}

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession: SparkSession = _

  override def beforeEach() {
    sparkSession = SparkSession.builder()
      .appName("udf testings")
      .master("local")
      // .config("key", "value")  // add any extra Spark config you need here
      .getOrCreate()
  }

  test("your test name here") {
    // your unit test assert here, like below
    assert("True".toLowerCase == "true")
  }

  override def afterEach() {
    sparkSession.stop()
  }
}

You don't need to create separate functions in the test; you can simply write:

test ("test name") {//implementation and assert}

Holden Karau has written a really nice testing library, spark-testing-base.

It's worth checking out; below is a simple example:

import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FunSuite

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3), ("b", 2), ("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  // WordCount is the class under test; `sc` is the SparkContext provided by SharedSparkContext
  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

Hope this helps!

I like to create a SparkSessionTestWrapper trait that can be mixed into test classes. Shankar's approach works, but it's prohibitively slow for test suites with multiple files.

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

The trait can be used as follows:

import org.scalatest.FunSpec

class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

Check the spark-spec project for a real-life example that uses the SparkSessionTestWrapper approach.

Update

The spark-testing-base library automatically adds the SparkSession when certain traits are mixed in to the test class (e.g. when DataFrameSuiteBase is mixed in, you'll have access to the SparkSession via the spark variable).
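
For example, here's a minimal ScalaTest-style sketch (the class and test names are just for illustration) where DataFrameSuiteBase supplies the spark variable, instead of the JUnit runner used in the question:

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.FunSuite

class InjectedSessionSpec extends FunSuite with DataFrameSuiteBase {

  test("spark is provided by DataFrameSuiteBase") {
    // `spark` is initialized by the trait before the tests run, so it is safe to use here.
    import spark.implicits._
    val df = Seq(1, 2, 3).toDF("n")
    assert(df.count === 3)
  }
}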

I created a separate testing library called spark-fast-tests to give users full control of the SparkSession when running their tests. I don't think a test helper library should set up the SparkSession. Users should be able to start and stop their SparkSession as they see fit (I like to create one SparkSession and use it throughout the test suite run).

Here's an example of the spark-fast-tests assertSmallDatasetEquality method in action:

import com.github.mrpowers.spark.fast.tests.DatasetComparer
import org.apache.spark.sql.functions.col
import org.scalatest.FunSpec

class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

  it("aliases a DataFrame") {

    val sourceDF = Seq(
      ("jose"),
      ("li"),
      ("luisa")
    ).toDF("name")

    val actualDF = sourceDF.select(col("name").alias("student"))

    val expectedDF = Seq(
      ("jose"),
      ("li"),
      ("luisa")
    ).toDF("student")

    assertSmallDatasetEquality(actualDF, expectedDF)

  }

}

Since Spark 1.6 you could use SharedSparkContext or SharedSQLContext, which Spark uses for its own unit tests:

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }
}

Since Spark 2.3 SharedSparkSession is available:

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }
}

UPDATE:

Maven dependency:

<dependency>
  <groupId>org.scalactic</groupId>
  <artifactId>scalactic</artifactId>
  <version>SCALATEST_VERSION</version>
</dependency>
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest</artifactId>
  <version>SCALATEST_VERSION</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

SBT dependency:

"org.scalactic" %% "scalactic" % SCALATEST_VERSION
"org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
"org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

In addition, you could check the test sources of Spark, where there is a huge set of test suites.

UPDATE 2:

Apache Spark Unit Testing Part 1 — Core Components

Apache Spark Unit Testing Part 2 — Spark SQL

Apache Spark Unit Testing Part 3 — Streaming

My current Java/Spark unit test approach works (detailed here) by instantiating a SparkContext using "local" and running unit tests using JUnit. The code has to be organized to do I/O in one function and then call another with multiple RDDs. This works great. I have a highly tested data transformation written in Java + Spark.

I was able to solve the problem with the code below.

The spark-hive dependency is added in the project POM.
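
For reference, a sketch of that dependency in sbt form, mirroring the earlier snippets (SPARK_VERSION is a placeholder, and test scope is my assumption; it depends on whether your production code also uses Hive):

"org.apache.spark" %% "spark-hive" % SPARK_VERSION % "test"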

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.FunSuite

class DataFrameTest extends FunSuite with DataFrameSuiteBase {
  test("test dataframe") {
    val sparkSession = spark
    import sparkSession.implicits._
    val df = sparkSession.read.format("csv").load("path/to/csv")
    // rest of the operations
  }
}

Comments
  • Thanks to all for the responses so far; I hope to review soon. I also opened up an issue and am cross referencing it here: github.com/holdenk/spark-testing-base/issues/180
  • Unfortunately, I still haven't gotten around to actually using Spark ... some day, maybe 3.x at this rate - otherwise I would work on accepting an answer. Glad this has been useful to others.
  • Thanks for this detailed writeup. Wish I could give you more than one upvote.
  • Thank you. That's very kind. I hope the answer helps you with your project or understanding.
  • Great answer. The spark-spec project used a similar approach, but it was too slow when a lot of test files were added to the project. See my answer for an alternate implementation that doesn't force the SparkSession to be stopped and started after each test file.
  • I like the first part of this answer too; I just wish the second example had Spark stuff in it instead of a toy assertion. Beyond that, though, I would point out that the notion of performing expensive side effects before and/or after a suite of tests is not a new idea. As I suggest in my answer, ScalaTest has ample mechanisms for that (in this case, for managing Spark artifacts), and you can use those as you would for any other expensive fixtures. At least until the time comes when bringing in a heavier third-party framework is worth it.
  • On a side note, ScalaTest and specs2 (which I think does so by default) can both run tests in parallel for speed gains. Build tools can also help. But again, none of this is new.
  • I have edited the appropriate test example for spark-testing-base as per your suggestion. Thanks!
  • In this approach how do you recommend adding sparkSession.stop() somewhere?
  • You shouldn't need to call sparkSession.stop(), @NeilBest. The Spark session will be shut down when the test suite finishes running.
  • Why is there no need to call sparkSession.stop()? @Shankar Koirala's answer stops the sparkSession; is that unnecessary?
  • @yuxh - Shankar's answer starts and stops the Spark session after every test. This approach works, but it's really slow because it takes a while to start a Spark session.