How to add multiple columns using UDF?

Question

I want to add the return values of a UDF to an existing dataframe in separate columns. How do I achieve this in a resource-efficient way?

Here's an example of what I have so far.

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType  

df = spark.createDataFrame([("Alive",4)],["Name","Number"])
df.show(1)

+-----+------+
| Name|Number|
+-----+------+
|Alive|     4|
+-----+------+

def example(n):
        return [[n+2], [n-2]]

#  schema = StructType([
#          StructField("Out1", ArrayType(IntegerType()), False),
#          StructField("Out2", ArrayType(IntegerType()), False)])

example_udf = udf(example)

Now I can add a column to the dataframe as follows

newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF.show(1)
+-----+------+----------+
| Name|Number|Output    |
+-----+------+----------+
|Alive|     4|[[6], [2]]|
+-----+------+----------+

However I don't want the two values to be in the same column but rather in separate ones.

Ideally, I'd like to split the Output column after the fact so that the example function is not called twice (once for each return value), as has been explained elsewhere. In my situation, however, I'm getting an array of arrays, and I can't see how a split would work there. (Please note that each array will contain multiple values, separated by ",".)

What the result should look like

What I ultimately want is this

+-----+------+----+----+
| Name|Number|Out1|Out2|
+-----+------+----+----+
|Alive|     4|   6|   2|
+-----+------+----+----+

Note that the use of the StructType return type is optional and doesn't necessarily have to be part of the solution.

EDIT: I commented out the StructType schema (and edited the udf assignment) since it's not necessary for the return type of the example function. However, it has to be used if the return value were something like

return [6,3,2],[4,3,1]

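To return a StructType, have the UDF return a Row and declare a matching schema:

import pyspark.sql.functions as f
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType

df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])


def example(n):
    # Build a Row whose fields line up with the schema below.
    return Row('Out1', 'Out2')(n + 2, n - 2)


schema = StructType([
    StructField("Out1", IntegerType(), False),
    StructField("Out2", IntegerType(), False)])

example_udf = f.udf(example, schema)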

newDF = df.withColumn("Output", example_udf(df["Number"]))
newDF = newDF.select("Name", "Number", "Output.*")

newDF.show(truncate=False)

A better way to solve the above problem is to wrap the UDF output in an array and then explode it:

import pyspark.sql.functions as f
import pyspark.sql.types as t

df = spark.createDataFrame([("Alive", 4)], ["Name", "Number"])


def example(n):
    return t.Row('Out1', 'Out2')(n + 2, n - 2)


schema = t.StructType([
    t.StructField("Out1", t.IntegerType(), False),
    t.StructField("Out2", t.IntegerType(), False)])

example_udf = f.udf(example, schema)

newDF = df.withColumn("Output", f.explode(f.array(example_udf(df["Number"]))))
newDF = newDF.select("Name", "Number", "Output.*")

newDF.show(truncate=False)
newDF.explain()

Look at the output of explain(): the plan should show the example function being invoked only once.

In Scala

import spark.implicits._
val df = Seq(("Alive", 4)).toDF("Name", "Number")

Without a UDF

df.
  withColumn("OutPlus",  $"Number" + 2).
  withColumn("OutMinus", $"Number" - 2).
  show
+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive|     4|      6|       2|
+-----+------+-------+--------+

With a UDF using explode

import org.apache.spark.sql.functions.{explode, udf}
def twoItems(_i: Int) = Seq((_i + 2, _i - 2))
val twoItemsUdf = udf(twoItems(_: Int))

val exploded = df.
  withColumn("Out", explode(twoItemsUdf($"Number"))).
  withColumn("OutPlus", $"Out._1").
  withColumn("OutMinus", $"Out._2")

exploded.printSchema

root
 |-- Name: string (nullable = true)
 |-- Number: integer (nullable = false)
 |-- Out: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: integer (nullable = false)
 |-- OutPlus: integer (nullable = true)
 |-- OutMinus: integer (nullable = true)

exploded.drop("Out").show

+-----+------+-------+--------+
| Name|Number|OutPlus|OutMinus|
+-----+------+-------+--------+
|Alive|     4|      6|       2|
+-----+------+-------+--------+

Comments
  • Possible duplicate of Apache Spark -- Assign the result of UDF to multiple dataframe columns
  • Oh sweet, that's exactly what I've been looking for! Thank you.
  • This technique works, but if you look at the explain plan, it internally calls the example method twice, which is not intended and should be avoided. In general, the Catalyst optimizer does not optimize UDFs because they are a black box to it; on top of that, we end up calling the same UDF twice (in this case because we want Out1 and Out2 in separate columns).
  • Purely anecdotal, but I tried both versions in my script and this was almost 3 times faster! Anyone looking at this should definitely try it