Pyspark: Split multiple array columns into rows

I have a dataframe which has one row, and several columns. Some of the columns are single values, and others are lists. All list columns are the same length. I want to split each list column into a separate row, while keeping any non-list column as is.

Sample DF:

from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

What I want:

|  a|  b|  c |    d |
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
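In plain Python terms (outside Spark), the transform I'm after is a row-wise zip of the list columns, repeating the scalar columns for each output row:

```python
# Plain-Python model of the desired transform: zip the list columns
# element-wise and carry the scalar columns into every output row.
row = {"a": 1, "b": [1, 2, 3], "c": [7, 8, 9], "d": "foo"}

rows = [
    {"a": row["a"], "b": b, "c": c, "d": row["d"]}
    for b, c in zip(row["b"], row["c"])
]
# rows -> three dicts: (1, 1, 7, 'foo'), (1, 2, 8, 'foo'), (1, 3, 9, 'foo')
```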

If I only had one list column, this would be easy by just doing an explode:

df_exploded = df.withColumn('b', explode('b'))
# >>>
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+

However, if I try to also explode the c column, I end up with a dataframe with a length the square of what I want:

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>>
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+
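The squared length happens because the second explode runs on every row produced by the first, i.e. it builds the Cartesian product of the two arrays rather than pairing them up. In plain Python:

```python
from itertools import product

b, c = [1, 2, 3], [7, 8, 9]

# Two independent explodes behave like a Cartesian product: 3 * 3 = 9 rows
pairs = list(product(b, c))

# A single aligned zip gives the intended 3 rows instead
aligned = list(zip(b, c))
# aligned -> [(1, 7), (2, 8), (3, 9)]
```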

What I want is, for each column, to take the nth element of the array in that column and add it to a new row. I've tried mapping an explode across all columns in the dataframe, but that doesn't seem to work either:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()

Spark >= 2.4

You can replace the zip_ udf shown below with the arrays_zip function:

from pyspark.sql.functions import arrays_zip, col, explode

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))

Spark < 2.4

With DataFrames and UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust types to reflect data types
        StructField("first", IntegerType()),
        StructField("second", IntegerType())
    ]))
)

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))

With RDDs:

(df.rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))

Both solutions are inefficient due to Python communication overhead. If data size is fixed you can do something like this:

from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of the method accessor
reduce(
    DataFrame.unionAll,
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")

or even:

from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

This should be significantly faster compared to UDF or RDD. Generalized to support an arbitrary number of columns:

# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))
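The same generalization can be sketched in plain Python, without a Spark session; zip_and_explode_py below is a hypothetical helper that mirrors the logic, taking a dict-shaped row and the names of the list columns:

```python
# Plain-Python analogue of zip_and_explode for any number of list
# columns; names and sample data mirror the question's dataframe.
def zip_and_explode_py(row, colnames):
    """Yield one output dict per array index, zipping the named columns."""
    others = {k: v for k, v in row.items() if k not in colnames}
    for values in zip(*(row[c] for c in colnames)):
        out = dict(others)
        out.update(zip(colnames, values))
        yield out

result = list(zip_and_explode_py(
    {"a": 1, "b": [1, 2, 3], "c": [7, 8, 9], "d": "foo"}, ["b", "c"]))
```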

You'd need to use flatMap, not map, because you want to make multiple output rows out of each input row.

from pyspark.sql import Row
def dualExplode(r):
    rowDict = r.asDict()
    bList = rowDict.pop('b')
    cList = rowDict.pop('c')
    for b,c in zip(bList, cList):
        newDict = dict(rowDict)
        newDict['b'] = b
        newDict['c'] = c
        yield Row(**newDict)

df_split = sqlc.createDataFrame(df.rdd.flatMap(dualExplode))
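One caveat worth knowing: Python's zip truncates at the shortest sequence, so this flatMap approach silently drops trailing elements if the arrays in a row differ in length:

```python
# zip stops at the shorter input, so rows with mismatched array
# lengths are silently truncated rather than raising an error.
pairs = list(zip([1, 2, 3], [7, 8]))
# -> [(1, 7), (2, 8)]
```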

One liner (for Spark >= 2.4.0):

(df.withColumn("bc", arrays_zip("b", "c"))
   .select("a", explode("bc").alias("tbc"))
   .select("a", col("tbc.b"), col("tbc.c"))
   .show())
Imports required:

from pyspark.sql.functions import arrays_zip, col, explode

Steps -
  1. Create a column bc which is an arrays_zip of columns b and c
  2. Explode bc to get a struct tbc
  3. Select the required columns a, b and c (all exploded as required).
> df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  7|
|  1|  2|  8|
|  1|  3|  9|
+---+---+---+
