Aggregate DataFrame with user-defined function

I have a question about aggregating pandas DataFrames with user-defined functions. If I have a DataFrame and run agg, with or without groupby, the result is aggregated when built-in functions are used. If I instead use a custom function, it works as intended when groupby is used, but when no groupby is used, no aggregation is done. Is there a way to aggregate with a custom function without a groupby? I know I could just add a dummy variable, but that is not the preferred solution. Tests 1-3 below work as intended, but test 4 does not.

import numpy as np
import pandas as pd

df = pd.DataFrame(columns=['a', 'b', 'c'])
n = 1000
np.random.seed(0)

df['a'] = np.random.rand(n)
df['b'] = np.random.rand(n)
df['c'] = np.random.randint(1, 4, size=n)

def CoV(_s):
    # coefficient of variation: standard deviation divided by the mean
    return pd.Series({'CoV': np.std(_s) / np.mean(_s)})

test1 = df.agg({'a':['std', np.mean]})
print(test1)

test2 = df.groupby(['c']).agg({'a':['std', np.mean]})
print(test2)

test3 = df.groupby(['c']).agg({'a':[CoV]})
print(test3)

# does not work as intended, no aggregation
test4 = df.agg({'a':[CoV]})
print(test4)

This will give you the desired result:

df.assign(k=1).groupby('k')['a'].apply(CoV).reset_index(drop=True)

So you assign k just to use it for the groupby, and then remove it by resetting and dropping the index.
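
As the comments note, the dummy-key trick also lets you mix the UDF with built-ins in one call. A minimal sketch (the exact call is my assumption; it relies on groupby.agg accepting the Series-returning CoV, which is the same behavior test3 above depends on):

# a constant key k produces a single group, so agg returns one row;
# reset_index(drop=True) then strips the dummy key from the result
mixed = df.assign(k=1).groupby('k')['a'].agg([CoV, 'mean'])
print(mixed.reset_index(drop=True))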

None of the answers here have addressed why this is failing. If you dig into the pandas code, when a UDF is passed to df.agg, a Series object for each column will be passed to the UDF.

In your case, using a dictionary selects a Series object (a column) and the UDF is then passed to the Series object's Series.agg function. Because it is not a known function (like the string 'mean'), it ends up being passed to Series.apply, which maps the function over each value in the Series object. This is the result you are seeing.

Luckily, the passing of the UDF to Series.apply happens in a try/except block. If it fails to work via Series.apply(func), pandas swaps to passing the Series object to the function via func(Series). You can exploit this by modifying your code to raise an error whenever the passed object is not a Series or DataFrame.
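
Schematically, the dispatch looks something like this (a simplified illustration of the behavior described above, not the actual pandas source; the function name is made up):

def agg_one_udf(series, func):
    # pandas first tries to map the UDF over each element of the Series
    try:
        return series.apply(func)
    except (TypeError, ValueError):
        # only if apply raises does the whole Series get passed to the UDF,
        # producing a true aggregation
        return func(series)

With that fallback in mind, the modified CoV looks like this: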

def CoV(_s):
    # reject scalar input so pandas falls back to passing the whole Series
    if not isinstance(_s, (pd.Series, pd.DataFrame, np.ndarray)):
        raise TypeError()
    return pd.Series({'CoV': np.std(_s) / np.mean(_s)})

Now passing it to .agg works as you would expect. It is a hacky work-around, but it works.

df.agg({'a': CoV})
# returns:
            a
CoV  0.584645

EDIT:

To get this to work with other functions, like 'mean', you will have to pass those as UDFs as well, unfortunately. Even worse, the accumulation of the results is different for UDFs than for built-in functions. Pandas simply stacks them horizontally with a hierarchical column index. A simple stack and reset_index fixes this.

def check_input(fn):
    # decorator: raise if the input is not a Series/DataFrame/ndarray,
    # so pandas falls back to calling fn on the whole Series
    def wrapper(_s, *args, **kwargs):
        if not isinstance(_s, (pd.Series, pd.DataFrame, np.ndarray)):
            raise TypeError()
        return fn(_s, *args, **kwargs)
    wrapper.__name__ = fn.__name__
    return wrapper

@check_input
def Mean(_s):
    return pd.Series({'Mean': np.mean(_s)})

@check_input
def CoV(_s):
    return pd.Series({'CoV' : np.std(_s)/np.mean(_s)})

df.agg({'a': [CoV, Mean], 'c': Mean}).stack().reset_index(level=-1, drop=True)
# returns:
             a      c
CoV   0.584645    NaN
Mean  0.511350  2.011

Try using .apply(): df.apply(CoV, axis=0)

This also works for me: test4 = df.agg(CoV, axis=0)

What you'll get is a DataFrame with the scalar result of the applied function for each column:

            a         b         c
CoV  0.585977  0.584645  0.406688

Then just slice the Series you need.
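
For example, to pull out just the result for column 'a' (variable names here are only illustrative):

test4 = df.agg(CoV, axis=0)
cov_a = test4['a']  # Series holding the single 'CoV' value for column 'a'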

Assumptions: You want to apply a single custom scalar function (Series to scalar) on different columns without group-bys.

Edit: If you'd like to combine multiple functions, another thing you can do is to return all of them as the output of your function (which returns a pd.Series). For example, you can rewrite your custom function as:

def myfunc(_s):
    return pd.Series({'mean': _s.mean(),
                      'std': _s.std(),
                      'CoV': np.std(_s) / np.mean(_s)})

Then running this one with .apply() will yield multiple results. df.apply(myfunc) will now give:

               a           b           c
mean    0.495922    0.511350    2.011000
std     0.290744    0.299108    0.818259
CoV     0.585977    0.584645    0.406688

See more here: Pandas how to apply multiple functions to dataframe

Comments
  • Thanks, this also works if I use CoV and mean together.
  • Thanks, nice solution, but it does not work with CoV and mean together. Just for the record, this would be the preferred solution, because the update is only in the function, but it cannot handle a UDF and a built-in function at the same time.
  • Thanks, super informative answer. The solution is cool, but it's a little unfortunate that all the built-in functions have to be redefined.
  • Yup...there is probably a way around it, just not sure from 20 minutes in the code.
  • Good answer, nice to see one that includes a look under the hood, especially w.r.t. passing a list of multiple func objects.
  • Nice one, the OP can also select the desired columns with df[['a']].agg(CoV, axis=0).
  • Thanks, nice solution, but it does not work with CoV and mean together.
  • True, edited to include the multiple function case (interesting topic indeed) :)