Serialization issues in Spark Streaming

I'm quite confused about how Spark works with the data under the hood. For example, when I run a streaming job and apply foreachRDD, the behaviour depends on whether a variable is captured from the outer scope or initialised inside.

val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

In this case, I get an exception:

java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.

But if I move sparkConf inside, everything seems to be fine:

dStream.foreachRDD(rdd => {
    val sparkConf = rdd.sparkContext.getConf
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

This looks quite odd to me because I thought that foreachRDD runs on the driver node, so I didn't expect any difference.

Now, if I move both the SQL session and the config outside foreachRDD, it works fine again:

val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    val df = spark.read.json(rdd)
    ...
})

A snippet in the Spark documentation suggests the previous version (where both the config and the SQL context are created inside foreachRDD), which seems less efficient to me: why create them for every batch if they could be created just once?
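
From memory (so this may not be verbatim), the documentation snippet looks roughly like this, with words being a DStream[String]:

words.foreachRDD { rdd =>
    // Get a SparkSession from the RDD's own configuration
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._

    // Convert RDD[String] to a DataFrame and register it as a temp view
    val wordsDataFrame = rdd.toDF("word")
    wordsDataFrame.createOrReplaceTempView("words")
}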

Could someone explain why the exception is thrown and what is the proper way to create the SQL context?

foreachRDD runs, as the name suggests, for each RDD in the stream, so why would you recreate the Spark session for every RDD? The correct approach is the last one:

val sparkConf = new SparkConf()
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
dStream.foreachRDD(rdd => {
    val df = spark.read.json(rdd)
    ...
})
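
As a rough end-to-end sketch (the socket source and the show() call are just placeholders, not something from the question), the session is built once on the driver and then reused for every batch:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("StreamingJson")
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

// Any DStream[String] source will do; a socket stream keeps the example small
val dStream = ssc.socketTextStream("localhost", 9999)

dStream.foreachRDD(rdd => {
    import spark.implicits._
    // spark.read.json(RDD[String]) is deprecated since 2.2, so go through a Dataset
    val df = spark.read.json(rdd.toDS())
    df.show()
})

ssc.start()
ssc.awaitTermination()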

val spark = SparkSession.builder.config(sparkConf).getOrCreate() does not create another SparkSession. Only one exists; calling it again on the worker just gets the existing one from the job.
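
A quick way to see this (a minimal check, not from the original post; it assumes you can create a local session, e.g. in a test or spark-shell):

import org.apache.spark.sql.SparkSession

val first = SparkSession.builder.appName("demo").master("local[*]").getOrCreate()
val second = SparkSession.builder.getOrCreate()

// getOrCreate returns the already-active session instead of building a new one
assert(first eq second)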

In the first approach, you are trying to instantiate a SparkSession object for each RDD (batch), which is not correct.

As answered by others, use the 3rd approach. But if you need to use the first approach, then you can do it as below:

val sparkConf = new SparkConf()
dStream.foreachRDD(rdd => {
    lazy val spark = SparkSession.builder.config(sparkConf).getOrCreate()
    ...
})

Here, lazy evaluation helps avoid instantiating the Spark session multiple times, which avoids the serialization issues.
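
For reference, the Spark Streaming programming guide solves the same problem with a lazily initialized singleton object; from memory it is roughly along these lines (the exact code in the guide may differ):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
        if (instance == null) {
            instance = SparkSession.builder.config(sparkConf).getOrCreate()
        }
        instance
    }
}

Inside foreachRDD you would then call SparkSessionSingleton.getInstance(rdd.sparkContext.getConf), so the session is created at most once per JVM where it is used.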

I hope this is helpful.

Comments
  • "I thought that foreachRDD runs on the driver node" - the method passed to foreachRDD runs on the workers, not the driver.
  • @YuvalItzchakov I don't think so, because foreachRDD operates on the whole RDD, not on partitions or elements of that RDD. And the documentation explicitly says that it runs on the driver node: spark.apache.org/docs/latest/…
  • You're right, I didn't phrase myself correctly. I meant to say that the delegates passed to the RDD (i.e., any operation you'd want to do on the DataFrame) will run on the worker nodes.
  • @lizarisk: you cannot use any non-serializable classes inside transformations or actions, which run on worker nodes.
  • @Shankar I don't see anything that runs on a worker node here