Spark: TOPN after GroupBy

I have an RDD P mapped to the class:

case class MyRating(userId:Int, itemId:Int, rating:Double)

I am interested in finding the top-N entries for each user, i.e. group by userId and, within each group, keep only the top N (say 10) entries with the highest rating.

I did the following:

val A : RDD[(Int, Iterable[MyRating])] = P.keyBy(r => r.userId).groupByKey
val B : RDD[(Int, List[MyRating])] = A.mapValues(iter => iter.toList.sortBy(-_.rating))
val C = B.take(10)

Clearly, applying .take(10) after groupByKey leaves me with only 10 keys (users) and does not keep the top 10 ratings for each user.

How do I apply .take(N) after a groupBy so that it acts on the values instead of on the keys themselves?

If I understand correctly, what you need to do is group the RDD by userId and then, for every (id, list) tuple, give back the id and the list sorted and trimmed to 10 elements:

P
  .groupBy(_.userId)  
  .map{ case (key, it) => 
    (key, it.toList.sortBy(mr => -mr.rating).take(10)) 
  }

A naive approach is to take n values:

B.mapValues(_.take(n))

but if you need only a small subset of the values, it is better to use, for example, aggregateByKey and drop obsolete records as you go instead of grouping everything. You probably want something more efficient in practice (you can check Spark's implementation of top / takeOrdered), but you can start with something like this:

import scala.math.Ordering
import scala.collection.mutable.PriorityQueue

val n = 10
// Order ratings so the queue can rank MyRating values by rating.
implicit val ord = Ordering.by[MyRating, Double](_.rating)

val pairs = P.keyBy(_.userId)
pairs.aggregateByKey(new PriorityQueue[MyRating]())(
  (acc, x) => {
    acc.enqueue(x)
    acc.take(n)          // keep at most n ratings per key on each executor
  },
  (acc1, acc2) => (acc1 ++ acc2).take(n)   // merge the per-partition queues
)

Note that the above snippet requires Scala 2.11+ due to SI-7568.
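
If you would rather not rely on how PriorityQueue.take behaves, a minimal sketch of the same idea (the minFirst, trim and topNPerUser names are mine, not part of the answer) is to keep a bounded min-heap per key and drop the lowest-rated element whenever a queue grows past n:

import scala.collection.mutable.PriorityQueue

// Reversed ordering: dequeue() now removes the lowest-rated element,
// so trimming the queue to size n keeps the n highest ratings.
val minFirst: Ordering[MyRating] = Ordering.by[MyRating, Double](_.rating).reverse

def trim(q: PriorityQueue[MyRating], n: Int): PriorityQueue[MyRating] = {
  while (q.size > n) q.dequeue()
  q
}

val topNPerUser = P.keyBy(_.userId)
  .aggregateByKey(PriorityQueue.empty[MyRating](minFirst))(
    (acc, x) => { acc.enqueue(x); trim(acc, n) },
    (acc1, acc2) => trim(acc1 ++= acc2, n)
  )
  .mapValues(_.toList.sortBy(-_.rating))   // final small sort per key, highest rating first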

You were very close, but you need to take the top-N entries within the mapping of A to B. For example, if you wanted to take the top 2 MyRating items from a List, the code below would do the trick. B would be an RDD containing a List of the top 2 MyRatings for each userId. (Also, sortBy will sort in descending order if you simply negate the rating.)

case class MyRating(userId:Int, itemId:Int, rating:Double)

val plist:List[MyRating] = List(MyRating(1,0,1),MyRating(1,1,5),MyRating(1,2,7),MyRating(1,3,9),MyRating(1,4,10),MyRating(2,5,1),MyRating(2,6,5),MyRating(2,6,7))
val P: org.apache.spark.rdd.RDD[MyRating] = sc.parallelize(plist)

val A : RDD[(Int, Iterable[MyRating])] = P.keyBy(r => r.userId).groupByKey
val TOPCOUNT = 2
val B : RDD[(Int, List[MyRating])] = A.mapValues(iter => iter.toList.sortBy(-_.rating).take(TOPCOUNT))

Here is an example using aggregateByKey as suggested by zero323:

val A : RDD[(Int, MyRating)] = P.keyBy(r => r.userId)
val B = A.aggregateByKey(List[MyRating]())(
  (l, r) => (l :+ r).sortBy(-_.rating).take(10),
  (l1, l2) => (l1 ++ l2).sortBy(-_.rating).take(10))

The benefit of this method is that you avoid potentially shuffling a large amount of data between your executors. If the ratings from a single user are distributed over multiple nodes, groupBy needs to send all of that user's ratings to the same executor, whereas with aggregateByKey a top-N list is first built on each executor, and only those lists are shuffled and combined.

Whether this is beneficial depends on the distribution of the data. If you don't have many more ratings per user than the final top N you are after, you are not gaining much (especially with my naive implementation, which re-sorts the list for every single rating). However, if the number of ratings per executor is orders of magnitude larger, you can win a lot.
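
As a minimal sketch of that last point (the N, insertSorted and topNSorted names are mine, not from the answer above), the per-record re-sort can be replaced by a single linear insertion into a list that is kept sorted, leaving a one-off sort for the partition merge:

val N = 10

// Insert one rating into a list already sorted by descending rating,
// then cut the list back to at most N elements.
def insertSorted(l: List[MyRating], r: MyRating): List[MyRating] = {
  val (higher, lower) = l.span(_.rating >= r.rating)
  (higher ::: (r :: lower)).take(N)
}

val topNSorted = P.keyBy(_.userId).aggregateByKey(List.empty[MyRating])(
  insertSorted,                                      // per record: one linear insert
  (l1, l2) => (l1 ++ l2).sortBy(-_.rating).take(N)   // per partition merge: sort once
)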

I found the related post: Spark: Get top N by key

Copying @jbochi's recommendation:

Since version 1.4, there is a built-in way to do this using MLlib: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala

val scores = sc.parallelize(Array(
      ("a", 1),  
      ("a", 2), 
      ("a", 3), 
      ("b", 3), 
      ("b", 1), 
      ("a", 4),  
      ("b", 4), 
      ("b", 2)
))
import org.apache.spark.mllib.rdd.MLPairRDDFunctions.fromPairRDD
scores.topByKey(2) // Where the keys are a and b
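
Applied to the question's MyRating RDD, the same helper would look roughly like this (a sketch; topByKey needs an Ordering on the value, and the top10PerUser name is mine):

import org.apache.spark.mllib.rdd.MLPairRDDFunctions.fromPairRDD

// topByKey keeps, per key, the `num` largest values under the given ordering.
val top10PerUser =
  P.keyBy(_.userId).topByKey(10)(Ordering.by[MyRating, Double](_.rating))
// top10PerUser: RDD[(Int, Array[MyRating])]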

Comments
  • Thanks, aggregateByKey makes more sense... looks like a powerful construct.
  • Does this method apply to DataFrames as well? Somehow I (superstitiously) avoid converting from a DataFrame to an RDD for performance reasons. (See the window-function sketch after these comments.)
  • Thanks for explaining the difference between the two approaches. Much appreciated.
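
For the DataFrame question in the comment above, a minimal sketch (not from the original thread; it assumes spark.implicits._ is in scope so the case-class RDD converts to a DataFrame with userId, itemId and rating columns) is the usual window-function pattern: partition by userId, order by rating descending, and keep row numbers up to 10.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val df = P.toDF()   // columns: userId, itemId, rating

// Rank each user's ratings from highest to lowest and keep the top 10.
val byUserTopRated = Window.partitionBy("userId").orderBy(col("rating").desc)

val top10Df = df
  .withColumn("rn", row_number().over(byUserTopRated))
  .where(col("rn") <= 10)
  .drop("rn")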