Avoid specifying schema twice (Spark/Scala)

I need to iterate over a data frame in a specific order and apply some complex logic to calculate a new column.

Also, my strong preference is to do it in a generic way so I do not have to list all the columns of a row and do df.as[my_record] or case Row(...) => as shown here. Instead, I want to access row columns by their names and just add the result column(s) to the source row.

The approach below works just fine, but I'd like to avoid specifying the schema twice: the first time so that I can access columns by name while iterating, and the second time to process the output.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

val q = """
select 2 part, 1 id
union all select 2 part, 4 id
union all select 2 part, 3 id
union all select 2 part, 2 id
"""
val df = spark.sql(q)

def f_row(iter: Iterator[Row]) : Iterator[Row] = {
  if (iter.hasNext) {
    def complex_logic(p: Int): Integer = if (p == 3) null else p * 10

    // Output schema: the input schema (taken from the first row) plus the computed "result" column.
    val head = iter.next
    val schema = StructType(head.schema.fields :+ StructField("result", IntegerType))
    val r =
      new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)

    // Emit the first row, then every remaining row, each with "result" appended.
    iter.scanLeft(r)((r1, r2) =>
      new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
    )
  } else iter
}

val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
val encoder = RowEncoder(schema)
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)(encoder).show

What information is lost after applying mapPartitions so that the output cannot be processed without an explicit encoder? How to avoid specifying it?

What information is lost after applying mapPartitions so that the output cannot be processed without an explicit encoder?

The information is hardly lost: it wasn't there from the beginning. Subclasses of Row or InternalRow are basically untyped, variable-shape containers which don't provide any useful type information that could be used to derive an Encoder.

The schema in GenericRowWithSchema is inconsequential, as it describes the content in terms of metadata, not types.
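
A quick illustration of that point (my sketch, not part of the original answer): the schema attached to a GenericRowWithSchema is only runtime metadata, so lookups by name work, but statically the row is still just a bag of Any and the compiler cannot derive an Encoder from it.

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

val s = StructType(Seq(StructField("id", IntegerType)))
val row = new GenericRowWithSchema(Array[Any](1), s)

row.schema           // StructType is available at runtime
row.getAs[Int]("id") // name-based access works
// but the static type is just Row, and there is no implicit Encoder[Row],
// so none of this helps the compiler build a Dataset from such rows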

How to avoid specifying it?

Sorry, you're out of luck. If you want to use dynamically typed constructs (a bag of Any) in a statically typed language you have to pay the price, which here is providing an Encoder.
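
That price cannot be avoided, but it can at least be paid in one place. A minimal sketch (my addition, not part of the original answer; mapPartitionsWithExtraColumns is a made-up helper name) that derives the output encoder from the input Dataset's schema plus whatever fields the function appends, so the full schema is never written out by hand:

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

def mapPartitionsWithExtraColumns(ds: Dataset[Row], extra: Seq[StructField])
                                 (f: Iterator[Row] => Iterator[Row]): Dataset[Row] = {
  // Output schema = input schema + appended fields; the encoder is built only here.
  val outSchema = StructType(ds.schema.fields ++ extra)
  ds.mapPartitions(f)(RowEncoder(outSchema))
}

// Usage with the question's f_row:
// mapPartitionsWithExtraColumns(
//   df.repartition($"part").sortWithinPartitions($"id"),
//   Seq(StructField("result", IntegerType))
// )(f_row).show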

OK, I have checked some of my Spark code, and using .mapPartitions with the Dataset API does not require me to explicitly build or pass an encoder.

You need something like:

case class Before(part: Int, id: Int)
case class After(part: Int, id: Int, newCol: String)

import spark.implicits._

// Note column names/types must match case class constructor parameters.
val beforeDS = <however you obtain your input DF>.as[Before]

def f_row(it: Iterator[Before]): Iterator[After] = ???

beforeDS.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row).show
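
For completeness, a rough sketch of what f_row could look like with those case classes (my guess, reusing the complex_logic rule from the question; newCol is a String only because that is how After is declared above):

def f_row(it: Iterator[Before]): Iterator[After] =
  it.map { b =>
    // same rule as the question's complex_logic: null for id == 3, otherwise id * 10
    val result = if (b.id == 3) null else (b.id * 10).toString
    After(b.part, b.id, result)
  }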

I found the explanation below sufficient; maybe it will be useful for others.

mapPartitions requires an Encoder because otherwise it cannot construct a Dataset from an iterator of Rows. Even though each row has a schema, that schema cannot be derived (used) by the constructor of Dataset[U].

  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    new Dataset[U](
      sparkSession,
      MapPartitions[T, U](func, logicalPlan),
      implicitly[Encoder[U]])
  }

On the other hand, without calling mapPartitions, Spark can use the schema derived from the initial query because the structure (metadata) of the original columns is not changed.
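
To make that concrete (my illustration, not taken from the Spark sources): transformations that only rearrange rows keep the original columns, so the schema flows through the plan, whereas mapPartitions may emit values of any type U and therefore needs an Encoder[U] at compile time.

df.repartition($"part").sortWithinPartitions($"id").show  // fine, schema preserved

// df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)
// does not compile without the explicit encoder: there is no implicit Encoder[Row] in scope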

I described alternatives in this answer: https://stackoverflow.com/a/53177628/7869491.

Comments
  • Can you use Dataset and provide a function that maps from T => U?
  • @TerryDactyl, can you elaborate a bit more? I use df.repartition($"part").sortWithinPartitions($"id") which is org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] and I supply f_row(iter: Iterator[Row]) : Iterator[Row] for mapPartitions.
  • A DataFrame is a Dataset[Row] and, as discussed below, Row is untyped. If you were to provide case classes T and U which correspond to the input and output shapes of your map function Iterator[T] => Iterator[U] and import spark.implicits._, then Spark may be able to provide an Encoder and you would effectively map from Dataset[T] => Dataset[U]. This is just a guess but may be worth a try.
  • See below.
  • I do not agree that "it wasn't there from the beginning". I can show the result after re-partitioning and ordering with df.repartition($"part").sortWithinPartitions($"id").show, but it's not possible after mapPartitions.
  • And the function used in mapPartitions is (func: (Iterator[T]) ⇒ Iterator[U]). So why can show not be used if the function produces pretty much the same records (with new columns, to be precise) after iterating?
  • As I mentioned in the original post, "my strong preference is to do it in a generic way so I do not have to list all the columns of a row and do df.as[my_record] or case Row(...) =>". I want to avoid any changes if new columns are added to the data frame. See stackoverflow.com/questions/53159461/…