Using monotonically_increasing_id() for assigning row number to pyspark dataframe
I am using monotonically_increasing_id() to assign a row number to a pyspark dataframe with the syntax below:
df1 = df1.withColumn("idx", monotonically_increasing_id())
Now df1 has 26,572,528 records, so I was expecting idx values from 0 to 26,572,527.
But when I select max(idx), its value is strangely huge: 335,008,054,165.
What's going on with this function? Is it reliable to use for merging with another dataset that has a similar number of records?
I have some 300 dataframes which I want to combine into a single dataframe: one dataframe contains the IDs, and the others contain different records corresponding to them row-wise.
From the documentation:
A column that generates monotonically increasing 64-bit integers.
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.
Thus, it is not like an auto-increment id in RDBs and it is not reliable for merging.
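The asker's huge max value follows directly from the bit layout the documentation describes. As a plain-Python illustration (no Spark needed), the observed max idx of 335,008,054,165 decodes into a partition ID and an in-partition record number:

```python
# Decode a monotonically_increasing_id value into its two components:
# the upper bits hold the partition ID, the lower 33 bits hold the
# record number within that partition.
RECORD_BITS = 33

def decode_id(mono_id):
    """Split a monotonically_increasing_id value into (partition, record)."""
    partition = mono_id >> RECORD_BITS
    record = mono_id & ((1 << RECORD_BITS) - 1)
    return partition, record

partition, record = decode_id(335_008_054_165)
print(partition, record)  # 39 605077: the max row sat in partition 39 at offset 605,077
```

So the id is not a row count at all; it jumps by 2^33 at every partition boundary, which is why the max is so much larger than the record count.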
If you need auto-increment behavior like in RDBs and your data is sortable, you can use row_number (note: the column name must not be double-quoted, or Spark SQL treats it as a string literal and sorts by a constant):

df.createOrReplaceTempView('df')
spark.sql('select row_number() over (order by some_column) as num, * from df').show()

+---+-----------+
|num|some_column|
+---+-----------+
|  1|    .......|
|  2|    .......|
|  3|    .......|
+---+-----------+
If your data is not sortable and you don't mind using RDDs to create the indexes and then falling back to dataframes, you can use zipWithIndex().
An example can be found here
# since you have a dataframe, use the rdd interface to create indexes with zipWithIndex()
df = df.rdd.zipWithIndex()
# return back to dataframe
df = df.toDF()
df.show()

# your data           | indexes
+---------------------+---+
|                  _1 | _2|
+---------------------+---+
|[data col1,data col2]|  0|
|[data col1,data col2]|  1|
|[data col1,data col2]|  2|
+---------------------+---+
You will probably need some more transformations after that to get your dataframe to what you need it to be. Note: not a very performant solution.
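The "more transformations" usually amount to flattening the (row, index) pairs that zipWithIndex produces back into flat rows. The mapping itself is plain tuple concatenation, sketched here in pure Python; in Spark you would apply it via .map(...) before toDF, and the column names are up to you:

```python
# zipWithIndex yields pairs of (original_row, index); flattening them
# back into a single flat tuple is plain tuple concatenation.
def flatten_indexed_row(pair):
    row, index = pair
    return tuple(row) + (index,)

# What the RDD holds after zipWithIndex (simulated with plain tuples):
indexed = [(("data col1", "data col2"), 0), (("data col3", "data col4"), 1)]
flat = [flatten_indexed_row(p) for p in indexed]
print(flat)  # [('data col1', 'data col2', 0), ('data col3', 'data col4', 1)]
# In Spark: df.rdd.zipWithIndex().map(flatten_indexed_row).toDF([...])
```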
Hope this helps. Good luck!
Come to think of it, you can combine monotonically_increasing_id with row_number:

# create a monotonically increasing id
df = df.withColumn("idx", monotonically_increasing_id())
# the id is increasing but not consecutive, which means you can sort by it,
# so you can use row_number to turn it into a consecutive index
df.createOrReplaceTempView('df')
new_df = spark.sql('select row_number() over (order by idx) as num, * from df')
Not sure about performance though.
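Why this works can be shown without Spark: row_number over an ordering by the sparse ids simply relabels them with their 1-based rank, so the gaps at partition boundaries disappear. A pure-Python sketch of that relabeling (the id values mimic two partition boundaries):

```python
# monotonically_increasing_id values are unique and increasing but sparse;
# row_number() over (order by idx) replaces each one with its 1-based rank.
sparse_ids = [0, 1, 8589934592, 8589934593, 17179869184]  # jumps of 2**33 at partition edges

ranks = {mono_id: rank for rank, mono_id in enumerate(sorted(sparse_ids), start=1)}
print(ranks[8589934592])  # 3 — the third row overall, despite the huge id
print(sorted(ranks.values()))  # [1, 2, 3, 4, 5] — consecutive despite the gaps
```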
Using the API functions you can do it simply as follows:

from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

df1 = df1.withColumn("idx", F.monotonically_increasing_id())
windowSpec = W.orderBy("idx")
df1.withColumn("idx", F.row_number().over(windowSpec)).show()
I hope the answer is helpful
I found the solution by @mkaran useful, but for me there was no ordering column to use with the window function. I wanted to keep the existing order of the dataframe's rows as their indexes (what you would see in a pandas dataframe). Since it is a good solution (if performance is not a concern), I would like to share it as a separate answer.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Add an increasing id column
df_index = df.withColumn("idx", F.monotonically_increasing_id())
# Create the window specification
w = Window.orderBy("idx")
# Use row_number with the window specification
df_index = df_index.withColumn("index", F.row_number().over(w))
# Drop the temporary increasing id column
df_index = df_index.drop("idx")
Here df is your original dataframe and df_index is the new dataframe.
To merge dataframes of the same size, use zip on RDDs:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.master("local").getOrCreate()
df1 = spark.sparkContext.parallelize([(1, "a"), (2, "b"), (3, "c")]).toDF(["id", "name"])
df2 = spark.sparkContext.parallelize([(7, "x"), (8, "y"), (9, "z")]).toDF(["age", "address"])
schema = StructType(df1.schema.fields + df2.schema.fields)
# zip pairs rows positionally; concatenate each pair into one flat row
df1df2 = df1.rdd.zip(df2.rdd).map(lambda rows: rows[0] + rows[1])
spark.createDataFrame(df1df2, schema).show()
But note the following from the method's documentation:
Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
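That requirement can be made concrete with a pure-Python model: RDD.zip pairs elements partition-by-partition, so it only lines up when both RDDs have the same partition layout. A sketch (the partition layouts here are illustrative, not anything Spark guarantees):

```python
# Model an RDD as a list of partitions (each a list of elements).
def rdd_zip(rdd_a, rdd_b):
    """Pair elements partition-by-partition, as RDD.zip does."""
    if len(rdd_a) != len(rdd_b):
        raise ValueError("RDDs must have the same number of partitions")
    zipped = []
    for part_a, part_b in zip(rdd_a, rdd_b):
        if len(part_a) != len(part_b):
            raise ValueError("partitions must have the same number of elements")
        zipped.append(list(zip(part_a, part_b)))
    return zipped

same_layout = rdd_zip([[1, 2], [3]], [["a", "b"], ["c"]])
print(same_layout)  # [[(1, 'a'), (2, 'b')], [(3, 'c')]]
# rdd_zip([[1, 2], [3]], [[1], [2, 3]]) raises, just as Spark's zip fails
# when the two RDDs are partitioned differently
```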
Note that monotonically_increasing_id only guarantees that the ids will be unique and increasing, not that they will be consecutive. Using it on two different dataframes will therefore likely create two very different id columns, and a join on them will mostly return empty.