Top N items from a Spark DataFrame/RDD

My requirement is to get the top N items from a dataframe.

I've this DataFrame:

val df = List(
      ("MA", "USA"),
      ("MA", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA")).toDF("value", "country")

I was able to map it to an RDD[((Int, String), Long)] called colValCount. Read it as ((colIdx, value), count):

((0,CT),5)
((0,MA),2)
((0,OH),4)
((0,NY),6)
((1,USA),17)
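
For illustration, one way such a colValCount RDD could be built (a simplified sketch, not necessarily the exact code used):

// Turn every cell into ((columnIndex, cellValue), 1L) and sum the ones per key.
val colValCount: org.apache.spark.rdd.RDD[((Int, String), Long)] =
  df.rdd
    .flatMap(row => row.toSeq.zipWithIndex.map { case (v, i) => ((i, v.toString), 1L) })
    .reduceByKey(_ + _)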

Now I need to get the top 2 items for each column index. So my expected output is this:

RDD[((Int, String), Long)]

((0,CT),5)
((0,NY),6)
((1,USA),17)

I've tried the freqItems API on the DataFrame, but it's slow.

Any suggestions are welcome.

For example:

import org.apache.spark.sql.functions._

df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))
  .groupBy($"index", $"value")
  .count
  .orderBy($"count".desc)
  .limit(3)
  .show

// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    1|  USA|   17|
// |    0|   NY|    6|
// |    0|   CT|    5|
// +-----+-----+-----+

where:

df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))

creates a two column DataFrame:

// +-----+-----+
// |index|value|
// +-----+-----+
// |    0|   MA|
// |    0|   MA|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    1|  USA|
// |    1|  USA|
// |    1|  USA|
// +-----+-----+

If you want specifically two values for each column:

import org.apache.spark.sql.DataFrame

def topN(df: DataFrame, key: String, n: Int) = {
  df.select(
      lit(df.columns.indexOf(key)).alias("index"),
      col(key).alias("value"))
    .groupBy("index", "value")
    .count
    .orderBy($"count".desc)
    .limit(n)
}

topN(df, "value", 2).union(topN(df, "country", 2)).show
// +-----+-----+-----+ 
// |index|value|count|
// +-----+-----+-----+
// |    0|   NY|    6|
// |    0|   CT|    5|
// |    1|  USA|   17|
// +-----+-----+-----+
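
The same helper could, in principle, be applied to every column and the results unioned (one of the comments below mentions 170+ columns); note that a very long union chain carries its own planning cost. A rough sketch:

// Hypothetical generalization: top 2 values for every column in df.
val allTopN = df.columns.map(c => topN(df, c, 2)).reduce(_ union _)
allTopN.show()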

So like pault said - just "some combination of sort() and limit()".

The easiest way to do this, since it's a natural fit for a window function, is with SQL. Spark ships with SQL syntax, and SQL is a great, expressive tool for this problem.

Register your dataframe as a temp table, and then group and window on it.

spark.sql("""SELECT idx, value, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY c DESC) as r 
             FROM (
               SELECT idx, value, COUNT(*) as c 
               FROM (SELECT 0 as idx, value FROM df UNION ALL SELECT 1, country FROM df) 
               GROUP BY idx, value) 
             HAVING r <= 2""").show()

I'd like to see whether any of the procedural / Scala approaches would let you perform the window function without an iteration or loop. I'm not aware of anything in the Spark API that supports it.

Incidentally, if you have an arbitrary number of columns you want to include, you can quite easily generate the inner section (SELECT 0 AS idx, value ... UNION ALL SELECT 1, country, etc.) dynamically from the list of columns.
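
For example, the inner UNION ALL section could be generated from df.columns roughly like this (a sketch, assuming the DataFrame is registered as the view df):

// Build one "SELECT <idx> AS idx, <col> AS value FROM df" leg per column.
val innerSql = df.columns.zipWithIndex
  .map { case (c, i) => s"SELECT $i AS idx, `$c` AS value FROM df" }
  .mkString(" UNION ALL ")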

Given your last RDD:

val rdd =
  sc.parallelize(
    List(
      ((0, "CT"), 5),
      ((0, "MA"), 2),
      ((0, "OH"), 4),
      ((0, "NY"), 6),
      ((1, "USA"), 17)
    ))

rdd.filter(_._1._1 == 0).sortBy(-_._2).take(2).foreach(println)
> ((0,NY),6)
> ((0,CT),5)
rdd.filter(_._1._1 == 1).sortBy(-_._2).take(2).foreach(println)
> ((1,USA),17)

We first get the items for a given column index (.filter(_._1._1 == 0)), then sort them by decreasing count (.sortBy(-_._2)), and finally take at most the first 2 elements (.take(2)), which returns a single element when there are fewer than 2 records.
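
If you would rather avoid one filter/sort pass per column index, a grouped variant is possible (just a sketch; groupByKey buffers all values of a key in memory, so it assumes each column has a manageable number of distinct values):

// Re-key by column index, then keep the 2 highest counts within each group.
val top2PerIndex = rdd
  .map { case ((idx, value), count) => (idx, (value, count)) }
  .groupByKey()
  .mapValues(values => values.toList.sortBy { case (_, count) => -count }.take(2))

top2PerIndex.collect().foreach(println)
// e.g. (0,List((NY,6), (CT,5)))
//      (1,List((USA,17)))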

You can map each partition using this helper function defined in Sparkz and then combine the partial results:

package sparkz.utils

import scala.reflect.ClassTag

object TopElements {
  // Single pass over elems: keep a running set of the n best-scoring elements
  // plus the minimum score currently in that set.
  def topN[T: ClassTag](elems: Iterable[T])(scoreFunc: T => Double, n: Int): List[T] =
    elems.foldLeft((Set.empty[(T, Double)], Double.MaxValue)) {
      case (accumulator @ (topElems, minScore), elem) =>
        val score = scoreFunc(elem)
        if (topElems.size < n)
          // Not full yet: add the element and track the smallest score seen so far.
          (topElems + (elem -> score), math.min(minScore, score))
        else if (score > minScore) {
          // This element beats the current worst: swap it in and recompute the minimum.
          val newTopElems = topElems - topElems.minBy(_._2) + (elem -> score)
          (newTopElems, newTopElems.map(_._2).min)
        }
        else accumulator
    }._1.toList.sortBy(_._2).reverse.map(_._1) // results in descending score order
}

Source: https://github.com/gm-spacagna/sparkz/blob/master/src/main/scala/sparkz/utils/TopN.scala
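
For instance, a rough usage sketch (not from the Sparkz repository, just an illustration) applied to the ((colIdx, value), count) RDD (rdd) defined earlier: compute a partial top N on each partition with mapPartitions, collect the small partial results, and run topN once more to merge them. Like the helper itself, this gives an overall top N by score, not a top N per column index.

import sparkz.utils.TopElements

// Partial top 2 per partition, scored by count, then a final merge on the driver.
val partialTop = rdd
  .mapPartitions(iter => TopElements.topN(iter.toList)(pair => pair._2.toDouble, 2).iterator)
  .collect()
  .toList

val globalTop = TopElements.topN(partialTop)(pair => pair._2.toDouble, 2)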

If you are working with Spark SQL DataFrames, in my opinion the best (and easiest to understand) solution is to write it like this:

val test: DataFrame = df.select(col("col_name"))
test.show(5, false)

Hope it helps you :)

In PySpark, you can use either top or takeOrdered with a key argument:

newRDD.top(2, key=lambda x: x[2])

or

newRDD.takeOrdered(2, key=lambda x: -x[2])

Note that top takes elements in descending order and takeOrdered in ascending order, so the key function differs between the two.
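
The Scala RDD API has the same pair of methods; they take an implicit Ordering instead of a key function. On the ((colIdx, value), count) RDD (rdd) from earlier, that would look roughly like this (again, an overall top 2 by count, not a per-index top 2):

// top takes the largest elements under the given Ordering; takeOrdered takes the
// smallest, so the ordering is negated to get the same result.
rdd.top(2)(Ordering.by { case (_, count) => count })
rdd.takeOrdered(2)(Ordering.by { case (_, count) => -count })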

Comments
  • I think you need some combination of sort() and limit(), but TBH I don't understand how you get your output.
  • Some combination of orderBy() and limit()...basically what I said :-)
  • @pault Yeah, I guess it is either that or some combination of groupBy and agg here with a window function from time to time :-)
  • .orderBy($"count".desc).limit(3) gives the specified result in this case, but it doesn't give the top 2 items for each column index in the general case.
  • Thanks let me try it out!
  • @user8371915 this works but still slow for TB of data as I'm also running for 170+ columns. The performance is comparable to freqItems API I mentioned earlier. It takes around 1+ hour to process the entire data.
  • Thanks, I've tried using window functions it complained of GC overhead. I'll try out your solution too.