Order by key from a Map in a Dataset

I want to order some Avro files that I retrieve from HDFS by timestamp.

The schema of my Avro files is:

headers: Map[String, String], body: String

Now the tricky part is that the timestamp is one of the key/value pairs in the map. So the timestamp is contained in the map like this:

key_1 -> value_1, key_2 -> value_2, timestamp -> 1234567, key_n -> value_n

Note that the type of the values is String.

I created a case class to build my dataset with this schema:

case class Root(headers: Map[String, String], body: String)

Creation of my dataset:

val ds = spark
          .read
          .format("com.databricks.spark.avro")
          .load(pathToHDFS)
          .as[Root]

I don't really know how to begin with this problem, since I can only get the columns headers and body. How can I get the nested values so that I can finally sort by timestamp?

I would like to do something like this:

ds.select("headers").doSomethingToGetTheMapStructure.doSomeConversionStringToTimeStampForTheColumnTimeStamp("timestamp").orderBy("timestamp")

One precision: I don't want to lose any data from my initial dataset, just apply a sorting operation.

I use Spark 2.3.0.

The loaded Dataset should look something like the sample dataset below:

case class Root(headers: Map[String, String], body: String)

import spark.implicits._  // required for the .toDS conversion

val ds = Seq(
  Root(Map("k11"->"v11", "timestamp"->"1554231600", "k12"->"v12"), "body1"),
  Root(Map("k21"->"v21", "timestamp"->"1554134400", "k22"->"v22"), "body2")
).toDS

You can simply look up the Map by the timestamp key, cast the value to Long, and perform an orderBy as follows:

ds.
  withColumn("ts", $"headers"("timestamp").cast("Long")).
  orderBy("ts").
  show(false)
// +-------------------------------------------------+-----+----------+
// |headers                                          |body |ts        |
// +-------------------------------------------------+-----+----------+
// |[k21 -> v21, timestamp -> 1554134400, k22 -> v22]|body2|1554134400|
// |[k11 -> v11, timestamp -> 1554231600, k12 -> v12]|body1|1554231600|
// +-------------------------------------------------+-----+----------+

Note that $"headers"("timestamp") is just the same as using the apply column method (i.e. $"headers".apply("timestamp")).

Alternatively, you could also use getItem to access the Map by key, like:

$"headers".getItem("timestamp")

You can use Scala's sortBy, which takes a function. I would advise you to explicitly declare the val as a Vector (or another collection); that way you will see the applicable functions in IntelliJ (if you're using IntelliJ), and it will definitely compile. Note that as[Root] gives you a Dataset[Root], so you first have to collect it to the driver to get a local Vector.

See my example below, based on your code:

case class Root(headers: Map[String, String], body: String)

// as[Root] yields a Dataset[Root]; collect it to the driver to get a local Vector
val ds: Vector[Root] = spark
  .read
  .format("com.databricks.spark.avro")
  .load(pathToHDFS)
  .as[Root]
  .collect()
  .toVector

val sorted = ds.sortBy(r => r.headers.get("timestamp").map(PROCESSING)).reverse

Edit: added reverse (assuming you want it descending). PROCESSING stands for whatever conversion to a sortable value you apply to the timestamp inside the function passed as the argument.
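For instance, a minimal sketch where the hypothetical PROCESSING step just parses the epoch-second strings with toLong:

// Assumes the timestamp values are epoch-second strings like "1554231600".
val sorted = ds.sortBy(r => r.headers.get("timestamp").map(_.toLong)).reverse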

import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.sql.functions.{col, desc}
import java.sql.Timestamp

case class Nested(key_1: String, key_2: String, timestamp: Timestamp, key_n: String)
case class Root(headers: Nested, body: String)

implicit val rootCodec: Encoder[Root] = Encoders.product[Root]

val avroDS: Dataset[Root] = spark.read
                                 .format("com.databricks.spark.avro")
                                 .load(pathToHDFS)
                                 .as[Root]

// desc takes a column name, and the timestamp field is nested under headers
val sortedDS: Dataset[Root] = avroDS.orderBy(desc("headers.timestamp"))

This code snippet casts your Avro data directly to Dataset[Root]. You won't have to rely on importing sparkSession.implicits._, and it eliminates the step of casting your timestamp field to TimestampType. Internally, Spark's timestamp datatype is implemented using java.sql.Timestamp.
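If your headers column really is a Map[String, String] as in the question, here's a minimal sketch of the casting step this answer avoids, assuming epoch-second string values (col and desc imported as above; ds is the sample dataset from earlier, and the column name event_time is just an illustrative choice):

// Cast the epoch-seconds string to long, then to TimestampType, and sort on it.
val sortedByTs = ds
  .withColumn("event_time", col("headers")("timestamp").cast("long").cast("timestamp"))
  .orderBy(desc("event_time"))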

Comments
  • Thanks a lot, this is exactly what I was looking for. How did you manage to find this syntax to access the timestamp key? Is there some documentation I'm missing?
  • @Gatsby, please see my expanded answer.
  • Your solution works perfectly, thanks, but in the case where I need Spark's distributed performance for a large dataset, how can I proceed?
  • In addition, the type of ds is Dataset[Root]; in order to get a Vector you need to "get rid of" the Spark Dataset abstraction by doing a collect, like this: val test: Vector[Root] = spark.read.format("com.databricks.spark.avro").load("path").as[Root].collect.to[Vector]
  • Here is a GitHub gist showing your solution: gist.github.com/christopheblp/bb6f5de3e8cd910bff74e3aefdfef0fe
  • Thank you for sharing the gist. Glad to hear my solution works for you.
  • " but in the case where I need spark distributed performances for a large dataset how can I proceed ? " >>> To be honest I'm not very familiar with Spark. Checked out the DataSet api, which I see is optimized for Spark's distributed computing. So you wouldn't have that specific optimization in a "normal" Scala Seq. I would suggest you use a Vector, which is optimized for large volumes. See [docs.scala-lang.org/overviews/collections/… &[reddit.com/r/scala/comments/1ynujn/…