Spark - repartition() vs coalesce()

According to Learning Spark:

Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions.

One difference I get is that with repartition() the number of partitions can be increased/decreased, but with coalesce() the number of partitions can only be decreased.

If the partitions are spread across multiple machines and coalesce() is run, how can it avoid data movement?

It avoids a full shuffle. If it's known that the number of partitions is decreasing, then the executor can safely keep data on the minimum number of partitions, only moving data off the extra nodes onto the nodes that were kept.

So, it would go something like this:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

Then coalesce down to 2 partitions:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

Notice that Node 1 and Node 3 did not require their original data to move.
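
A minimal sketch of the same idea in code (assuming a running spark-shell or a SparkSession named spark; the exact partition layout will vary by cluster):

val rdd = spark.sparkContext.parallelize(1 to 12, numSlices = 4)
rdd.getNumPartitions                        // 4

// coalesce(2) creates a narrow dependency: no shuffle, existing partitions are merged in place
val coalesced = rdd.coalesce(2)
coalesced.getNumPartitions                  // 2

// glom() collects each partition into an array so the layout can be printed on the driver
coalesced.glom().collect().foreach(a => println(a.mkString(", ")))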

Managing Spark Partitions with Coalesce and Repartition: Spark's RDD coalesce() is used only to reduce the number of partitions; it is an optimized version of repartition() that minimizes data movement. One important point to note is that repartition() (and coalesce() when it shuffles) can be expensive, since data is moved across partitions, so try to minimize repartitioning as much as possible.

Justin's answer is awesome and this response goes into more depth.

The repartition algorithm does a full shuffle and creates new partitions with data that's distributed evenly. Let's create a DataFrame with the numbers from 1 to 12.

import spark.implicits._   // needed for toDF outside the spark-shell

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf contains 4 partitions on my machine.

numbersDf.rdd.partitions.size // => 4

Here is how the data is divided on the partitions:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12
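
The answer doesn't show how this listing was produced; one way to print the per-partition contents (a sketch, not the author's code) is via glom() on the underlying RDD:

numbersDf.rdd.glom().collect().zipWithIndex.foreach { case (rows, i) =>
  // each `rows` array holds the Rows of one partition
  println(f"Partition $i%05d: " + rows.map(_.getInt(0)).mkString(", "))
}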

Let's do a full shuffle with the repartition method and redistribute this data into two partitions.

val numbersDfR = numbersDf.repartition(2)

Here is how the numbersDfR data is partitioned on my machine:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

The repartition method makes new partitions and evenly distributes the data in the new partitions (the data distribution is more even for larger data sets).

Difference between coalesce and repartition

coalesce uses existing partitions to minimize the amount of data that's shuffled. repartition creates new partitions and does a full shuffle. coalesce results in partitions with different amounts of data (sometimes partitions of very different sizes), whereas repartition results in roughly equal sized partitions.

Is coalesce or repartition faster?

coalesce may run faster than repartition, but unequally sized partitions are generally slower to work with than equally sized ones. You'll usually need to repartition datasets after filtering a large data set. I've found repartition to be faster overall because Spark is built to work with equally sized partitions.
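
A quick way to check this skew yourself is to count the records per partition; partitionSizes below is just an illustrative helper, not part of Spark's API:

def partitionSizes(df: org.apache.spark.sql.DataFrame): Seq[Int] =
  df.rdd.mapPartitions(it => Iterator(it.size)).collect().toSeq

partitionSizes(numbersDf.coalesce(2))     // sizes may be uneven: existing partitions merged as-is
partitionSizes(numbersDf.repartition(2))  // roughly equal sizes after the full shuffle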

Read this blog post if you'd like even more details.

Apache Spark: Repartition vs Coalesce (ashwin.cloud): Spark splits data into partitions and computation is done in parallel for each partition. It is very important to understand how data is partitioned and when you need to manually modify the partitioning to run a Spark application efficiently.

One additional point to note here is that the basic principle of Spark RDDs is immutability: repartition or coalesce will create a new RDD, and the base RDD will continue to exist with its original number of partitions. If the use case requires persisting the RDD in cache, the same must be done for the newly created RDD.

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] = MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala> pairMrkt.partitions.length
res20: Int = 2

Spark Repartition() vs Coalesce(): What is partitioning in Spark? A partition, in simple terms, is a split of the input data, so partitions in Spark are basically smaller logical chunks or divisions of the input data. Spark distributes this partitioned data among the different nodes to perform distributed processing on the data.

repartition - it's recommended to use repartition when increasing the number of partitions, because it involves shuffling all the data.

coalesce - it's recommended to use coalesce when reducing the number of partitions. For example, if you have 3 partitions and you want to reduce to 2, coalesce will move the 3rd partition's data to partitions 1 and 2. Partitions 1 and 2 will remain in the same container, but repartition will shuffle data across all partitions, so network usage between executors will be high and it impacts performance.

Performance-wise, coalesce performs better than repartition when reducing the number of partitions.
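
To see that coalesce avoids the shuffle that repartition triggers, you can compare the physical plans (a sketch assuming an existing DataFrame df; the exact plan text varies by Spark version):

df.coalesce(2).explain()
// == Physical Plan == (abbreviated)
// Coalesce 2                              <- no Exchange, i.e. no shuffle
// +- ...

df.repartition(2).explain()
// == Physical Plan == (abbreviated)
// Exchange RoundRobinPartitioning(2)      <- full shuffle over the network
// +- ...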

All the answers add some great knowledge to this very frequently asked question.

So, going by the tradition of this question's timeline, here are my 2 cents.

I found repartition to be faster than coalesce, in a very specific case.

In my application, when the number of files we estimate is lower than a certain threshold, repartition works faster.

Here is what I mean:

if (numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)    // many output files: the cheap merge is enough
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest) // few output files: the full shuffle finishes faster

In the above snippet, if my files were fewer than 20, coalesce was taking forever to finish while repartition was much faster, hence the above code.

Of course, this number (20) will depend on the number of workers and amount of data.

Hope that helps.

However, it is always a question for developers when to use repartition and coalesce over Spark RDDs, DataFrames and Datasets. Repartition can be used for increasing or decreasing the number of partitions, whereas coalesce can only be used for decreasing the number of partitions. Coalesce is a less expensive operation than repartition, as coalesce reduces data movement between the nodes while repartition shuffles all the data over the network.

Comments
  • Thanks for the response. The documentation would have done better to say "minimize data movement" instead of "avoiding data movement".
  • Is there any case when repartition should be use instead of coalesce?
  • @Niemand I think the current documentation covers this pretty well: github.com/apache/spark/blob/… Keep in mind that all repartition does is call coalesce with the shuffle parameter set to true. Let me know if that helps.
  • Is it possible to reduce the number of partition files that already exist? I have no HDFS, but a problem with many files.
  • repartition will be statistically slower since it doesn't know that it is shrinking... although maybe they could optimize that. Internally it just calls coalesce with a shuffle = true flag (see the snippet after these comments).
  • Great answer @Powers, but isn't the data in Partition A and B skewed? How is it evenly distributed?
  • Also, what's the best way to get the partition size without getting OOM error. I use rdd.glom().map(len).collect() but it gives lot of OOM errors.
  • @anwartheravian - Partition A and Partition B are different sizes because the repartition algorithm doesn't distribute data as equally for very small data sets. I used repartition to organize 5 million records into 13 partitions and each file was between 89.3 MB and 89.6 MB - that's pretty equal!
  • What about the other question in my second comment? :)
  • @Powers this looks like the better answer, with more detail.
  • nice one! this is critical and at least to this experienced scala dev, not obvious--ie, neither repartition nor coalesce attempt to modify the data, just how it is distributed across nodes
  • @Harikrishnan, so if I understood the other answers properly, then in the case of coalesce Spark uses existing partitions; however, since RDDs are immutable, can you describe how coalesce makes use of existing partitions? As per my understanding, I thought Spark appends new partitions to the existing partitions in coalesce.
  • But if the "old" RDD is not used anymore as is known by the execution graph it will be cleared from memory if not persisted, won't it?
  • Useful Explanation.
  • There will be movement of data with coalesce as well.
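
As the comments above mention, RDD.repartition is essentially a thin wrapper around coalesce with the shuffle flag enabled. Paraphrased from the Spark source (check the linked file for your Spark version; this is not standalone user code):

// from org.apache.spark.rdd.RDD (paraphrased, may differ slightly between versions)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}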