How to calculate mean and standard deviation given a PySpark DataFrame?

I have a PySpark DataFrame (not pandas) called df that is too large to call collect() on, so the code below is not efficient. It worked with a smaller amount of data, but now it fails.

import numpy as np

# collect() pulls the entire DataFrame onto the driver,
# which fails once the data no longer fits in driver memory
myList = df.collect()
total = []
for product, nb in myList:
    for p2, score in nb:
        total.append(score)
mean = np.mean(total)
std = np.std(total)

Is there any way to get mean and std as two variables by using pyspark.sql.functions or similar?

from pyspark.sql.functions import mean as mean_, std as std_

I could use withColumn; however, that approach applies the calculation row by row and does not return a single variable.

UPDATE:

Sample content of df:

+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|
+----------+------------------+

I need to calculate the mean and standard deviation of the score values, e.g. the value 1 in [691,1] is one of the scores.

You can use the built-in functions to get aggregate statistics. Here's how to get the mean and standard deviation.

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()

# the aggregation produces a single Row, so collect() is cheap here
mean = df_stats[0]['mean']
std = df_stats[0]['std']
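
Equivalently, you can skip the select and use agg with first(); a minimal sketch, assuming the same numeric column:

# agg() aggregates over the whole DataFrame; first() returns the single result Row
row = df.agg(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).first()
mean, std = row['mean'], row['std']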

Note that there are three different standard deviation functions: stddev, stddev_samp (for which stddev is an alias), and stddev_pop. From the docs, the one I used (stddev) returns the following:

Aggregate function: returns the unbiased sample standard deviation of the expression in a group
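
As a quick sketch of the difference, assuming a DataFrame df with a numeric column 'columnName':

from pyspark.sql.functions import stddev_samp, stddev_pop, col

# stddev_samp divides by n - 1 (the sample estimate, same as stddev);
# stddev_pop divides by n (the population value)
df.select(
    stddev_samp(col('columnName')).alias('std_samp'),
    stddev_pop(col('columnName')).alias('std_pop')
).show()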

You could use the describe() method as well:

df.describe().show()
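
describe() reports its statistics as strings, so if you want the values back as Python variables, one way is to collect the summary rows into a dict (a sketch, again assuming a column named 'columnName'):

# one Row per statistic: count, mean, stddev, min, max
stats = {row['summary']: row['columnName'] for row in df.describe('columnName').collect()}
mean = float(stats['mean'])
std = float(stats['stddev'])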

Refer to this link for more info: pyspark.sql.functions

UPDATE: This is how you can work through the nested data.

Use explode to extract the values into separate rows, then call mean and stddev as shown above.

Here's a MWE:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev

# mock up sample dataframe
df = sqlCtx.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)

# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())

# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print([mean, std])

Which outputs:

[2.3333333333333335, 1.505545305418162]

You can verify that these values are correct using numpy:

import numpy as np

vals = [1, 5, 2, 2, 1, 3]
print([np.mean(vals), np.std(vals, ddof=1)])

Explanation: Your "products" column is a list of lists. Calling explode will make a new row for each element of the outer list. Then grab the "score" value from each of the exploded rows, which you have defined as the second element in a 2-element list. Finally, call the aggregate functions on this new column.
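
As an aside, the Python udf isn't strictly necessary: since the exploded column is itself an array, you can pull the score out with getItem directly (a sketch, assuming the inner elements are 2-element arrays as in the mock DataFrame above):

from pyspark.sql.functions import explode, col, mean as _mean, stddev as _stddev

# index 1 of each exploded inner array is the score; no udf needed
df_stats = df.select(explode(col('products')).alias('exploded'))\
    .select(
        _mean(col('exploded').getItem(1)).alias('mean'),
        _stddev(col('exploded').getItem(1)).alias('std')
    )\
    .collect()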

You can use mean and stddev from pyspark.sql.functions:

import pyspark.sql.functions as F

df = spark.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)

result_df = (
    df
    .withColumn(
        'val_list',
        # getItem(0)/getItem(1) pick the two inner [id, score] pairs;
        # the trailing .getItem(1) grabs the score from each pair
        F.array(df.products.getItem(0).getItem(1), df.products.getItem(1).getItem(1))
    )
    .select(F.explode('val_list').alias('val'))
    .select(F.mean('val').alias('mean'), F.stddev('val').alias('stddev'))
)

print(result_df.collect())

which outputs:

[Row(mean=2.3333333333333335, stddev=1.505545305418162)]
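
Note that the val_list construction above hard-codes exactly two inner lists per row. If your Spark version supports higher-order functions (2.4+), here is a sketch that drops that assumption by mapping over the array in a SQL expression:

result_df = (
    df
    # transform pulls index 1 (the score) out of every inner array,
    # however many there are per row
    .select(F.explode(F.expr("transform(products, x -> x[1])")).alias('val'))
    .select(F.mean('val').alias('mean'), F.stddev('val').alias('stddev'))
)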

You can read more about pyspark.sql.functions here.

For the standard deviation, a nicer way of writing it is shown below. We can format the output to 2 decimal places with format_number and give the column an alias:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('Sales_fun').getOrCreate()
data = spark.read.csv('sales_info.csv', inferSchema=True, header=True)

data.select(format_number(stddev('Sales'), 2).alias('Sales_Stdev')).show()
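
The same pattern works for the mean; for example, both statistics formatted together (note that format_number returns a string column):

data.select(
    format_number(mean('Sales'), 2).alias('Sales_Mean'),
    format_number(stddev('Sales'), 2).alias('Sales_Stdev')
).show()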


Comments
  • Can you give a sample of your data?
  • @Tai: I posted a small example.
  • Nice, thanks. However, one of the difficulties is the nested data that I have. Could you please check my update?
  • I get an error UnboundLocalError: local variable 'mean' referenced before assignment.
  • To solve this error, it's necessary to update the import statement as follows: from pyspark.sql.functions import mean as _mean, stddev as _stddev
  • @Markus thanks for pointing that out. I've updated the code accordingly.
  • Do not use import *
  • Yeah, of course .. that's just for reference. Thanks