How to get other columns when using Spark DataFrame groupby?

When I use DataFrame groupBy like this:

df.groupBy(df("age")).agg(Map("id"->"count"))

I only get a DataFrame with the columns "age" and "count(id)", but df has many other columns, such as "name".

In short, I want the same result as this MySQL query:

"select name,age,count(id) from df group by age"

What should I do when using groupBy in Spark?

Long story short, in general you have to join the aggregated results with the original table. Spark SQL follows the same pre-SQL:1999 convention as most major databases (PostgreSQL, Oracle, MS SQL Server), which doesn't allow additional columns in aggregation queries.
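
A minimal sketch of that join in Scala, reusing df and the column names from the question:

import org.apache.spark.sql.functions.count

val counts = df.groupBy("age").agg(count("id").as("count"))
val result = df.join(counts, Seq("age")).select("name", "age", "count")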

Since for aggregations like count the extra values are not well defined, and behavior tends to vary between systems that do support this type of query, you can simply include the additional columns using an arbitrary aggregate such as first or last.
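
For example, assuming the df from the question, a sketch that keeps one (arbitrary) name per age group:

import org.apache.spark.sql.functions.{count, first}

df.groupBy("age").agg(count("id").as("count"), first("name").as("name"))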

In some cases you can replace agg with a select using window functions and a subsequent where, but depending on the context it can be quite expensive.
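
For illustration, a sketch of that window-plus-filter pattern, keeping the full row(s) holding the maximum id within each age group (the choice of id is only illustrative):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max}

val byAge = Window.partitionBy("age")
val rowsAtGroupMax = df
  .withColumn("max_id", max(col("id")).over(byAge))
  .where(col("id") === col("max_id"))
  .drop("max_id")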

One way to get all columns after doing a groupBy is to use the join function.

feature_group = ['name', 'age']
data_counts = df.groupBy(feature_group).count().alias("counts")
data_joined = df.join(data_counts, feature_group)

data_joined will now have all columns including the count values.

Using the agg() function we can calculate more than one aggregate at a time, and we can group by and aggregate on multiple DataFrame columns. When we call groupBy() on a Spark DataFrame it returns a RelationalGroupedDataset object, which provides aggregate functions such as count() (the number of rows for each group), mean() (the mean of values for each group), and max() (the maximum of values for each group).
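
For instance, a short sketch of computing several aggregates at once with agg() (the aggregated columns are only illustrative, using the df from the question):

import org.apache.spark.sql.functions.{count, max, mean}

df.groupBy("age").agg(
  count("id").as("count_id"),
  mean("id").as("mean_id"),
  max("id").as("max_id")
)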

Maybe this solution will be helpful.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import Window

sc = SparkContext(conf=SparkConf().setAppName("groupby-keep-columns").setMaster("local[*]"))
sqlContext = SQLContext(sc)

name_list = [(101, 'abc', 24), (102, 'cde', 24), (103, 'efg', 22), (104, 'ghi', 21),
             (105, 'ijk', 20), (106, 'klm', 19), (107, 'mno', 18), (108, 'pqr', 18),
             (109, 'rst', 26), (110, 'tuv', 27), (111, 'pqr', 18), (112, 'rst', 28), (113, 'tuv', 29)]

# Count rows per age with a window function, so every original column is kept.
age_w = Window.partitionBy("age")
name_age_df = sqlContext.createDataFrame(name_list, ['id', 'name', 'age'])

name_age_count_df = name_age_df.withColumn("count", F.count("id").over(age_w)).orderBy("count")
name_age_count_df.show()
Output:
+---+----+---+-----+
| id|name|age|count|
+---+----+---+-----+
|109| rst| 26|    1|
|113| tuv| 29|    1|
|110| tuv| 27|    1|
|106| klm| 19|    1|
|103| efg| 22|    1|
|104| ghi| 21|    1|
|105| ijk| 20|    1|
|112| rst| 28|    1|
|101| abc| 24|    2|
|102| cde| 24|    2|
|107| mno| 18|    3|
|111| pqr| 18|    3|
|108| pqr| 18|    3|
+---+----+---+-----+

Aggregate functions reduce the values of rows for the specified columns within the group. If you wish to retain other row values you need to implement reduction logic that specifies the row each value comes from, for instance keeping all values of the row with the maximum money within each age group. To this end you can use a UDAF (user-defined aggregate function) to reduce rows within the group.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._


object AggregateKeepingRowJob {

  def main (args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder()
      .appName(this.getClass.getName.replace("$", ""))
      .master("local")
      .getOrCreate()

    val sc = sparkSession.sparkContext
    sc.setLogLevel("ERROR")

    import sparkSession.sqlContext.implicits._

    val rawDf = Seq(
      (1L, "Moe",  "Slap",  2.0, 18),
      (2L, "Larry",  "Spank",  3.0, 15),
      (3L, "Curly",  "Twist", 5.0, 15),
      (4L, "Laurel", "Whimper", 3.0, 15),
      (5L, "Hardy", "Laugh", 6.0, 15),
      (6L, "Charley",  "Ignore",   5.0, 5)
    ).toDF("id", "name", "requisite", "money", "age")

    rawDf.show(false)
    rawDf.printSchema

    val maxAmtUdaf = new KeepRowWithMaxAmt

    val aggDf = rawDf
      .groupBy("age")
      .agg(
        count("id"),
        max(col("money")),
        maxAmtUdaf(
          col("id"),
          col("name"),
          col("requisite"),
          col("money"),
          col("age")).as("rowWithMaxMoney")
      )

    aggDf.printSchema
    aggDf.show(false)

  }


}

The UDAF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Keeps the entire input row that has the largest "money" value within the group.
class KeepRowWithMaxAmt extends UserDefinedAggregateFunction {

  // These are the input fields for your aggregate function
  // (they match the columns passed in the agg call above).
  override def inputSchema: StructType =
    StructType(
      StructField("id", LongType) ::
      StructField("name", StringType) ::
      StructField("requisite", StringType) ::
      StructField("money", DoubleType) ::
      StructField("age", IntegerType) :: Nil
    )

  // These are the internal fields you keep for computing your aggregate:
  // simply the best row seen so far.
  override def bufferSchema: StructType = inputSchema

  // This is the output type of your aggregation function: the kept row as a struct.
  override def dataType: DataType = inputSchema

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = ""
    buffer(2) = ""
    buffer(3) = Double.MinValue
    buffer(4) = 0
  }

  // This is how to update your buffer given an input row:
  // keep the incoming row if its "money" beats the current maximum.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (input.getAs[Double](3) > buffer.getAs[Double](3)) {
      buffer(0) = input.getAs[Long](0)
      buffer(1) = input.getAs[String](1)
      buffer(2) = input.getAs[String](2)
      buffer(3) = input.getAs[Double](3)
      buffer(4) = input.getAs[Int](4)
    }
  }

  // This is how to merge two buffers: keep whichever side holds the larger "money".
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    if (buffer2.getAs[Double](3) > buffer1.getAs[Double](3)) {
      buffer1(0) = buffer2.getAs[Long](0)
      buffer1(1) = buffer2.getAs[String](1)
      buffer1(2) = buffer2.getAs[String](2)
      buffer1(3) = buffer2.getAs[Double](3)
      buffer1(4) = buffer2.getAs[Int](4)
    }
  }

  // This is where you output the final value, given the final value of your buffer.
  override def evaluate(buffer: Row): Any = buffer
}
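
Note that UserDefinedAggregateFunction is deprecated as of Spark 3.0. On newer versions the same idea can be expressed with the typed Aggregator API; the sketch below is only an illustration, and the Stooge case class and KeepRowWithMaxMoney object are names made up for it:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Keep the row (modelled as a case class) with the largest money value per group.
case class Stooge(id: Long, name: String, requisite: String, money: Double, age: Int)

object KeepRowWithMaxMoney extends Aggregator[Stooge, Stooge, Stooge] {
  def zero: Stooge = Stooge(0L, "", "", Double.MinValue, 0)
  def reduce(b: Stooge, a: Stooge): Stooge = if (a.money > b.money) a else b
  def merge(b1: Stooge, b2: Stooge): Stooge = if (b2.money > b1.money) b2 else b1
  def finish(reduction: Stooge): Stooge = reduction
  def bufferEncoder: Encoder[Stooge] = Encoders.product[Stooge]
  def outputEncoder: Encoder[Stooge] = Encoders.product[Stooge]
}

// Typed usage (requires import sparkSession.implicits._):
// rawDf.as[Stooge].groupByKey(_.age).agg(KeepRowWithMaxMoney.toColumn).show()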

Here is an example that I came across in spark-workshop:

import org.apache.spark.sql.functions._  // for regexp_replace, col, max
import spark.implicits._                 // for the 'name column syntax

val populationDF = spark.read
                .option("inferSchema", "true")
                .option("header", "true")
                .format("csv").load("file:///databricks/driver/population.csv")
                .select('name, regexp_replace(col("population"), "\\s", "").cast("integer").as("population"))

val maxPopulationDF = populationDF.agg(max('population).as("populationmax"))

To get other columns, I do a simple join between the original DF and the aggregated one

populationDF.join(maxPopulationDF,populationDF.col("population") === maxPopulationDF.col("populationmax")).select('name, 'populationmax).show()

You can also compute the maximum first and then filter the rows so that the value in the aggregated column equals that max. Another possible approach is to join the DataFrame with itself specifying "leftsemi": this kind of join includes all columns from the DataFrame on the left side and no columns from the right side.
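
A minimal sketch of the left-semi variant, reusing populationDF and maxPopulationDF from the example above:

// The leftsemi join keeps only populationDF's columns for the matching rows.
populationDF.join(
  maxPopulationDF,
  populationDF.col("population") === maxPopulationDF.col("populationmax"),
  "leftsemi"
).show()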

If you want to apply the same aggregate to every column, you can build the list of expressions programmatically:

import org.apache.spark.sql.functions.sum

val exprs = df.columns.map(sum(_))
df.groupBy($"col1").agg(exprs.head, exprs.tail: _*)

There are other ways to achieve a similar effect, but these should be more than enough most of the time. See also: Multiple Aggregate operations on the same column of a Spark DataFrame.

Comments
  • Why not use "select name, age, count(id) from df group by age, name"? Grouping only by age would match many different names but display only one of them.
  • In my question I just gave a simple example. Using "group by age, name" obviously gives a different result than "group by age"....
  • One important thing to consider is: "when I group by one attribute and I need another column, which value will I use from that other column?" Unless we specify an aggregation technique for the other column, the engine can't possibly know which of the possibly multiple values in that column to take.
  • "select name, age, count(id) from df group by age" does not look like a valid SQL statement. You cannot select name if it's not part of the group by clause.
  • How about groupBy("columnName").agg(collect_list(struct("*")).as("newGroupedListColumn"))?
  • But rows will be repeated.
  • Consider this: df.join(data_counts, feature_group).dropDuplicates()
  • This answer is incorrect since the topic starter required grouping by only one field.
  • Actually, I think this answer has some merit; at least for my use case, grouping by two attributes is acceptable, and it adheres to the "original" SQL intentions.