Cross join between two large datasets in Spark

I have two large datasets. The first dataset contains about 130 million entries and the second contains about 40,000 entries. The data is fetched from MySQL tables.

I need to do a cross-join but I am getting

java.sql.SQLException: GC overhead limit exceeded

What is the optimal technique to do this in Scala?

Following is a snippet of my code:

val df1 = spark.read.jdbc(jdbcURL, configurationLoader.mysql_table1,
  "id", 100, 100000, 40, MySqlConnection.getConnectionProperties)
val df2 = spark.read.jdbc(jdbcURL, configurationLoader.mysql_table2,
  MySqlConnection.getConnectionProperties)
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)

df1 is the larger dataset and df2 is the smaller one.

130M * 40K = 5.2 trillion records, which is 5.2 terabytes of memory just to store this data, and that is assuming each record is a single byte, which is most certainly not true. If a record is as much as 64 bytes (which I think is still a very conservative estimate), you'd need roughly 333 terabytes (!) of memory just to store the data. That is an enormous amount, so unless you have a very large cluster and a very fast network inside that cluster, you might want to rethink your algorithm to make it work.
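
For reference, here is the back-of-the-envelope arithmetic behind those numbers (the 64-bytes-per-row figure is just an assumption):

val leftRows    = 130L * 1000 * 1000          // ~130 million rows in df1
val rightRows   = 40L * 1000                  // ~40 thousand rows in df2
val resultRows  = leftRows * rightRows        // 5.2e12 = 5.2 trillion rows
val bytesPerRow = 64L                         // assumed, still conservative
val totalBytes  = resultRows * bytesPerRow    // ≈ 3.3e14 bytes ≈ 333 TB
println(f"$resultRows%,d rows, about ${totalBytes / 1e12}%.0f TB at $bytesPerRow bytes/row")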

That being said, when you do a join of two SQL datasets/dataframes, the number of partitions that Spark uses to store the result of the join is controlled by the spark.sql.shuffle.partitions property (see here). You might want to set it to a very large number, and set the number of executors as high as you can. Then you might be able to run your processing to the end.
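
For illustration, a sketch of bumping that property at runtime (4000 is only a placeholder value; pick it based on your cluster and data):

// spark.sql.shuffle.partitions is a runtime SQL setting, so it can be changed
// on an existing session before the join is executed (4000 is a placeholder):
spark.conf.set("spark.sql.shuffle.partitions", "4000")

// The same setting can also be passed at submit time:
//   spark-submit --conf spark.sql.shuffle.partitions=4000 ...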

Additionally, you may want to look into the spark.shuffle.minNumPartitionsToHighlyCompress option; if you set it to a value smaller than your number of shuffle partitions, you might get another memory boost. Note that this threshold was a hardcoded constant of 2000 until a recent Spark version, so on older environments you will simply need to set spark.sql.shuffle.partitions to a number greater than 2000 to benefit from it.
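
A sketch of setting it, assuming a Spark version that exposes this (internal) option; since it is not a runtime SQL setting, it has to be supplied when the application starts (the values are placeholders):

import org.apache.spark.sql.SparkSession

// Placeholder values: let HighlyCompressedMapStatus kick in at 1000 partitions
// instead of the default 2000, and use 4000 shuffle partitions for the join.
val spark = SparkSession.builder()
  .appName("cross-join-tuning")
  .config("spark.shuffle.minNumPartitionsToHighlyCompress", "1000")
  .config("spark.sql.shuffle.partitions", "4000")
  .getOrCreate()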

It seems to me that to use joinWith and do a cross join, you have to use two contradictory conditions whose union covers your whole dataset; I guess this is to really make sure you want to do a cross join. – Dan Ciborowski - MSFT May 1 '18 at 5:56
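
For what it's worth, Spark 2.1+ also has an explicit crossJoin operator on Dataset/DataFrame, which states the intent directly instead of relying on join() without a condition (which Spark 2.x by default rejects unless spark.sql.crossJoin.enabled is set). A minimal sketch with toy data standing in for the two tables:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cross-join-sketch")
  .master("local[*]")                      // local test only
  .getOrCreate()
import spark.implicits._

val big   = (1 to 5).toDF("id")            // stands in for the 130M-row df1
val small = Seq("a", "b").toDF("code")     // stands in for the 40K-row df2

val crossProduct = big.crossJoin(small)    // explicit cartesian product
crossProduct.show()                        // 5 * 2 = 10 rows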

I agree with Vladimir, but thought of adding a few more points.

See MapStatus: set spark.sql.shuffle.partitions to 2001 (the old approach; the default is 200).

The new approach is spark.shuffle.minNumPartitionsToHighlyCompress, as Vladimir mentioned in his answer.

Why this change? MapStatus had 2000 hardcoded; see SPARK-24519.

Spark will apply a different algorithm to process the map output statuses depending on the number of partitions:

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
  if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
    // Many partitions: store an average size, plus exact sizes only for huge blocks.
    HighlyCompressedMapStatus(loc, uncompressedSizes)
  } else {
    // Few partitions: store one compressed size byte per block.
    new CompressedMapStatus(loc, uncompressedSizes)
  }
}

HighlyCompressedMapStatus:

A MapStatus implementation that stores the accurate size of huge blocks, which are larger than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty blocks, plus a bitmap for tracking which blocks are empty.

spark.shuffle.accurateBlockThreshold - see here: When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we record the size accurately if it is above this threshold. This helps prevent OOM by avoiding underestimating shuffle block sizes when fetching shuffle blocks.
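
For example, lowering the threshold makes more of the large blocks be recorded with their exact size (the value below is only illustrative; the default is 100 MB):

import org.apache.spark.sql.SparkSession

// Record exact sizes for shuffle blocks above 32 MB instead of the default
// 100 MB, so big blocks are not underestimated (illustrative value):
//   spark-submit --conf spark.shuffle.accurateBlockThreshold=33554432 ...
// or when building the session:
val spark = SparkSession.builder()
  .config("spark.shuffle.accurateBlockThreshold", "33554432")   // 32 MB in bytes
  .getOrCreate()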


CompressedMapStatus:

A MapStatus implementation that tracks the size of each block. Size for each block is represented using a single byte.

Also add these to your spark-submit:

--conf spark.yarn.executor.memoryOverhead=<10% of executor memory> --conf spark.shuffle.compress=true --conf spark.shuffle.spill.compress=true

In both cases, compression will use spark.io.compression.codec.

Conclusion: large tasks should use HighlyCompressedMapStatus, and executor memory overhead can be 10 percent of your executor memory.
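
A worked example of that 10 percent rule (the 16 GB executor size is just an assumed figure):

// spark.yarn.executor.memoryOverhead is specified in megabytes.
val executorMemoryMb = 16 * 1024                         // assumed 16 GB executors
val memoryOverheadMb = (executorMemoryMb * 0.10).toInt   // ≈ 1638 MB
// pass it as: --conf spark.yarn.executor.memoryOverhead=1638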

Further, have a look at Spark memory tuning.


Increase SPARK_EXECUTOR_MEMORY to a higher value and repartition to more partitions.
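
A sketch of what that could look like, reusing the names from the question's snippet (all numbers are placeholders to tune for your cluster):

// More executor memory at submit time (placeholder values):
//   spark-submit --executor-memory 16g --num-executors 10 --executor-cores 4 ...

// Read the big table into more partitions than the original 40:
val df1 = spark.read.jdbc(jdbcURL, configurationLoader.mysql_table1,
  "id", 100, 100000, 400, MySqlConnection.getConnectionProperties)

// ...or repartition an already-loaded DataFrame before joining:
val df1Wide = df1.repartition(400)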


Comments
  • Please add your code and an example that reproduces the error; it will be very difficult to answer the question without seeing the code and the scenario.
  • Consider if you really need to do a crossJoin. There is no optimum technique. Cartesian products are expensive.
  • Expect 5.2 trillion records.
  • The number of records would be high, no doubt. But how should the data partitioning ideally be done? I am quite new to Spark. I chose 40 partitions considering that I have 10 executors with 4 cores each.
  • We need your spark-submit command and the full error stack trace.