Spark sql how to explode without losing null values
I have a Dataframe that I am trying to flatten. As part of the process, I want to explode it, so if I have a column of arrays, each value of the array will be used to create a separate row. For instance,
id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
should become
id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
This is my code
private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(),
                org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}
The problem is that in my data, some of the array columns have nulls. In that case, the entire row is deleted. So this dataframe:
id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null
becomes
id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
instead of
id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null
How can I explode my arrays so that I don't lose the null rows?
I am using Spark 1.5.2 and Java 8.
Spark 2.2+
You can use the explode_outer function:
import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+
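Since the question is in Java, here is a minimal Java sketch of the same call (assumes Spark 2.2+, where DataFrame has become Dataset<Row>; the variable names are only illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.explode_outer;

// explode_outer keeps rows whose array is null or empty, emitting a null element for them
Dataset<Row> exploded = df.withColumn("likes", explode_outer(df.col("likes")));
exploded.show();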
Spark <= 2.1
This is in Scala, but the Java equivalent should be almost identical (to import individual functions, use import static).
import org.apache.spark.sql.functions.{array, col, explode, lit, when}

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null, explode an array<string> with a single null
    .otherwise(array(lit(null).cast("string")))))
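For the Java side (the question uses Spark 1.5.2), a rough, untested translation of the snippet above might look like this; result is just an illustrative variable name:

import org.apache.spark.sql.DataFrame;
import static org.apache.spark.sql.functions.array;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.when;

// Rows with a null array survive because explode receives array(null) instead of null
DataFrame result = df.withColumn("likes",
    explode(
        when(col("likes").isNotNull(), col("likes"))
            .otherwise(array(lit(null).cast("string")))));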
The idea here is basically to replace NULL with an array(NULL) of the desired type. For complex types (a.k.a. structs) you have to provide the full schema:
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

val st = StructType(Seq(
  StructField("_1", IntegerType, false),
  StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))
or
dfStruct.withColumn("y", explode( when(col("y").isNotNull, col("y")) .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
Note: if the array Column has been created with containsNull set to false, you should change this first (tested with Spark 2.1):
df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
Following up on the accepted answer: when the array elements are a complex type, it can be difficult to define the schema by hand (e.g. with large structs).
To do it automatically I wrote the following helper method:
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{array, col, explode, lit, size, when}
import org.apache.spark.sql.types.ArrayType

def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
  val arrayFields = df.schema.fields
    .map(field => field.name -> field.dataType)
    .collect { case (name, dataType: ArrayType) => (name, dataType) }
    .toMap

  columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
    // Null or empty arrays are replaced by array(null) of the element type, so the row survives the explode
    dataFrame.withColumn(arrayCol, explode(
      when(col(arrayCol).isNotNull && size(col(arrayCol)) =!= 0, col(arrayCol))
        .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))
  }
}
Edit: it seems that Spark 2.2 and newer have this built in (explode_outer).
Comments
- That looks great, thank you! I have a follow-up question: what if my column type is a StructType? I tried using cast(new StructType()), but I got: data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type; I'm trying to make my method as generic as possible, so it fits all column types.
- Also, to get the column type, I'm using DataFrame.dtypes(). Is there a better way of getting the column types?
- a) You have to provide the full schema with all fields. b) dtypes or schema.
- coalesce instead of case-when is more concise and should work like a charm (see the sketch after these comments).
- @hamed It will be virtually identical, correcting for small syntax differences (like isNotNull() instead of isNotNull).
- How is this def explodeOuter to be used in code?
- You have to pass the dataframe that you want to explode, and the column: explodeOuter(df, List("array_column"))
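As mentioned in one of the comments above, the case-when can be replaced with coalesce. A rough Java sketch of that variant (not from the original answer, untested):

import org.apache.spark.sql.DataFrame;
import static org.apache.spark.sql.functions.array;
import static org.apache.spark.sql.functions.coalesce;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.lit;

// coalesce returns its first non-null argument, so a null array is swapped for array(null) before exploding
DataFrame result = df.withColumn("likes",
    explode(coalesce(col("likes"), array(lit(null).cast("string")))));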