Use a global Map to manage Spark DataFrame aliases

Pain points

When writing a Spark application, we can set an alias on a DataFrame, but we cannot read a DataFrame's alias back. This is because when we call df.alias(), Spark creates an org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias, which is a logical plan node. The alias is not a field stored on the DataFrame; it is just part of the logical plan that gets executed at runtime.
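For completeness: the alias is technically recoverable by pattern-matching on the DataFrame's logical plan, though this touches internal Catalyst APIs whose shape changes between Spark versions. A minimal sketch, assuming Spark 3.x (the helper name `aliasOf` is hypothetical, not a Spark API):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias

// Hypothetical helper: read an alias back from the unanalyzed logical plan.
// Assumes Spark 3.x, where SubqueryAlias carries an AliasIdentifier.
def aliasOf(df: DataFrame): Option[String] =
  df.queryExecution.logical match {
    case SubqueryAlias(id, _) => Some(id.name) // root node set by df.alias(...)
    case _                    => None          // no alias at the root of the plan
  }
```

Relying on catalyst internals like this is fragile, which is part of the motivation for managing aliases explicitly instead.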

If we set an alias on a DataFrame in one function and then want to use that alias in another function, we have to declare a global field so that both functions can access the same alias. This is a code smell.

def painPoint(): Unit = {
  val theAlias = "an_alias"

  def function1(df: DataFrame): DataFrame = {
    df.alias(theAlias)
  }

  def function2(df: DataFrame): DataFrame = {
    df.join(getOsAndPrice.alias("osAndPrice"), Seq("id"), "left_outer")
      .drop(col(s"$theAlias.os")) // Here we have to keep the 'theAlias' field in scope
    // drop(s"$theAlias.os") doesn't work; we will see the root cause later
  }

  function2(function1(getOsAndSize)).show(1000, false)
}

Resolution

If we had a global manager to help us manage aliases, the code could change to this:

def painPointResolve(): Unit = {
  def function1(df: DataFrame): DataFrame = {
    df.transform(setAlias("an_alias"))
  }

  def function2(df: DataFrame): DataFrame = {
    df.join(getOsAndPrice.alias("osAndPrice"), Seq("id"), "left_outer")
      .drop(col(s"${getAlias(df)}.os"))
  }

  function2(function1(getOsAndSize)).show(1000, false)
}

We can implement this manager with two Maps. To avoid concurrency issues, we use ConcurrentHashMap to store the mappings, and we wrap the get and set operations in synchronized blocks to make them atomic.

package net.i18ndev.spark.alias

import org.apache.spark.sql.DataFrame

import java.util.concurrent.ConcurrentHashMap

object DfAliasManager {

  // Bidirectional mapping: aliased DataFrame -> alias, and alias -> aliased DataFrame
  lazy private val toAlias: ConcurrentHashMap[DataFrame, String] = new ConcurrentHashMap[DataFrame, String]()
  lazy private val toDataFrame: ConcurrentHashMap[String, DataFrame] = new ConcurrentHashMap[String, DataFrame]()

  def setAlias(alias: String): DataFrame => DataFrame = {
    df =>
      this.synchronized {
        val aliasedDf = df.alias(alias)
        toAlias.put(aliasedDf, alias)
        toDataFrame.put(alias, aliasedDf)
        aliasedDf
      }
  }

  def getAlias(df: DataFrame): String = {
    this.synchronized {
      toAlias.get(df) // returns null if this DataFrame was never registered
    }
  }

  def getDataFrame(alias: String): DataFrame = {
    this.synchronized {
      toDataFrame.get(alias) // returns null if this alias was never registered
    }
  }

}
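The two-Map pattern itself can be exercised without a Spark session. A minimal sketch with String standing in for DataFrame (the `AliasRegistry` object and its method names are hypothetical, not part of the article's code):

```scala
import java.util.concurrent.ConcurrentHashMap

// Same bidirectional-map pattern as DfAliasManager, but with String
// standing in for DataFrame so it runs without a Spark session.
object AliasRegistry {
  private val toAlias = new ConcurrentHashMap[String, String]()
  private val toValue = new ConcurrentHashMap[String, String]()

  // Register both directions under one lock so they stay consistent.
  def register(value: String, alias: String): Unit = this.synchronized {
    toAlias.put(value, alias)
    toValue.put(alias, value)
  }

  // Option instead of null makes a missing mapping explicit.
  def aliasOf(value: String): Option[String] = this.synchronized {
    Option(toAlias.get(value))
  }

  def valueOf(alias: String): Option[String] = this.synchronized {
    Option(toValue.get(alias))
  }
}
```

One design difference from DfAliasManager above: wrapping the lookups in Option makes a missing mapping explicit, whereas ConcurrentHashMap.get by itself returns null, mirroring the Java API.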

Reference

The code in this article: https://github.com/Solodye/spark-demo/blob/main/src/main/scala/net/i18ndev/spark/alias/DfAliasManagerDemo.scala