Manipulating a dataframe within a Spark UDF

I have a UDF that filters and selects values from a dataframe, but it runs into "object not serializable" error. Details below.

Suppose I have a dataframe df1 with columns named ("ID", "Y1", "Y2", "Y3", "Y4", "Y5", "Y6", "Y7", "Y8", "Y9", "Y10"). I want to sum a subset of the "Y" columns based on the matching "ID" and "Value" from another dataframe df2. I tried the following:

val y_list = List("Y1", "Y2", "Y3", "Y4", "Y5", "Y6", "Y7", "Y8", "Y9", "Y10").map(c => col(c))

def udf_test(ID: String, value: Int): Double = {
  df1.filter($"ID" === ID).select(y_list:_*).first.toSeq.toList.take(value).foldLeft(0.0)(_+_)
}
sqlContext.udf.register("udf_test", udf_test _)

val df_result = df2.withColumn("Result", callUDF("udf_test", $"ID", $"Value"))

This gives me errors of the form:

java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: Y1)

I looked this up and realized that Spark Column is not serializable. I am wondering:

1) Is there any way to manipulate a dataframe within a UDF?

2) If not, what's the best way to achieve this type of operation? My real case is more complicated than this: it requires me to select values from multiple small dataframes based on some columns in a big dataframe, and compute a value to add back to the big dataframe.

I am using Spark 1.6.3. Thanks!


You can't use Dataset operations inside UDFs. A UDF can only operate on the values of existing columns and produce one result column. It can't filter a Dataset or run aggregations, though it can be used inside a filter, and a UDAF can aggregate values.

Instead, you can use .as[SomeCaseClass] to turn the DataFrame into a Dataset and use normal, strongly typed functions inside filter, map, and reduce.
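For example, a minimal sketch of that typed approach (using a hypothetical YRow case class, shortened to three Y columns and assuming the Y columns are numeric) could look like:

// Hypothetical sketch: a case class mirroring df1's schema (only 3 Y columns shown)
case class YRow(ID: String, Y1: Double, Y2: Double, Y3: Double)

// requires import sqlContext.implicits._ (Spark 1.6) for the encoders
val ds1 = df1.as[YRow]

// plain Scala inside filter/map -- no Column objects captured in a closure
val resultForId: Double = ds1
  .filter(_.ID == "someId")
  .map(r => Seq(r.Y1, r.Y2, r.Y3).take(2).sum)   // sum the first `value` Y columns
  .collect()
  .headOption
  .getOrElse(0.0)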

Edit: If you want to join your bigDF with every small DF in smallDFs List, you can do:

import org.apache.spark.sql.functions._
val bigDF = // some processing
val smallDFs = Seq(someSmallDF1, someSmallDF2)
val joined = smallDFs.foldLeft(bigDF)((acc, df) => acc.join(broadcast(df), "join_column"))

broadcast adds a broadcast hint to the small DataFrame, so that Spark uses the more efficient broadcast join instead of a sort merge join.
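If you want to verify that the hint took effect, you can inspect the physical plan of the joined DataFrame from the snippet above:

joined.explain()
// the physical plan should show BroadcastHashJoin rather than SortMergeJoin for the small DFs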



1) No, you can only use plain Scala code within UDFs.

2) If I interpreted your code correctly, you can achieve your goal with:

df2
  .join(
    df1.select($"ID", y_list.foldLeft(lit(0))(_ + _).as("Result")),
    Seq("ID")
  )
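If you also need the original take(value) behaviour (summing only the first Value Y columns per row), one possible sketch, untested, is to join first and then build the sum conditionally from each column's position:

import org.apache.spark.sql.functions._

// Sketch (untested): guard each Y column with its position so that only the
// first `Value` columns contribute to the sum
val y_cols = (1 to 10).map(i => col(s"Y$i"))
val partialSum = y_cols.zipWithIndex
  .map { case (c, i) => when(col("Value") > i, c).otherwise(lit(0.0)) }
  .reduce(_ + _)

val df_result = df2.join(df1, Seq("ID")).withColumn("Result", partialSum)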



import org.apache.spark.sql.functions._
// .toDF on a local Seq needs the SQL implicits in scope:
// import sqlContext.implicits._ (Spark 1.6) or import spark.implicits._ (Spark 2+)
val events = Seq(
  (1, 1, 2, 3, 4),
  (2, 1, 2, 3, 4),
  (3, 1, 2, 3, 4),
  (4, 1, 2, 3, 4),
  (5, 1, 2, 3, 4)).toDF("ID", "amt1", "amt2", "amt3", "amt4")

// mutable state shared across rows; this only behaves predictably when the
// rows are processed in order within a single partition
var prev_amt5 = 0
var i = 1
def getamt5value(ID:Int,amt1:Int,amt2:Int,amt3:Int,amt4:Int) : Int = {  
  if(i==1){
    i=i+1
    prev_amt5=0
  }else{
    i=i+1
  }
  if (ID == 0)
  {
    if(amt1==0)
    {
      val cur_amt5= 1
      prev_amt5=cur_amt5
      cur_amt5
    }else{
      val cur_amt5=1*(amt2+amt3)
      prev_amt5=cur_amt5
      cur_amt5
    }
  }else if (amt4==0 || (prev_amt5==0 & amt1==0)){
    val cur_amt5=0
    prev_amt5=cur_amt5
    cur_amt5
  }else{
    val cur_amt5=prev_amt5 +  amt2 + amt3 + amt4
    prev_amt5=cur_amt5
    cur_amt5
  }
}

val getamt5 = udf {(ID:Int,amt1:Int,amt2:Int,amt3:Int,amt4:Int) =>            
   getamt5value(ID,amt1,amt2,amt3,amt4)    
}
events.withColumn("amnt5", getamt5(events.col("ID"), events.col("amt1"), events.col("amt2"), events.col("amt3"), events.col("amt4"))).show()
