Pyspark: how to duplicate a row n times in a dataframe?

I've got a dataframe like this, and I want to duplicate each row n times if the value in column n is bigger than one:

A   B   n
1   2   1
2   9   1
3   8   2
4   1   1
5   3   3

And transform it into this:

A   B   n
1   2   1
2   9   1
3   8   2
3   8   2
4   1   1
5   3   3
5   3   3
5   3   3

I think I should use explode, but I don't understand how it works... Thanks

The explode function returns a new row for each element in the given array or map.

One way to exploit this function is to use a udf to create a list of size n for each row. Then explode the resulting array.

from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, IntegerType

df = spark.createDataFrame([(1,2,1), (2,9,1), (3,8,2), (4,1,1), (5,3,3)] ,["A", "B", "n"]) 

# udf that turns the integer n into an array containing n copies of n
n_to_array = udf(lambda n : [n] * n, ArrayType(IntegerType()))
df2 = df.withColumn('n', n_to_array(df.n))

# now use explode  
df2.withColumn('n', explode(df2.n)).show()

+---+---+---+
|  A|  B|  n|
+---+---+---+
|  1|  2|  1|
|  2|  9|  1|
|  3|  8|  2|
|  3|  8|  2|
|  4|  1|  1|
|  5|  3|  3|
|  5|  3|  3|
|  5|  3|  3|
+---+---+---+
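
For convenience, the udf and the explode can be wrapped into a small reusable helper. This is only a sketch of the same idea; the function name duplicate_rows and its n_col parameter are illustrative and not part of the original answer:

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, udf, explode
from pyspark.sql.types import ArrayType, IntegerType

def duplicate_rows(df: DataFrame, n_col: str = "n") -> DataFrame:
    """Repeat every row as many times as the integer in column n_col."""
    # udf that turns the integer n into an array holding n copies of n
    n_to_array = udf(lambda n: [n] * n, ArrayType(IntegerType()))
    # replace n with the array, then explode it back into n rows
    return (df.withColumn(n_col, n_to_array(col(n_col)))
              .withColumn(n_col, explode(col(n_col))))

# usage: duplicate_rows(df).show() should produce the output above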

I think the udf answer by @Ahmed is the best way to go, but here is an alternative method that may be as good or better for small n:

First, collect the maximum value of n over the whole DataFrame:

import pyspark.sql.functions as f

max_n = df.select(f.max('n').alias('max_n')).first()['max_n']
print(max_n)
#3

Now create an array of length max_n for each row, containing the numbers in range(max_n). This intermediate step produces a DataFrame like:

df.withColumn('n_array', f.array([f.lit(i) for i in range(max_n)])).show()
#+---+---+---+---------+
#|  A|  B|  n|  n_array|
#+---+---+---+---------+
#|  1|  2|  1|[0, 1, 2]|
#|  2|  9|  1|[0, 1, 2]|
#|  3|  8|  2|[0, 1, 2]|
#|  4|  1|  1|[0, 1, 2]|
#|  5|  3|  3|[0, 1, 2]|
#+---+---+---+---------+

Now explode the n_array column and filter to keep only the array values that are less than n; this ensures we end up with n copies of each row. Finally, drop the exploded column to get the end result:

df.withColumn('n_array', f.array([f.lit(i) for i in range(max_n)]))\
    .select('A', 'B', 'n', f.explode('n_array').alias('col'))\
    .where(f.col('col') < f.col('n'))\
    .drop('col')\
    .show()
#+---+---+---+
#|  A|  B|  n|
#+---+---+---+
#|  1|  2|  1|
#|  2|  9|  1|
#|  3|  8|  2|
#|  3|  8|  2|
#|  4|  1|  1|
#|  5|  3|  3|
#|  5|  3|  3|
#|  5|  3|  3|
#+---+---+---+

However, this creates an array of length max_n for each row, as opposed to just an n-length array in the udf solution. It's not immediately clear to me how this will scale vs. the udf for large max_n, but I suspect the udf will win out.
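
If you want to check the scaling behaviour yourself, one rough approach is to time both pipelines on a larger synthetic DataFrame and force evaluation with count(). The sketch below is illustrative only: big_df, its row count, and the n distribution are made up, and the wall-clock times will depend entirely on your cluster:

import time
import pyspark.sql.functions as f
from pyspark.sql.functions import udf, explode, col
from pyspark.sql.types import ArrayType, IntegerType

# synthetic data: 1 million rows with n between 1 and 10 (purely illustrative)
big_df = spark.range(1000000).selectExpr("id AS A", "id AS B", "int(id % 10 + 1) AS n")

n_to_array = udf(lambda n: [n] * n, ArrayType(IntegerType()))

start = time.time()
big_df.withColumn('n', n_to_array(col('n'))) \
      .withColumn('n', explode(col('n'))) \
      .count()  # count() forces the plan to execute
print("udf approach: %.2f s" % (time.time() - start))

start = time.time()
max_n = big_df.select(f.max('n')).first()[0]
big_df.withColumn('n_array', f.array([f.lit(i) for i in range(max_n)])) \
      .select('A', 'B', 'n', f.explode('n_array').alias('col')) \
      .where(f.col('col') < f.col('n')) \
      .drop('col') \
      .count()
print("max_n array approach: %.2f s" % (time.time() - start))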

With Spark 2.4.0+, this is easier with the built-in functions array_repeat + explode:

from pyspark.sql.functions import expr

df = spark.createDataFrame([(1,2,1), (2,9,1), (3,8,2), (4,1,1), (5,3,3)], ["A", "B", "n"])

new_df = df.withColumn('n', expr('explode(array_repeat(n,int(n)))'))

>>> new_df.show()
+---+---+---+
|  A|  B|  n|
+---+---+---+
|  1|  2|  1|
|  2|  9|  1|
|  3|  8|  2|
|  3|  8|  2|
|  4|  1|  1|
|  5|  3|  3|
|  5|  3|  3|
|  5|  3|  3|
+---+---+---+
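
The same logic can also be written with the DataFrame API instead of a SQL expression. Note this assumes Spark 3.0+, where array_repeat accepts a Column as its count argument (in 2.4 the Python wrapper only took a literal int), so check it against your version:

from pyspark.sql.functions import col, explode, array_repeat

# array_repeat builds an array with n copies of n; explode turns it back into n rows
new_df = df.withColumn('n', explode(array_repeat(col('n'), col('n').cast('int'))))
new_df.show()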

Comments
  • I prefer @jxc's answer for Spark 2.4+ because it uses all built-ins instead of a udf.