How to load only the data of the last partition

I have some data partitioned this way:

/data/year=2016/month=9/version=0
/data/year=2016/month=10/version=0
/data/year=2016/month=10/version=1
/data/year=2016/month=10/version=2
/data/year=2016/month=10/version=3
/data/year=2016/month=11/version=0
/data/year=2016/month=11/version=1

When using this data, I'd like to load the last version only of each month.

A simple way to do this would be to call load("/data/year=2016/month=11/version=1") instead of load("/data"). The drawback of this approach is that it loses the partitioning information, such as year and month, which means it would no longer be possible to apply operations based on the year or the month.
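
For illustration (a minimal sketch, not from the original post; paths taken from the listing above), loading the root keeps the partition columns, while loading a single leaf directory drops them:

// Loading the root: Spark discovers year, month and version as columns.
val all = spark.read.load("/data")
all.printSchema()   // data columns plus year, month, version

// Loading one leaf directory: the partition columns are gone,
// so grouping or filtering by year/month is no longer possible.
val leaf = spark.read.load("/data/year=2016/month=10/version=3")
leaf.printSchema()  // data columns only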

Is it possible to ask Spark to load the last version only of each month? How would you go about this?

Well, Spark supports predicate push-down, so if you provide a filter following the load, it will only read in the data fulfilling the criteria in the filter. Like this:

spark.read.option("basePath", "/data").load("/data").filter('version === 3)

And you get to keep the partitioning information :)
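
As a quick illustration (a sketch using the column names from the question), the partition columns remain ordinary columns after the read, so month-level operations still work:

import org.apache.spark.sql.functions.col

val latest = spark.read
  .option("basePath", "/data")
  .load("/data")
  .filter(col("version") === 3)

// year and month survived the read, so you can still aggregate by them
latest.groupBy("year", "month").count().show()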

Just an addition to the previous answers, for reference.

Below is an ORC-format table in Hive, partitioned on the year, month and day columns.

hive (default)> show partitions test_dev_db.partition_date_table;
OK
year=2019/month=08/day=07
year=2019/month=08/day=08
year=2019/month=08/day=09

If I set the properties below, I can read a specific partition's data in Spark SQL, as shown here:

spark.sql("SET spark.sql.orc.enabled=true");
spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
spark.sql("SET spark.sql.orc.filterPushdown=true")

spark.sql("""select * from test_dev_db.partition_date_table where year ='2019' and month='08' and day='07' """).explain(True)

We can see PartitionCount: 1 in the plan, so it has clearly pruned down to the single requested partition.

== Physical Plan ==
*(1) FileScan orc test_dev_db.partition_date_table[emp_id#212,emp_name#213,emp_salary#214,emp_date#215,year#216,month#217,day#218] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(year#216), isnotnull(month#217), isnotnull(day#218), (year#216 = 2019), (month#217 = 0..., PushedFilters: [], ReadSchema: struct<emp_id:int,emp_name:string,emp_salary:int,emp_date:date>

However, the same will not work with the query below. Even if we create a DataFrame using spark.read.format("orc").load(<hdfs absolute path of the table>), register a temporary view, and run Spark SQL on it, it will still scan all of the table's partitions unless we apply a specific filter condition on top of it.

spark.sql("""select * from test_dev_db.partition_date_table where year ='2019' and month='08' and day in (select max(day) from test_dev_db.partition_date_table)""").explain(True)

It has still scanned all three partitions; here PartitionCount: 3.

== Physical Plan ==
*(2) BroadcastHashJoin [day#282], [max(day)#291], LeftSemi, BuildRight
:- *(2) FileScan orc test_dev_db.partition_date_table[emp_id#276,emp_name#277,emp_salary#278,emp_date#279,year#280,month#281,day#282] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 3, PartitionFilters: [isnotnull(year#280), isnotnull(month#281), (year#280 = 2019), (month#281 = 08)], PushedFilters: [], ReadSchema: struct<emp_id:int,emp_name:string,emp_salary:int,emp_date:date>
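
As a side note (a hedged sketch, with a placeholder path), partition pruning does kick in for a path-based read as well, provided the filter uses literal values on the partition columns rather than a subquery:

import org.apache.spark.sql.functions.col

// Hypothetical HDFS path to the same table data
val df = spark.read.format("orc").load("/path/to/test_dev_db/partition_date_table")

df.filter(col("year") === "2019" && col("month") === "08" && col("day") === "07")
  .explain(true)   // the FileScan should again show PartitionCount: 1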

To filter the data based on the max partition using Spark SQL, we can use the approach below. This partition-pruning technique limits the number of files and partitions that Spark reads when querying the Hive ORC table data.

# Get the partition strings, e.g. "year=2019/month=08/day=07", as a flat RDD
rdd = spark.sql("""show partitions test_dev_db.partition_date_table""").rdd.flatMap(lambda x: x)
# Strip the "key=" prefixes and split each string into [year, month, day]
newrdd = rdd.map(lambda x: x.replace("/", "")).map(lambda x: x.replace("year=", "")).map(lambda x: x.replace("month=", "-")).map(lambda x: x.replace("day=", "-")).map(lambda x: x.split('-'))
# Take the maximum value of each partition column
max_year = newrdd.map(lambda x: x[0]).max()
max_month = newrdd.map(lambda x: x[1]).max()
max_day = newrdd.map(lambda x: x[2]).max()

Prepare your query to filter the Hive partitioned table using these max values.

query = "select * from test_dev_db.partition_date_table where year ='{0}' and month='{1}' and day ='{2}'".format(max_year,max_month,max_day)

>>> spark.sql(query).show();
+------+--------+----------+----------+----+-----+---+
|emp_id|emp_name|emp_salary|  emp_date|year|month|day|
+------+--------+----------+----------+----+-----+---+
|     3|  Govind|    810000|2019-08-09|2019|   08| 09|
|     4|  Vikash|      5500|2019-08-09|2019|   08| 09|
+------+--------+----------+----------+----+-----+---+

spark.sql(query).explain(True)

If you look at the plan of this query, you can see that it has scanned only one partition of the given Hive table; here PartitionCount is 1.

== Optimized Logical Plan ==
Filter (((((isnotnull(day#397) && isnotnull(month#396)) && isnotnull(year#395)) && (year#395 = 2019)) && (month#396 = 08)) && (day#397 = 09))
+- Relation[emp_id#391,emp_name#392,emp_salary#393,emp_date#394,year#395,month#396,day#397] orc

== Physical Plan ==
*(1) FileScan orc test_dev_db.partition_date_table[emp_id#391,emp_name#392,emp_salary#393,emp_date#394,year#395,month#396,day#397] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(day#397), isnotnull(month#396), isnotnull(year#395), (year#395 = 2019), (month#396 = 0..., PushedFilters: [], ReadSchema: struct<emp_id:int,emp_name:string,emp_salary:int,emp_date:date>

I think you have to use Spark's window functions to find and keep only the latest version per month.

import org.apache.spark.sql.functions.{col, first}
import org.apache.spark.sql.expressions.Window

// For each (year, month), order rows by version descending
val windowSpec = Window.partitionBy("year", "month").orderBy(col("version").desc)

spark.read.load("/data")
  // first() over the descending window yields the highest version per month
  .withColumn("maxVersion", first("version").over(windowSpec))
  .select("*")
  .filter(col("maxVersion") === col("version"))
  .drop("maxVersion")

Let me know if this works for you.

Here's a general Scala function:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{array, col, row_number}

/**
  * Given a DataFrame, use keys (e.g. last modified time) to show the most up-to-date record.
  *
  * @param dF          DataFrame to be parsed
  * @param groupByKeys The columns you would like to group by and expect to be duplicated,
  *                    hence why you're trying to obtain records according to the latest value of keys.
  * @param keys        The sequence of keys used to rank the records in the table
  * @return            DataFrame with records that have rank 1, i.e. the most up-to-date version of those records
  */
def getLastUpdatedRecords(dF: DataFrame, groupByKeys: Seq[String], keys: Seq[String]): DataFrame = {
  val part = Window.partitionBy(groupByKeys.head, groupByKeys.tail: _*).orderBy(array(keys.head, keys.tail: _*).desc)
  val rowDF = dF.withColumn("rn", row_number().over(part))
  val res = rowDF.filter(col("rn") === 1).drop("rn")
  res
}
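
For example (a hypothetical call, using the partition layout from the original question), keeping the latest version per year/month would look like:

val df = spark.read.option("basePath", "/data").load("/data")

// groupByKeys = columns expected to repeat, keys = column(s) used to rank versions
val latestPerMonth = getLastUpdatedRecords(df, Seq("year", "month"), Seq("version"))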

Comments
  • Thank you for your reply. Unfortunately your solution wouldn't work for my use case, as the last version might differ from one month to another. What I could do is make sure the last version is the same for all months, but I'm not a fan of that solution.