How to assign a unique Id to the dataset row based on some column value in Spark

I am trying to assign a unique ID to each row of a dataset based on a column value. For example, consider the following dataset:

State Country Person
MH    IN      ABC
AP    IN      XYZ
J&K   IN      XYZ
MH    IN      PQR 

Now I want to assign a unique ID based on the State column value; if the column value repeats, the same ID should be assigned. The output should be as follows:

State Country Person Unique_ID
MH    IN      ABC     1
AP    IN      XYZ     2 
J&K   IN      XYZ     3
MH    IN      PQR     1

How can I solve this problem using Spark with Java? Any help would be appreciated.

Here is one way of doing it with Spark in Java, using dense_rank() over a window ordered by state.

package com.stackoverflow.works;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.dense_rank;
import static org.apache.spark.sql.functions.desc;


public class UniqueIdJob {

    @SuppressWarnings("serial")
    public static class Record implements Serializable {

        private String state;

        private String country;

        private String person;

        // No-arg constructor keeps Record a conventional Java bean for Encoders.bean
        public Record() {
        }

        public Record(String state, String country, String person) {
            super();
            this.state = state;
            this.country = country;
            this.person = person;
        }

        public String getState() {
            return state;
        }

        public void setState(String state) {
            this.state = state;
        }

        public String getCountry() {
            return country;
        }

        public void setCountry(String country) {
            this.country = country;
        }

        public String getPerson() {
            return person;
        }

        public void setPerson(String person) {
            this.person = person;
        }

    }

    private static Dataset<Record> createDataset(SparkSession spark) {
        List<Record> records = new ArrayList<Record>();
        records.add(new Record("MH", "IN", "ABC"));
        records.add(new Record("AP", "IN", "XYZ"));
        records.add(new Record("J&K", "IN", "XYZ"));
        records.add(new Record("MH", "IN", "PQR"));
        records.add(new Record("AP", "IN", "XYZ1"));
        records.add(new Record("AP", "IN", "XYZ2"));
        Encoder<Record> recordEncoder = Encoders.bean(Record.class);
        Dataset<Record> recordDataset = spark.createDataset(records,
                recordEncoder);

        return recordDataset;

    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("UniqueIdJob")
                .master("local[2]").getOrCreate();
        Dataset<Record> recordDataset = createDataset(spark);
        // dense_rank over a window ordered by state gives every distinct state the same ID
        WindowSpec windowSpec = Window.orderBy(desc("state"));
        Dataset<Row> rowDataset = recordDataset.withColumn("id", dense_rank().over(windowSpec));
        rowDataset.show();
        spark.stop();
    }

}

Adding sequential IDs to a Spark DataFrame: another option is monotonically_increasing_id(). The generated IDs are guaranteed to be increasing and unique, but not consecutive; the partition index goes into the upper bits and the row's position within its partition into the lower bits, so the first row of the first partition gets the smallest ID and the last row of the last partition gets the largest.
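
A minimal Java sketch of that approach, assuming a Dataset<Row> named df built from the question's data (for example recordDataset.toDF() from the first answer):

import static org.apache.spark.sql.functions.monotonically_increasing_id;

// Adds a 64-bit ID that is unique per row (increasing, but not consecutive).
// Note: every row gets its own ID, so repeated State values will NOT share an ID,
// which differs from what the question asks for.
Dataset<Row> withRowId = df.withColumn("row_id", monotonically_increasing_id());
withRowId.show();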

It is a bit slower, but you can also do something like the following:

select state, country, person, dense_rank() over (order by state) as unique_id from ds;

This should do the job. Note, however, that a window function without a PARTITION BY clause pulls all rows into a single partition, which can be slow on large datasets.
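
If you would rather issue that query from Java, you can register a temporary view and run the same SQL; a rough sketch reusing recordDataset and spark from the first answer (the view name ds is my choice):

// Expose the dataset to Spark SQL under a temporary view name.
recordDataset.createOrReplaceTempView("ds");

Dataset<Row> ranked = spark.sql(
        "select state, country, person, dense_rank() over (order by state) as unique_id from ds");
ranked.show();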

Generating unique IDs for each row in a Spark DataFrame can also be done with zipWithIndex() or row_number(), depending on the amount and kind of data. Hive has the row_number() window function to assign unique numbers across the dataset, but Spark 1.3 did not support window functions yet; the usual workaround there was the zipWithIndex RDD function, which does the same job.
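
A rough Java sketch of that zipWithIndex workaround, assuming df is the question's DataFrame with columns State, Country and Person (the lookup and column names here are my own): zip the distinct states with an index and join the result back, so repeated states share one ID.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Index the distinct State values: (Row(state), 0), (Row(state), 1), ...
JavaPairRDD<Row, Long> zipped = df.select("State").distinct().toJavaRDD().zipWithIndex();

StructType lookupSchema = new StructType()
        .add("State", DataTypes.StringType)
        .add("Unique_ID", DataTypes.LongType);

// Turn the indexed states into a small lookup DataFrame (IDs starting at 1).
Dataset<Row> lookup = spark.createDataFrame(
        zipped.map(t -> RowFactory.create(t._1().getString(0), t._2() + 1)),
        lookupSchema);

// Join back on State so every occurrence of a state gets the same Unique_ID.
Dataset<Row> result = df.join(lookup, "State");
result.show();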

You can define your own UDF (User Defined Function) and put whatever logic you like inside it to create the unique ID.

In the example below, I have created a UDF that derives the ID from the column value's hash code.

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_172)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val dataDF=Seq(("MH",  "IN","ABC"),("AP",  "IN","XYZ"),("J&K","IN","XYZ"),("MH",  "IN","PQR")).toDF("State","Country","Person")
dataDF: org.apache.spark.sql.DataFrame = [State: string, Country: string ... 1 more field]

scala> dataDF.createOrReplaceTempView("table1")

scala> def uniqueId(col:String)={col.hashCode}
uniqueId: (col: String)Int

scala> spark.udf.register("uniqueid",uniqueId _)
res1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType)))

scala> spark.sql("select state,country,person ,uniqueid(state) as unique_id from table1").show
+-----+-------+------+---------+
|state|country|person|unique_id|
+-----+-------+------+---------+
|   MH|     IN|   ABC|     2459|
|   AP|     IN|   XYZ|     2095|
|  J&K|     IN|   XYZ|    72367|
|   MH|     IN|   PQR|     2459|
+-----+-------+------+---------+
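
For completeness, roughly the same UDF can be registered from Java as well (a sketch reusing recordDataset and spark from the first answer; the view and UDF names are my own):

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// Register a UDF that maps a column value to its hash code.
// Caveat: hash codes are not guaranteed unique, so two different states could collide.
spark.udf().register("uniqueid", (UDF1<String, Integer>) String::hashCode, DataTypes.IntegerType);

recordDataset.createOrReplaceTempView("table1");
spark.sql("select state, country, person, uniqueid(state) as unique_id from table1").show();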

Comments
  • Just to be clear, are you using Java? The tags seem to suggest otherwise.