Primary keys with Apache Spark

I have a JDBC connection between Apache Spark and PostgreSQL and I want to insert some data into my database. When I use append mode I need to specify an id for each DataFrame.Row. Is there any way for Spark to create primary keys?

Scala:

If all you need is unique numbers you can use zipWithUniqueId and recreate the DataFrame. First, some imports and dummy data:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Extract the schema for later use:

val schema = df.schema

Add id field:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Create DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

The same thing in Python:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

If you prefer consecutive numbers you can replace zipWithUniqueId with zipWithIndex, but it is a little more expensive.
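
For example, a minimal sketch reusing f and df from the Python snippet above (the df_with_consecutive_pk name is just illustrative):

from pyspark.sql.types import StructField, StructType, LongType

# Same pipeline as above, but zipWithIndex assigns consecutive indices
# 0, 1, 2, ...; it runs an extra job to compute per-partition offsets.
df_with_consecutive_pk = (df.rdd
    .zipWithIndex()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))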

Directly with the DataFrame API:

(universal across Scala, Python, Java, and R, with pretty much the same syntax)

Previously I had missed the monotonicallyIncreasingId function, which should work just fine as long as you don't require consecutive numbers:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

While useful, monotonicallyIncreasingId is non-deterministic. Not only may the ids differ from execution to execution, but without additional tricks they cannot be used to identify rows when subsequent operations contain filters.
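
A commonly suggested workaround (a sketch, not part of the original answer) is to assign the id once and persist the result, so that later filters reuse the materialized values rather than recomputing them, assuming the cached blocks are not evicted:

from pyspark.sql.functions import monotonically_increasing_id

# Assign the id once and cache; count() forces materialization so that
# subsequent filters reuse the cached ids rather than recomputing them.
df_with_id = df.withColumn("id", monotonically_increasing_id()).cache()
df_with_id.count()

df_with_id.filter(df_with_id.bar < -1.5).show()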

Note:

It is also possible to use the rowNumber window function:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Unfortunately:

WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

So unless you have a natural way to partition your data and ensure uniqueness, it is not particularly useful at this moment.
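
For completeness, here is a sketch of what a partitioned window would look like, with foo standing in as a hypothetical natural partitioning column; the numbers then restart at 1 within each partition, so they are only unique per partition value unless combined with the partitioning key:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber  # renamed row_number in Spark >= 1.6

# Numbers restart within each value of "foo", so they are only unique
# per partition unless combined with the partitioning column itself.
w = Window.partitionBy("foo").orderBy("bar")
df.withColumn("id", rowNumber().over(w)).show()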

from pyspark.sql.functions import monotonically_increasing_id

df.withColumn("id", monotonically_increasing_id()).show()

Note that the second argument of df.withColumn is monotonically_increasing_id(), not monotonically_increasing_id.
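
To tie this back to the original question, once the id column has been generated the DataFrame can be appended over JDBC; a minimal sketch where the URL, table name, and credentials are placeholders:

from pyspark.sql.functions import monotonically_increasing_id

# Hypothetical connection details -- replace with your own.
(df.withColumn("id", monotonically_increasing_id())
   .write
   .jdbc(url="jdbc:postgresql://localhost:5432/mydb",
         table="my_table",
         mode="append",
         properties={"user": "spark", "password": "secret"}))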

I found the following solution to be relatively straightforward for the case where zipWithIndex() is the desired behavior, i.e., for those desiring consecutive integers.

In this case, we're using pyspark and relying on a dictionary comprehension to map the original row object to a new dictionary that fits a new schema including the unique index.

from pyspark.sql.types import StructType, StructField, IntegerType

# read the initial dataframe without index
dfNoIndex = sqlContext.read.parquet(dataframePath)
# Need to zip together with a unique integer

# First create a new schema with a uuid field appended
newSchema = StructType([StructField("uuid", IntegerType(), False)]
                       + dfNoIndex.schema.fields)
# zip with the index, map it to a dictionary which includes the new field
df = dfNoIndex.rdd.zipWithIndex()\
                  .map(lambda pair: {k: v
                                     for k, v
                                     in list(pair[0].asDict().items()) + [("uuid", pair[1])]})\
                  .toDF(newSchema)

Comments
  • Do you have any special requirements? Data type, consecutive values, something else?
  • nope, just old good unique integers
  • will this only work with R? i know you used scala above, but all i can find about this zipWithUniqueId is only in SparkR docs
  • It is actually Scala. Do you need Python solution? Plain SQL?
  • no no, i can understand your code, I was just asking if there is anything in pyspark docs about zipWithUniqueId, but it seems like I was just lazy, because eventually I found it, thanks a lot for your solution!
  • Sure. I've added Python code as well and a short note about window functions.
  • This was an incredibly useful answer @zero323 thanks a lot!
  • If I am not mistaken, this requires SQL type bigint.