Applying UDFs on GroupedData in PySpark (with functioning python example)


I have this Python code that runs locally on a pandas DataFrame:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name)))

I would like to run this in PySpark, but I'm having trouble dealing with the pyspark.sql.group.GroupedData object.

I've tried the following:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 

which returns

KeyError: 'A'

I presume this is because 'A' is no longer a column, and I can't find the equivalent for x.name.

And then

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()

but get the following error:

AttributeError: 'GroupedData' object has no attribute 'map'

Any suggestions would be really appreciated!

What you are trying to do is write a UDAF (User Defined Aggregate Function), as opposed to a UDF (User Defined Function). UDAFs are functions that work on data grouped by a key. Specifically, they need to define how to merge multiple values in the group within a single partition, and then how to merge the results across partitions for each key. There is currently no way in Python to implement a UDAF; they can only be implemented in Scala.

But you can work around it in Python. You can use collect_list to gather your grouped values and then use a regular UDF to do what you want with them. The only caveat is that collect_list only works on primitive values, so you will need to encode them down to a string.

from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf

def myFunc(data_list):
    for val in data_list:
        b, c = val.split(',')
        # do something with b and c

    return <whatever>  # must return a string, to match StringType()

myUdf = udf(myFunc, StringType())

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
  .groupBy('A').agg(collect_list('data').alias('data')) \
  .withColumn('data', myUdf('data'))

Use collect_set instead of collect_list if you want deduping. Also, if you have lots of values for some of your keys, this will be slow because all values for a key will need to be collected in a single partition somewhere on your cluster. If your end result is a value you build by combining the values per key in some way (for example summing them), it might be faster to implement it using the RDD aggregateByKey method, which lets you build an intermediate value for each key within a partition before shuffling data around, as sketched below.
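
For illustration, here is a minimal sketch of that aggregateByKey idea, assuming (purely as a placeholder) that the per-key result is the sum of B * C; the combining lambdas are what you would replace with your own logic:

pairs = df.select('A', 'B', 'C').rdd.map(lambda r: (r['A'], (r['B'], r['C'])))

result = pairs.aggregateByKey(
    0.0,                                    # zero value per key
    lambda acc, bc: acc + bc[0] * bc[1],    # merge one (B, C) pair into the partition-local accumulator
    lambda acc1, acc2: acc1 + acc2          # merge accumulators across partitions
)

result.toDF(['A', 'result']).show()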

EDIT: 11/21/2018

Since this answer was written, PySpark added support for UDAFs using Pandas. There are some nice performance improvements when using Pandas UDFs and UDAFs over plain Python functions with RDDs. Under the hood it vectorizes the columns (batching the values from multiple rows together to optimize processing and compression). Take a look here for a better explanation, or look at user6910411's answer below for an example.
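
For a quick illustration of what that vectorization means (a sketch, assuming column B from the question is numeric): a scalar pandas_udf receives whole batches of rows as a pandas.Series, rather than being called once per row like a plain udf.

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DoubleType

# Called once per batch of rows: v arrives as a pandas.Series covering many rows,
# not one value at a time as with a plain udf.
@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def plus_one(v):
    return v + 1.0

df.withColumn('B_plus_one', plus_one('B'))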


Since Spark 2.3 you can use pandas_udf. GROUPED_MAP takes Callable[[pandas.DataFrame], pandas.DataFrame], in other words a function which maps from a Pandas DataFrame of the same shape as the input to the output DataFrame.

For example if data looks like this:

df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

and you want to compute the average of the pairwise minimum between value1 and value2, you have to define the output schema:

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

pandas_udf:

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    result = pd.DataFrame(df.groupby(df.key).apply(
        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
    ))
    result.reset_index(inplace=True, drop=False)
    return result

and apply it:

df.groupby("key").apply(g).show()
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+

Excluding schema definition and decorator, your current Pandas code can be applied as-is.
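
To make that concrete for the question above, here is a sketch of how the original pandas groupby().apply pattern could be carried over; myFunction is assumed to return a single numeric value per group, and the output schema below is just an assumption for illustration (it reuses the imports already shown):

# Sketch only: myFunction and the output schema are assumptions, not a tested implementation.
out_schema = StructType([
    StructField("A", StringType()),
    StructField("result", DoubleType())
])

@pandas_udf(out_schema, functionType=PandasUDFType.GROUPED_MAP)
def apply_my_function(pdf):
    # pdf is the pandas DataFrame holding one group; the group key (the pandas
    # equivalent of x.name) is available as the constant 'A' column.
    key = pdf['A'].iloc[0]
    value = myFunction(zip(pdf.B, pdf.C), key)  # assumed to return a scalar
    return pd.DataFrame([[key, value]], columns=["A", "result"])

sparkDF.groupby('A').apply(apply_my_function)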

Since Spark 2.4.0 there is also the GROUPED_AGG variant, which takes Callable[[pandas.Series, ...], T], where T is a primitive scalar:

import numpy as np

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def f(x, y):
    return np.minimum(x, y).mean()

which can be used with the standard groupBy / agg construct:

df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+

Please note that neither the GROUPED_MAP nor the GROUPED_AGG pandas_udf behaves the same way as UserDefinedAggregateFunction or Aggregator; they are closer to groupByKey or window functions with an unbounded frame. Data is shuffled first, and only after that is the UDF applied.

For optimized execution you should implement a Scala UserDefinedAggregateFunction and add a Python wrapper.

See also User defined function to be applied to Window in PySpark?
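
For example, the GROUPED_AGG UDF f defined above can (in Spark 2.4+) be applied over an unbounded window, so that every row receives its group's aggregate — a sketch:

from pyspark.sql.window import Window

# Unbounded window per key: f is evaluated once per group and the result is
# attached to every row in that group.
w = Window.partitionBy("key")

df.withColumn("avg_min", f("value1", "value2").over(w)).show()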


I am going to extend the above answer.

You can implement the same logic as pandas.groupby().apply in PySpark using @pandas_udf, which is a vectorized method and faster than a plain udf.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df3 = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])

df3.groupby("key").apply(g).show()

You will get the following result:

+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
|  b|       6.5|      -1.5|    5.0|    8.0|
|  a|       0.0|      21.0|   21.0|  -21.0|
+---+----------+----------+-------+-------+

So you can do more calculations between other fields of the grouped data and add them to the dataframe in list format.
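
As a variant, in Spark 2.4+ you can also label the columns of the returned pandas DataFrame so they are matched to the schema by name instead of by position — a sketch reusing the schema and df3 from above:

# Same logic as g above, but the returned DataFrame uses string column labels,
# so Spark matches them to the schema fields by name (Spark 2.4+ behaviour).
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g_named(pdf):
    x = pdf.value1.mean()
    y = pdf.value2.mean()
    return pd.DataFrame([{
        "key": pdf['key'].iloc[0],
        "avg_value1": x,
        "avg_value2": y,
        "sum_avg": x + y,
        "sub_avg": x - y,
    }])

df3.groupby("key").apply(g_named).show()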


Comments
  • How would you simulate panda_udf in Spark<=2.2 (due to company's infra). I would like to parallel process columns, and in each column make use of Spark to parallel process rows. Let's say I have certain stat that I want to apply per column. collect_list gives list, is it efficient to convert to new Spark DF ? def myFunc(d_list): sdf = spark.createDataframe(d_list); return sdf.map(...).reduceByKey(...); and called on two columns df.agg(collect_list('col1').alias('col1'),collect_list('col2').alias('col2') ) .withColumn('col1_', myUdf('col1')).withColumn('col2_', myUdf('col2'))
  • Hi @user6910411 : Do you have any advice for implementing vectorized column aggregate with Spark <= 2.2 ? stackoverflow.com/questions/55314978/…