Spark SQL: how to explode without losing null values

I have a Dataframe that I am trying to flatten. As part of the process, I want to explode it, so if I have a column of arrays, each value of the array will be used to create a separate row. For instance,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]

should become

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

This is my code

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}

The problem is that in my data, some of the array columns have nulls. In that case, the entire row is deleted. So this dataframe:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null

becomes

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

instead of

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null

How can I explode my arrays so that I don't lose the null rows?

I am using Spark 1.5.2 and Java 8.

Spark 2.2+

You can use the explode_outer function:

import org.apache.spark.sql.functions.explode_outer

df.withColumn("likes", explode_outer($"likes")).show

// +---+----+--------+
// | id|name|   likes|
// +---+----+--------+
// |  1|Luke|baseball|
// |  1|Luke|  soccer|
// |  2|Lucy|    null|
// +---+----+--------+
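
Since the question uses Java, a rough Java equivalent would be as follows (a sketch only, assuming df is a Dataset<Row> with an array column named likes; not tested):

import static org.apache.spark.sql.functions.explode_outer;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// explode_outer keeps rows whose array is null or empty, emitting a null value
Dataset<Row> exploded = df.withColumn("likes", explode_outer(df.col("likes")));
exploded.show();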

Spark <= 2.1

This is in Scala, but the Java equivalent should be almost identical (to import individual functions, use import static).

import org.apache.spark.sql.functions.{array, col, explode, lit, when}

val df = Seq(
  (1, "Luke", Some(Array("baseball", "soccer"))),
  (2, "Lucy", None)
).toDF("id", "name", "likes")

df.withColumn("likes", explode(
  when(col("likes").isNotNull, col("likes"))
    // If null explode an array<string> with a single null
    .otherwise(array(lit(null).cast("string")))))

The idea here is basically to replace NULL with an array(NULL) of the desired type. For complex types (i.e. structs) you have to provide the full schema:

val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val st = StructType(Seq(
  StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast(st)))))

or

dfStruct.withColumn("y", explode(
  when(col("y").isNotNull, col("y"))
    .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))

Note:

If the array column has been created with containsNull set to false, you should change this first (tested with Spark 2.1):

df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))

You can use the explode_outer() function (Spark 2.2+).

Following up on the accepted answer: when the array elements are of a complex type, it can be difficult to define the schema by hand (e.g. with large structs).

To do it automatically, I wrote the following helper method:

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{array, col, explode, lit, size, when}
import org.apache.spark.sql.types.ArrayType

def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]): Dataset[Row] = {
  // Map every array column in the schema to its ArrayType (to get the element type)
  val arrayFields = df.schema.fields
    .map(field => field.name -> field.dataType)
    .collect { case (name, dataType: ArrayType) => name -> dataType }
    .toMap

  columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
    // Replace null or empty arrays with array(null) of the right element type
    dataFrame.withColumn(arrayCol, explode(
      when(col(arrayCol).isNotNull && size(col(arrayCol)) =!= 0, col(arrayCol))
        .otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))
  }
}

Edit: it seems that Spark 2.2 and newer have this built in (explode_outer).

Comments
  • That looks great, thank you! I have a followup question: what if my column type is a StructType? I tried using cast(new StructType()), but I got data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type; I'm trying to make my method as generic as possible, so it fits all column types.
  • Also, to get the column type, I'm using DataFrame.dtypes(). Is there a better way of getting the column types?
  • a) You have to provide full schema with all fields. b) dtypes or schema.
  • coalesce instead of case-when is more concise and should work like a charm
  • @hamed It will be virtually identical correcting for small syntax difference (like isNotNull() instead of isNotNull).
  • How is this def explodeOuter to be used in code?
  • You have to pass the dataframe that you want to explode, and the column: explodeOuter(df, List("array_column"))