DataFrame-ified zipWithIndex


I am trying to solve the age-old problem of adding a sequence number to a data set. I am working with DataFrames, and there appears to be no DataFrame equivalent to RDD.zipWithIndex. On the other hand, the following works more or less the way I want it to:

val origDF = sqlContext.load(...)    

val seqDF = sqlContext.createDataFrame(
    origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
    StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)

In my actual application, origDF won't be loaded directly out of a file -- it is going to be created by joining 2-3 other DataFrames together and will contain upwards of 100 million rows.

Is there a better way to do this? What can I do to optimize it?

Since Spark 1.6 there is a function called monotonically_increasing_id(). It generates a new column with a unique 64-bit monotonic index for each row, but the values are not consecutive: each partition starts a new range, so we must calculate each partition's offset before using it. Trying to provide an "RDD-free" solution, I ended up with some collect(), but it only collects the offsets, one value per partition, so it will not cause an OOM.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
    val dfWithPartitionId = df
        .withColumn("partition_id", spark_partition_id())
        .withColumn("inc_id", monotonically_increasing_id())

    val partitionOffsets = dfWithPartitionId
        .groupBy("partition_id")
        .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
        .orderBy("partition_id")
        .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt" )
        .collect()
        .map(_.getLong(0))
        .toArray
        
     dfWithPartitionId
        .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
        .withColumn(indexName, col("partition_offset") + col("inc_id"))
        .drop("partition_id", "partition_offset", "inc_id")
}
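
For illustration, a minimal usage sketch of this helper; the SparkSession name spark and the file paths below are assumptions, not part of the original answer:

// Hypothetical usage of the zipWithIndex helper defined above (Spark 2.x DataFrame reader assumed).
val raw = spark.read.option("header", "true").csv("/path/to/input/*.csv")
val indexed = zipWithIndex(raw)   // adds an "index" column, starting at offset 1
indexed.write.mode("overwrite").parquet("/path/to/output_parquet")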

This solution doesn't repack the original rows and doesn't repartition the original huge dataframe, so it is quite fast in the real world: 200GB of CSV data (43 million rows with 150 columns) was read, indexed and packed to Parquet in 2 minutes on 240 cores. After testing my solution, I ran Kirk Broadhurst's solution and it was 20 seconds slower. You may or may not want to use dfWithPartitionId.cache(), depending on the task.


The following was posted on behalf of David Griffin (edited out of the question).

The all-singing, all-dancing dfZipWithIndex method. You can set the starting offset (which defaults to 1), the index column name (defaults to "id"), and place the column in the front or the back:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row


def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]()) 
        ++ df.schema.fields ++ 
      (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
    )
  ) 
}
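
For illustration, a hedged usage sketch of dfZipWithIndex; origDF is just a placeholder for any existing DataFrame:

// Hypothetical usage: index column at the front (default) or appended at the back.
val withIdInFront = dfZipWithIndex(origDF)
val withIdAtBack  = dfZipWithIndex(origDF, offset = 0, colName = "seq", inFront = false)
withIdInFront.show(5)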


Window expressions were added in Spark 1.5. Instead of having to convert the DataFrame to an RDD, you can now use org.apache.spark.sql.functions.row_number together with org.apache.spark.sql.expressions.Window. Note that I found the above dfZipWithIndex to be significantly faster than the algorithm below. But I am posting it because:

  1. Someone else is going to be tempted to try this
  2. Maybe someone can optimize the expressions below

At any rate, here's what works for me:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

df.withColumn("row_num", row_number.over(Window.partitionBy(lit(1)).orderBy(lit(1))))

Note that I use lit(1) for both the partitioning and the ordering -- this puts everything into a single partition and seems to preserve the original ordering of the DataFrame, but I suppose it is also what slows it way down.

I tested it on a 4-column DataFrame with 7,000,000 rows, and the speed difference is significant between this and the above dfZipWithIndex (like I said, the RDD-based function is much, much faster).
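
For completeness, a self-contained toy sketch of the same row_number pattern; the SparkSession setup and sample data below are assumptions, added only to show the shape of the call:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("rowNumberDemo").getOrCreate()
import spark.implicits._

// Three toy rows; lit(1) pushes everything into a single window partition, as discussed above.
val df = Seq("a", "b", "c").toDF("value")
val numbered = df.withColumn("row_num", row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1))))
numbered.show()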


PySpark version:

from pyspark.sql.types import LongType, StructField, StructType

def dfZipWithIndex(df, offset=1, colName="rowId"):
    '''
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe
        and preserves the schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''

    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )

    zipped_rdd = df.rdd.zipWithIndex()

    # Python 3 does not allow tuple unpacking in lambdas, so index into the (row, rowId) pair
    new_rdd = zipped_rdd.map(lambda row_with_id: [row_with_id[1] + offset] + list(row_with_id[0]))

    return spark.createDataFrame(new_rdd, new_schema)

Also created a jira to add this functionality in Spark natively: https://issues.apache.org/jira/browse/SPARK-23074

There's no way to do this through a Spark SQL query, really. But there's an RDD function called zipWithIndex: you can convert the DataFrame to an RDD, do zipWithIndex, and convert the resulting RDD back to a DataFrame. See this community wiki article for a full-blown solution.
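
A compressed sketch of that RDD round trip, assuming df is any existing DataFrame and "id" is an arbitrary column name (the dfZipWithIndex method above is the full-featured version):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Convert to an RDD, zip with an index, and rebuild a DataFrame with the index in front.
val indexed = df.sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(idx +: row.toSeq) },
  StructType(StructField("id", LongType, nullable = false) +: df.schema.fields)
)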

@Evgeny, your solution is interesting. Note that there is a bug when you have empty partitions (the array is missing those partition indexes; at least this is what happens to me with Spark 1.6), so I converted the array into a Map(partitionId -> offset).

Additionally, I adapted the source of monotonically_increasing_id so that "inc_id" restarts within each partition.

Here is an updated version:

import org.apache.spark.sql.catalyst.expressions.LeafExpression
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.catalyst.expressions.Nondeterministic
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratedExpressionCode
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenContext
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window

case class PartitionMonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {

  /**
   * From org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID
   *
   * Record ID within each partition. By being transient, count's value is reset to 0 every time
   * we serialize and deserialize and initialize it.
   */
  @transient private[this] var count: Long = _

  override protected def initInternal(): Unit = {
    count = 1L // notice this starts at 1, not 0 as in org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID
  }

  override def nullable: Boolean = false

  override def dataType: DataType = LongType

  override protected def evalInternal(input: InternalRow): Long = {
    val currentCount = count
    count += 1
    currentCount
  }

  override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
    val countTerm = ctx.freshName("count")
    ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 1L;")
    ev.isNull = "false"
    s"""
      final ${ctx.javaType(dataType)} ${ev.value} = $countTerm;
      $countTerm++;
    """
  }
}

object DataframeUtils {
  def zipWithIndex(df: DataFrame, offset: Long = 0, indexName: String = "index") = {
    // from https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex)
    val dfWithPartitionId = df.withColumn("partition_id", spark_partition_id()).withColumn("inc_id", new Column(PartitionMonotonicallyIncreasingID()))

    // collect each partition size, create the offset pages
    val partitionOffsets: Map[Int, Long] = dfWithPartitionId
      .groupBy("partition_id")
      .agg(max("inc_id") as "cnt") // in each partition, count(inc_id) is equal to max(inc_id) (I don't know which one would be faster)
      .select(col("partition_id"), sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") + lit(offset) as "cnt")
      .collect()
      .map(r => (r.getInt(0) -> r.getLong(1)))
      .toMap

    def partition_offset(partitionId: Int): Long = partitionOffsets(partitionId)
    val partition_offset_udf = udf(partition_offset _)
    // and re-number the index
    dfWithPartitionId
      .withColumn("partition_offset", partition_offset_udf(col("partition_id")))
      .withColumn(indexName, col("partition_offset") + col("inc_id"))
      .drop("partition_id")
      .drop("partition_offset")
      .drop("inc_id")
  }
}
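
For illustration, a hedged usage sketch of the updated helper; df below is a placeholder for any existing DataFrame:

// Hypothetical usage; the index is built from per-partition offsets plus the
// per-partition counter, so no global sort or repartition is needed.
val indexed = DataframeUtils.zipWithIndex(df, offset = 0, indexName = "index")
indexed.orderBy("index").show(10)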

As far as I know, the only way to do what you want with DataFrames is by adding an index column to each using RDD.zipWithIndex and then doing a join on the index column. Code for doing zipWithIndex on a DataFrame can be found in this SO answer.



Comments
  • First of all, there can be an error if some partition gets no rows of df; please check the answer by @fylb for an explanation of how to fix it. Second, the last as "cnt" is quite confusing and, since it is not used afterwards, can and should be removed. Third, thanks for a very useful, interesting and quite elegant answer!
  • Let's say I'm creating indexes for the column "product_id" with monotonically_increasing_id(); my question is: does this function guarantee that the same product_id will get the same index across partitions/nodes?
  • @SarahData no, monotonically_increasing_id() is absolutely unique across all rows; it doesn't depend on any other columns, so different rows with the same product_id will always have different monotonically_increasing_id()'s.
  • @eliasah -- I found a Window expression way to do this. It is much slower, however, but figured you might want to take a look. See answer below.
  • That's awesome. Any references to a PySpark version? Thanks for sharing.
  • This looks very smooth. Can someone help me to write this in JAVA
  • Wouldn't that cause OOM error if the dataset doesn't fit in a single worker's memory?
  • I have no idea -- I just know that it is much, much slower than the RDD based zipWithIndex, and that was more than enough for me to stop thinking about it. I posted the above so that other people wouldn't be tempted to go too far down this path; the original dfZipWithIndex still seems to be the best approach.
  • Thanks for sharing this. At first I thought that not converting the DF to an RDD would be faster, but I won't go too far down that path now.
  • Two concerns with this approach: 1) window functions need an ORDER BY clause, so the data set will be sorted, which I guess is why you found that dfZipWithIndex is faster; 2) sometimes you don't have a column to order by -- for example, when you need the native order of the lines as they were in a file -- so dfZipWithIndex might be the only choice.
  • With Spark 2.3.0, does the worker not spill to disk if the data exceeds its memory capacity?