How to resolve the "Reference 'xxx' is ambiguous" problem after Spark join
When developing table-processing Spark applications, we may encounter this issue:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'os' is ambiguous, could be: default.year_table.os, os, osAndSize.os.;
	at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:259)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$42.apply(Analyzer.scala:1001)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$42.apply(Analyzer.scala:1003)
	at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
	...
The root cause of this issue is that our DataFrame has two or more columns with the same name, usually as a result of an inappropriate join. To handle it, we should either remove the redundant columns before joining, so that no two columns share a name, or use aliases to disambiguate and drop them after the join.
We can reproduce this problem with some simple code.
1. Reproduce
Imagine a company whose business is selling operating systems. We have already gathered the basic information from several data sources.
First, we have the most important table, year_table, which is ingested from MySQL into Hive.
From the data we can see that our OS ubuntu was launched in 2005, and we have three other OSes as well.
When we read this data, we should read it from Hive; that is also why the error message above lists the fully qualified name default.year_table.os. So we simulate year_table like this:
def getOsAndYear: DataFrame = {
  val columns = Seq("id", "os", "year")
  val data = Seq(
    (1, "ubuntu", 2005),
    (2, "xiaomi", 2010),
    (3, "suse", 2015),
    (4, "samsung", 2020)
  )
  val osAndYear = spark.createDataFrame(data).toDF(columns: _*)
  val schema = "default"
  osAndYear.write.mode("overwrite").saveAsTable(s"$schema.year_table")
  spark.read.table(s"$schema.year_table")
}
Then we have price_table and size_table, which record the price of each OS and the disk space it occupies.
These two tables come from manually maintained data, which is not stable: some OS names are null, but the IDs can still be used.
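These two tables can be simulated the same way. Here is a minimal sketch, assuming getOsAndPrice and getOsAndSize share the id and os columns with year_table (the sample values and null positions below are made up), followed by the naive joins that keep every os column:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col}

def getOsAndPrice: DataFrame = {
  val columns = Seq("id", "os", "price")
  val data = Seq( // assumed values; some os names are null
    (1, "ubuntu", 100),
    (2, null, 200),
    (3, "suse", 300),
    (4, null, 400)
  )
  spark.createDataFrame(data).toDF(columns: _*)
}

def getOsAndSize: DataFrame = {
  val columns = Seq("id", "os", "size")
  val data = Seq( // assumed values; some os names are null
    (1, "ubuntu", 10),
    (2, "xiaomi", 20),
    (3, null, 30),
    (4, "samsung", 40)
  )
  spark.createDataFrame(data).toDF(columns: _*)
}

// Naive joins: neither join drops the duplicated os column, so
// yearPriceAndSize ends up with three of them: default.year_table.os
// (from the Hive read), the unqualified os from getOsAndPrice,
// and osAndSize.os.
val yearAndPrice = getOsAndYear
  .join(broadcast(getOsAndPrice), Seq("id"), "left_outer")
val osAndSize = getOsAndSize.alias("osAndSize")
val yearPriceAndSize = yearAndPrice.join(osAndSize,
  yearAndPrice("id") <=> col("osAndSize.id"), "left_outer")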
Then the weird thing happens: yearPriceAndSize now has three identical os columns.
If we try to select this os column, the exception is reproduced:
yearPriceAndSize.select("os")
Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'os' is ambiguous, could be: default.year_table.os, os, osAndSize.os.;
2. Solution
2.1 Drop redundant columns before join
val yearAndPrice = getOsAndYear
  .join(broadcast(getOsAndPrice.drop("os") /* drop os */), Seq("id"), "left_outer")
val osAndSize = getOsAndSize.alias("osAndSize")
val yearPriceAndSize = yearAndPrice.join(osAndSize.drop("os"), // drop os
  yearAndPrice("id") <=> col("osAndSize.id"), "left_outer")
yearPriceAndSize.select("os").show(100, false)
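With the sample data above, only the os column from year_table survives the joins, so the select now resolves cleanly; modulo the row order after the joins, the output should look like:

+-------+
|os     |
+-------+
|ubuntu |
|xiaomi |
|suse   |
|samsung|
+-------+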
Of course, this solution is cumbersome, because we have to drop the redundant columns again and again. In a live environment we sometimes only know a few important columns of each table rather than all of them, so removing the duplicates by hand before every join means repeating ourselves over and over.
2.2 Use a generic way to keep the left or the right redundant columns
We can use a Set to find the columns we want to keep or remove. Usually a join has a main DataFrame, the one to which we only want to append some necessary columns. For example, year_table is the main DataFrame in the first join, because we want to add the price column to year_table rather than add the year column to the other tables. After the join, we apply a select that keeps only the columns coming from the main DataFrame plus the columns the main DataFrame does not already have. This is what clearRedundant does:
def dropRedundantColumnsAfterJoin(): Unit = {
  val osAndPrice = broadcast(getOsAndPrice).alias("osAndPrice")
  val osAndYear = getOsAndYear.alias("osAndYear")
  val osAndSize = getOsAndSize.alias("osAndSize")
  val yearAndPrice = osAndYear
    .join(osAndPrice, Seq("id"), "left_outer")
    .transform(clearRedundant(osAndYear, "osAndYear", osAndPrice, "osAndPrice"))
    .alias("yearAndPrice")
  val yearPriceAndSize = osAndSize.join(yearAndPrice, Seq("id"), "right_outer")
    .transform(clearRedundant(yearAndPrice, "yearAndPrice", osAndSize, "osAndSize"))
  yearPriceAndSize.show(200, false)
}
def clearRedundant(mainDf: DataFrame, mainDfAlias: String,
                   appendDf: DataFrame, appendDfAlias: String): DataFrame => DataFrame = {
  val mainCols = mainDf.columns.toSet
  val appendDfCols = appendDf.columns.toSet
  // Usually our join appends some columns to a main DataFrame,
  // so the columns to append are the other DataFrame's columns
  // minus the columns we already have in the main DataFrame.
  val appendCols: Set[String] = appendDfCols -- mainCols
  val colsAfterJoin: Set[Column] = {
    // Re-select every column under its bare name to strip the alias,
    // so the newly joined DataFrame carries no redundant qualifiers.
    mainCols.map(c => col(s"$mainDfAlias.$c").as(c)) union
      appendCols.map(c => col(s"$appendDfAlias.$c").as(c))
  }
  finalDf => finalDf.select(colsAfterJoin.toList: _*)
}
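As a quick sanity check (a sketch based on the sample tables above), the cleaned DataFrame now carries each column name exactly once, so the ambiguous reference is gone:

// Inside dropRedundantColumnsAfterJoin, after the second clearRedundant:
yearPriceAndSize.printSchema()            // id, os, year, price, size -- each appears once
yearPriceAndSize.select("os").show(false) // no more AnalysisException

One thing to keep in mind: colsAfterJoin is a Set, so the column order of the final select is not guaranteed; if a stable column order matters, collect the names into a Seq instead.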