How to resolve the "Reference 'xxx' is ambiguous" problem after a Spark join

When developing Spark applications that process tables, we may encounter this issue.

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'os' is ambiguous, could be: default.year_table.os, os, osAndSize.os.;
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:259)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$42.apply(Analyzer.scala:1001)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$42.apply(Analyzer.scala:1003)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
...

The root cause of this issue is that our DataFrame has two (or more) columns with the same name, usually introduced by an inappropriate join. To handle it, we should either drop the redundant columns before the join so that no duplicate column names remain, or alias the inputs and disambiguate (or drop) the duplicates after the join.
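As a minimal sketch of both options (the toy DataFrames below are hypothetical and assume an existing SparkSession called spark; this is not the article's code):

import spark.implicits._
import org.apache.spark.sql.functions.col

// Two toy DataFrames that share a "name" column besides the join key.
val left  = Seq((1, "alice"), (2, "bob")).toDF("id", "name")
val right = Seq((1, "x"), (2, "y")).toDF("id", "name")

// Option 1: drop the duplicate column before the join.
left.join(right.drop("name"), Seq("id"), "left_outer").select("name") // unambiguous

// Option 2: alias both sides and qualify (or drop) the duplicate after the join.
val joined = left.alias("l").join(right.alias("r"), Seq("id"), "left_outer")
joined.select(col("l.name")) // the qualifier resolves the ambiguity
joined.drop(col("r.name"))   // or drop the right-hand copy afterwards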

We can reproduce this problem with a small piece of code.

1. Reproduce

Imagine a company whose business is selling operating systems. We already have the basic information from several data sources.

First, we have the most important table, year_table, which is ingested from MySQL into Hive.

+---+-------+----+
|id |os     |year|
+---+-------+----+
|1  |ubuntu |2005|
|2  |xiaomi |2010|
|3  |suse   |2015|
|4  |samsung|2020|
+---+-------+----+

From the data we can see that the OS ubuntu was launched in 2005, and we also have three other OSes. In the real job we read this data from Hive, so we simulate year_table like this.

import org.apache.spark.sql.DataFrame

def getOsAndYear: DataFrame = {
  val columns = Seq("id", "os", "year")
  val data = Seq(
    (1, "ubuntu", 2005),
    (2, "xiaomi", 2010),
    (3, "suse", 2015),
    (4, "samsung", 2020)
  )
  val osAndYear = spark.createDataFrame(data).toDF(columns: _*)
  val schema = "default"
  // Round-trip through Hive so the columns carry the default.year_table qualifier
  osAndYear.write.mode("overwrite").saveAsTable(s"$schema.year_table")
  spark.read.table(s"$schema.year_table")
}
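All the snippets here assume a SparkSession named spark is already in scope. A minimal sketch of such a session (the builder settings below are my assumption, not taken from the article's repository):

import org.apache.spark.sql.SparkSession

// Assumed setup, not part of the original code: a local session with
// Hive support enabled so that saveAsTable and spark.read.table work.
val spark: SparkSession = SparkSession.builder()
  .appName("ReferenceAmbiguousReproduce")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

Because getOsAndYear reads the data back from Hive, its columns carry the qualifier default.year_table, which is exactly the first candidate listed in the exception message above.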

Then we have price_table and size_table, which record the price and the disk space each OS occupies. These two tables come from manually maintained static data, which is not stable: some OS names are null, but the IDs are still usable.

def getOsAndPrice: DataFrame = {
  val columns = Seq("id", "os", "price")
  val data = Seq(
    (1, "ubuntu", 50),
    (2, "xiaomi", 100),
    (3, null, 150),      // the os name is missing for id 3
    (4, "samsung", 200)
  )
  spark.createDataFrame(data).toDF(columns: _*)
}

def getOsAndSize: DataFrame = {
  val columns = Seq("id", "os", "size")
  val data = Seq(
    (1, "ubuntu", 3444),
    (2, null, 4546),     // the os name is missing for id 2
    (3, "suse", 5747),
    (4, "samsung", 687687)
  )
  spark.createDataFrame(data).toDF(columns: _*)
}

We should join by id, because the os name is not stable.

import org.apache.spark.sql.functions.{broadcast, col}

val yearAndPrice = getOsAndYear.join(broadcast(getOsAndPrice), Seq("id"), "left_outer")
val osAndSize = getOsAndSize.alias("osAndSize")
val yearPriceAndSize = yearAndPrice.join(osAndSize,
  yearAndPrice("id") <=> col("osAndSize.id"), "left_outer") // null-safe equality on id
yearPriceAndSize.show(1000, false)
+---+-------+----+-------+-----+---+-------+------+
|id |os |year|os |price|id |os |size |
+---+-------+----+-------+-----+---+-------+------+
|1 |ubuntu |2005|ubuntu |50 |1 |ubuntu |3444 |
|2 |xiaomi |2010|xiaomi |100 |2 |null |4546 |
|3 |suse |2015|null |150 |3 |suse |5747 |
|4 |samsung|2020|samsung|200 |4 |samsung|687687|
+---+-------+----+-------+-----+---+-------+------+
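A short aside on the join condition above: yearAndPrice("id") <=> col("osAndSize.id") uses Spark's null-safe equality operator, which treats two nulls as equal, while === never matches a null. A small sketch of the difference (the toy DataFrames are hypothetical, not from the article):

import spark.implicits._

val l = Seq((Option(1), "a"), (Option.empty[Int], "b")).toDF("id", "v1")
val r = Seq((Option(1), "x"), (Option.empty[Int], "y")).toDF("id", "v2")

l.join(r, l("id") === r("id")).show() // only id = 1 matches; the null ids never join
l.join(r, l("id") <=> r("id")).show() // null <=> null is true, so the null ids also match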

Something weird has happened: our table now has three columns named os. If we try to select that os column, the exception is reproduced.

yearPriceAndSize.select("os") 
Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'os' is ambiguous, could be: default.year_table.os, os, osAndSize.os.;
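Qualified references still resolve, because only the bare name is ambiguous. For example, picking size_table's os column through its alias works fine:

yearPriceAndSize.select(col("osAndSize.os")).show(false)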

2. Solution

2.1 Drop redundant columns before join

val yearAndPrice = getOsAndYear
  .join(broadcast(getOsAndPrice.drop("os") /* drop os */), Seq("id"), "left_outer")
val osAndSize = getOsAndSize.alias("osAndSize")
val yearPriceAndSize = yearAndPrice.join(osAndSize.drop("os"), // drop os
  yearAndPrice("id") <=> col("osAndSize.id"), "left_outer")
yearPriceAndSize.select("os").show(100, false)
+-------+
|os |
+-------+
|ubuntu |
|xiaomi |
|suse |
|samsung|
+-------+

Of course this solution is cumbersome, because we have to drop the redundant columns over and over again. In a live environment we often only know the important columns of a table rather than all of them, so removing the duplicates by hand every time means repeating ourselves endlessly.

2.2 Use a reusable helper to keep either the left or the right copy of duplicated columns

We can use a Set to work out which columns to keep and which to remove. Usually a join has a main DataFrame, i.e. the DataFrame we only want to append some necessary columns to. For example, year_table is the main DataFrame in the first join, because we want to add the price column to year_table rather than add the year column to the other tables.

After the join we apply a select that keeps only the columns coming from the main DataFrame plus the columns the main DataFrame doesn't already have. This is what clearRedundant does.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{broadcast, col}

def dropRedundantColumnsAfterJoin(): Unit = {
  val osAndPrice = broadcast(getOsAndPrice).alias("osAndPrice")
  val osAndYear = getOsAndYear.alias("osAndYear")
  val osAndSize = getOsAndSize.alias("osAndSize")
  val yearAndPrice = osAndYear
    .join(osAndPrice, Seq("id"), "left_outer")
    .transform(clearRedundant(osAndYear, "osAndYear", osAndPrice, "osAndPrice"))
    .alias("yearAndPrice")
  val yearPriceAndSize =
    osAndSize.join(yearAndPrice, Seq("id"), "right_outer")
      .transform(clearRedundant(yearAndPrice, "yearAndPrice", osAndSize, "osAndSize"))
  yearPriceAndSize.show(200, false)
}

def clearRedundant(mainDf: DataFrame, mainDfAlias: String,
                   appendDf: DataFrame, appendDfAlias: String): DataFrame => DataFrame = {
  val mainCols = mainDf.columns.toSet
  val appendDfCols = appendDf.columns.toSet
  // Usually a join appends some columns to a main DataFrame, so the columns to append
  // are the other DataFrame's columns minus the ones the main DataFrame already has.
  val appendCols: Set[String] = appendDfCols -- mainCols
  val colsAfterJoin: Set[Column] = {
    // Re-alias every column to its bare name so the joined DataFrame carries no stale qualifiers.
    mainCols.map(c => col(s"$mainDfAlias.$c").as(c)) union
      appendCols.map(c => col(s"$appendDfAlias.$c").as(c))
  }
  finalDf => finalDf.select(colsAfterJoin.toList: _*)
}
+-------+----+---+-----+------+
|os |year|id |price|size |
+-------+----+---+-----+------+
|ubuntu |2005|1 |50 |3444 |
|xiaomi |2010|2 |100 |4546 |
|suse |2015|3 |150 |5747 |
|samsung|2020|4 |200 |687687|
+-------+----+---+-----+------+
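To see why no duplicate names survive, here is the set arithmetic of clearRedundant spelled out for the first join (a worked sketch, not code from the article's repository):

// osAndYear is the main DataFrame, osAndPrice is the appended one.
val mainCols     = Set("id", "os", "year")      // osAndYear.columns
val appendDfCols = Set("id", "os", "price")     // osAndPrice.columns
val appendCols   = appendDfCols -- mainCols     // Set("price")
// The select keeps osAndYear.id, osAndYear.os and osAndYear.year plus
// osAndPrice.price, each re-aliased to its bare name, so the joined
// DataFrame never ends up with two columns called "os".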

References

The code in this article: https://github.com/Solodye/spark-demo/blob/main/src/main/scala/net/i18ndev/spark/alias/ReferenceAmbiguousReproduce.scala

How to mock Hive in a Spark app: https://stackoverflow.com/questions/55068459/setup-spark-for-ci-how-to-mock-hive-tables