How do I use a column I created in a Spark Join? - Ambiguous Error


I have been fighting with this for a while in Scala, and I cannot seem to find a clean solution for it.

I have two DataFrames:

val Companies = Seq(
  (8, "Yahoo"),
  (-5, "Google"),
  (12, "Microsoft"),
  (-10, "Uber")
).toDF("movement", "Company")
val LookUpTable = Seq(
  ("B", "Buy"),
  ("S", "Sell")
).toDF("Code", "Description")

I need to create a column in Companies that allows me to join to the lookup table. It's a simple CASE expression: if the movement is negative, then 'S' (sell), else 'B' (buy). I then need to join to the lookup table on this newly created column.
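In plain Scala terms, the mapping I want is just the following (a throwaway sketch to pin down the logic; movementCode is a hypothetical helper, not part of the Spark job):

```scala
// Mirrors the CASE expression used below:
// positive movement -> "B" (Buy), otherwise -> "S" (Sell).
def movementCode(movement: Int): String =
  if (movement > 0) "B" else "S"

// Applied to the sample movements above:
val codes = Seq(8, -5, 12, -10).map(movementCode)
// codes == Seq("B", "S", "B", "S")
```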

val joined = Companies.as("Companies")
    .withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))
    .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Code", "left_outer")

However, I keep getting the following error:

org.apache.spark.sql.AnalysisException: Reference 'Code' is ambiguous, could be: Code, LookUpTable.Code.;
  at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:259)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:888)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:890)
  at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:887)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:896)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:896)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:329)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:896)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:956)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:956)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)

I have tried adding the alias for Code, but that does not work:

val joined = Companies.as("Companies")
    .withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))
    .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Companies.Code", "left_outer")

org.apache.spark.sql.AnalysisException: cannot resolve '`Companies.Code`' given input columns: [Code, LookUpTable.Code, LookUpTable.Description, Companies.Company, Companies.movement];;
'Join LeftOuter, (Code#102625 = 'Companies.Code)
:- Project [movement#102616, Company#102617, CASE WHEN (movement#102616 > 0) THEN B ELSE S END AS Code#102629]
:  +- SubqueryAlias `Companies`
:     +- Project [_1#102613 AS movement#102616, _2#102614 AS Company#102617]
:        +- LocalRelation [_1#102613, _2#102614]
+- SubqueryAlias `LookUpTable`
   +- Project [_1#102622 AS Code#102625, _2#102623 AS Description#102626]
      +- LocalRelation [_1#102622, _2#102623]

The only workaround I have found is to rename the newly created column, but that leaves an extra column in the result, which feels incorrect. Worse, as the output below shows, the join condition no longer compares the intended columns, so every row matches both lookup codes:

val joined = Companies.as("Companies")
    .withColumn("_Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END")).as("Code")
    .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Code", "left_outer")


joined.show()

+--------+---------+-----+----+-----------+
|movement|  Company|_Code|Code|Description|
+--------+---------+-----+----+-----------+
|       8|    Yahoo|    B|   B|        Buy|
|       8|    Yahoo|    B|   S|       Sell|
|      -5|   Google|    S|   B|        Buy|
|      -5|   Google|    S|   S|       Sell|
|      12|Microsoft|    B|   B|        Buy|
|      12|Microsoft|    B|   S|       Sell|
|     -10|     Uber|    S|   B|        Buy|
|     -10|     Uber|    S|   S|       Sell|
+--------+---------+-----+----+-----------+

Is there a way to join on the newly created column without having to create a new dataframe or new column through an alias?

Have you tried using Seq in the Spark DataFrame join?

1. Using Seq, without a duplicate column:

val joined = Companies.as("Companies")
    .withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END"))
    .join(LookUpTable.as("LookUpTable"), Seq("Code"), "left_outer")
2. Alias after withColumn, but it will generate a duplicate Code column:

val joined = Companies
    .withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END")).as("Companies")
    .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Companies.Code", "left_outer")
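For intuition, option 1 behaves like a SQL USING join: the key column appears exactly once in the output, so no ambiguity can arise. A plain-collections simulation of that shape (hypothetical illustration only, not Spark API):

```scala
// (Code, movement, Company) rows after the withColumn step,
// joined against a Code -> Description lookup; the key is emitted once.
val left  = Seq(("B", 8, "Yahoo"), ("S", -5, "Google"))
val right = Map("B" -> "Buy", "S" -> "Sell")

val out = left.map { case (code, m, c) => (code, m, c, right.get(code)) }
// Each row carries a single Code field, as with Seq("Code") in option 1.
```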


Aliasing is required when you need columns from two different DataFrames that share the same name. This is because the Spark DataFrame API creates a schema for each DataFrame, and within a given schema you can never have two or more columns with the same name.

This is also why, in SQL, a SELECT query without aliasing works, but a CREATE TABLE AS SELECT over the same query throws a duplicate-column error.
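The collision is easy to see by flattening the two column lists involved here (a sketch of the name clash, not of Spark internals):

```scala
// Column names on each side of the join in the question:
val companiesCols = Seq("movement", "Company", "Code") // after withColumn
val lookUpCols    = Seq("Code", "Description")

// A joined schema would have to carry both lists; "Code" collides.
val duplicates = (companiesCols ++ lookUpCols)
  .groupBy(identity)
  .collect { case (name, occurrences) if occurrences.size > 1 => name }
  .toSeq
// duplicates == Seq("Code"), the reference the Analyzer flags as ambiguous
```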


An expression can be used directly in the join condition:

val codeExpression = expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END")
val joined = Companies.as("Companies")
  .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === codeExpression, "left_outer")
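What the expression-based join computes can be simulated with plain collections (a hypothetical illustration only; the Spark code above is the real answer). The key is evaluated on the fly instead of being materialised as a column first, so there is no Code column and nothing ambiguous to resolve:

```scala
val companies = Seq((8, "Yahoo"), (-5, "Google"), (12, "Microsoft"), (-10, "Uber"))
val lookUp    = Map("B" -> "Buy", "S" -> "Sell")

// left_outer semantics: a missing lookup code would surface as None
val joinedRows = companies.map { case (movement, company) =>
  val code = if (movement > 0) "B" else "S" // evaluated inline, like codeExpression
  (movement, company, lookUp.get(code))
}
// e.g. (8, "Yahoo", Some("Buy")), (-5, "Google", Some("Sell")), ...
```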


Comments
  • Thanks Mahesh. Does the second option allow me to have multiple aliases with the same name? i.e. ```val joined = Companies.withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END")).as("Companies") .withColumn("Description", expr("TRIM(Description)")).as("Companies") .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Companies.Code", "left_outer") .select($"Companies.Code", $"Companies.Description")```
  • @Nirmie do vote up and accept the answer if it satisfies your requirements.
  • @Nirmie you only need to keep the last .as("Companies"); there is no need to repeat .as("Companies") every time, so it would be: ```val joined = Companies.withColumn("Code",expr("CASE WHEN movement > 0 THEN 'B' ELSE 'S' END")).withColumn("Description", expr("TRIM(Description)")).as("Companies") .join(LookUpTable.as("LookUpTable"), $"LookUpTable.Code" === $"Companies.Code", "left_outer") .select($"Companies.Code", $"Companies.Description")```
  • Thanks pasha701. This may work, but our project has ~2,000 expressions, so creating all of these without contention may be difficult.