Why does Spark think this is a cross / Cartesian join?
I want to join data twice as below:
rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['idx', 'val'])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])
res1 = rdd1.join(rdd2, on=[rdd1['idx'] == rdd2['key1']])
res2 = res1.join(rdd1, on=[res1['key2'] == rdd1['idx']])
res2.show()
Then I get this error:
pyspark.sql.utils.AnalysisException: u'Cartesian joins could be prohibitively expensive and are disabled by default. To explicitly enable them, please set spark.sql.crossJoin.enabled = true;'
But I don't think this is a cross join:
res2.explain()
== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
:  :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
:  :  +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
:  :     +- *Filter isnotnull(idx#0L)
:  :        +- Scan ExistingRDD[idx#0L,val#1]
:  +- *Sort [key1#5L ASC, key2#6L ASC], false, 0
:     +- Exchange hashpartitioning(key1#5L, key2#6L, 200)
:        +- *Filter ((isnotnull(key2#6L) && (key2#6L = key1#5L)) && isnotnull(key1#5L))
:           +- Scan ExistingRDD[key1#5L,key2#6L,val#7L]
+- Scan ExistingRDD[idx#40L,val#41]
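This happens because you join structures sharing the same lineage, and this leads to a trivially equal condition. res1 already contains the idx column of rdd1, so rdd1['idx'] in the second join resolves to that existing column (idx#204L below) rather than to the new copy of rdd1 (idx#235L). Both sides of the second condition therefore come from the left input, and nothing relates it to the right-hand rdd1: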
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Join Inner, ((idx#204L = key1#209L) && (key2#210L = idx#204L))
:- Filter isnotnull(idx#204L)
:  +- LogicalRDD [idx#204L, val#205]
+- Filter ((isnotnull(key2#210L) && (key2#210L = key1#209L)) && isnotnull(key1#209L))
   +- LogicalRDD [key1#209L, key2#210L, val#211L]
and
LogicalRDD [idx#235L, val#236]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
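To see why the rejected plan is not just slow but also wrong for this query, here is a plain-Python model (no Spark needed) of both readings of the second join on the question's data. It assumes the reading of the plan above: the second condition key2 == idx is checked against res1's own idx column, so it only filters res1, and the join with the second copy of rdd1 is left without a condition. This is a sketch of the semantics, not of how Spark executes it.

```python
from itertools import product

# Question data as plain tuples.
rdd1 = [(1, 'a'), (2, 'b'), (3, 'c')]     # (idx, val)
rdd2 = [(1, 2, 1), (1, 3, 0), (2, 3, 1)]  # (key1, key2, val)

# First join on idx == key1 -> rows (idx, val, key1, key2, val).
res1 = [x + y for x, y in product(rdd1, rdd2) if x[0] == y[0]]

# Intended second join: res1.key2 == idx of a *new* copy of rdd1.
intended = [x + y for x, y in product(res1, rdd1) if x[3] == y[0]]

# Rejected plan: key2 == idx is evaluated against res1's own idx column,
# so it only filters res1; the surviving rows are then paired with every
# row of the second rdd1 (a Cartesian product).
filtered = [x for x in res1 if x[3] == x[0]]
cartesian = [x + y for x, y in product(filtered, rdd1)]

print(intended)        # three joined rows
print(len(cartesian))  # 0
```

Under this model no res1 row has key2 equal to its own idx, so merely enabling spark.sql.crossJoin.enabled for this query would silence the error but return no rows instead of the three expected ones.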
In cases like this, you should use aliases:
from pyspark.sql.functions import col

rdd1 = spark.createDataFrame(...).alias('rdd1')
rdd2 = spark.createDataFrame(...).alias('rdd2')
res1 = rdd1.join(rdd2, col('rdd1.idx') == col('rdd2.key1')).alias('res1')
res1.join(rdd1, on=col('res1.key2') == col('rdd1.idx')).explain()
== Physical Plan ==
*SortMergeJoin [key2#297L], [idx#360L], Inner
:- *Sort [key2#297L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(key2#297L, 200)
:     +- *SortMergeJoin [idx#290L], [key1#296L], Inner
:        :- *Sort [idx#290L ASC NULLS FIRST], false, 0
:        :  +- Exchange hashpartitioning(idx#290L, 200)
:        :     +- *Filter isnotnull(idx#290L)
:        :        +- Scan ExistingRDD[idx#290L,val#291]
:        +- *Sort [key1#296L ASC NULLS FIRST], false, 0
:           +- Exchange hashpartitioning(key1#296L, 200)
:              +- *Filter (isnotnull(key2#297L) && isnotnull(key1#296L))
:                 +- Scan ExistingRDD[key1#296L,key2#297L,val#298L]
+- *Sort [idx#360L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(idx#360L, 200)
      +- *Filter isnotnull(idx#360L)
         +- Scan ExistingRDD[idx#360L,val#361]
For details see SPARK-6459.
I was also successful when I persisted the DataFrame before the second join:
res1 = rdd1.join(rdd2, col('rdd1.idx') == col('rdd2.key1')).persist()
res1.join(rdd1, on=col('res1.key2') == col('rdd1.idx'))
Another option is to turn on the flag and then run the inner join: spark.conf.set("spark.sql.crossJoin.enabled", "true"). Since Spark 2.1 you can also use the new explicit cross join syntax. Keep in mind that Cartesian products are very slow. More importantly, they can consume a lot of memory and trigger OOM errors. If the join type is not Inner, Spark SQL could use a Broadcast Nested Loop Join even if neither side is small enough to broadcast, which can also cause a lot of unwanted network traffic.
Persisting did not work for me.
I overcame it with aliases on DataFrames:
from pyspark.sql.functions import col
df1.alias("buildings").join(df2.alias("managers"), col("managers.distinguishedName") == col("buildings.manager"))
A SQL CROSS JOIN is a join operation that produces the Cartesian product of two or more tables: for two tables A and B, it is a result set in which each row in the first table (A) is paired with each row in the second table (B). So whenever we program in Spark, we try to avoid joins or restrict them to limited data. Spark offers various optimizations to improve performance, from choosing the right type of join to using broadcast joins.
At the RDD level, the Java API offers def cartesian[U](other: spark.api.java.JavaRDDLike[U, _]): JavaPairRDD[T, U], which returns the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in this and b is in other.
The CARTESIAN JOIN or CROSS JOIN returns the Cartesian product of the sets of records from two or more joined tables. Thus, it is equivalent to an inner join whose join condition always evaluates to True, or whose join condition is absent from the statement.
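The equivalence can be checked with a plain-Python nested-loop join. This is a model of inner-join semantics, not Spark's implementation; nested_loop_join is a hypothetical helper and the tables are toy data.

```python
from itertools import product


def nested_loop_join(left, right, condition):
    """Model of an inner join: keep each (lhs, rhs) pair where condition holds."""
    return [(lhs, rhs) for lhs in left for rhs in right if condition(lhs, rhs)]


# Toy tables of (id, value) rows.
a = [(1, 'a'), (2, 'b'), (3, 'c')]
b = [(1, 'x'), (3, 'y')]

# A real equi-join condition relates both sides and keeps only matches.
matching = nested_loop_join(a, b, lambda lhs, rhs: lhs[0] == rhs[0])

# A condition that is always true keeps every pair: the Cartesian product.
trivial = nested_loop_join(a, b, lambda lhs, rhs: True)

# A condition that only looks at the left side filters it, but each
# surviving left row is still paired with every right row.
left_only = nested_loop_join(a, b, lambda lhs, rhs: lhs[0] >= 2)

print(matching)                        # [((1, 'a'), (1, 'x')), ((3, 'c'), (3, 'y'))]
print(trivial == list(product(a, b)))  # True
print(len(left_only))                  # 4
```

The left_only case is the shape of the question's problem: a condition that never mentions the right-hand relation is "trivial" from the join's point of view, so the planner sees a Cartesian product.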
A related Spark issue is SPARK-21380, "Join with Columns thinks inner join is cross join", which reports errors complaining that a join is a cross join / Cartesian product even when it isn't. When spark.sql.crossJoin.enabled is false, Cartesian products require explicit CROSS JOIN syntax. In practice, an unintended Cartesian join usually happens when the matching column or WHERE condition is not specified.