Spark Left Outer SortMergeJoin time complexity analysis

Problem

We have a left table and a right table; after some repartitioning, one partition ends up being joined across lots of partitions.

Here is the data inside the partitions:

left table partition L    right table partition R
1                         4
4                         4
8                         7
                          10

Suppose that, after RDD.zipPartitions, L and R are joined together.

With Spark codegen disabled, how many times will Spark visit the data inside L and R?

  • Will it be L * R, i.e. 3 * 4 = 12?
  • Will it be L + R, i.e. 3 + 4 = 7?
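For intuition, the L * R option corresponds to a naive nested-loop strategy. Here is a minimal sketch (plain Scala, not Spark code) that counts the right-side visits such a strategy would make:

```scala
// Hypothetical sketch: a nested-loop left outer join scans the whole right
// side once per left row, so the right side is touched exactly L * R times.
object NestedLoopVisits {
  def count(left: Seq[Int], right: Seq[Int]): Int = {
    var rightVisits = 0
    for (_ <- left; _ <- right) rightVisits += 1 // one visit per (left, right) pair
    rightVisits
  }
}
```

With the partitions above, NestedLoopVisits.count(Seq(1, 4, 8), Seq(4, 4, 7, 10)) returns 12, which is the 3 * 4 worst case the first bullet asks about.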

Result

Time Complexity

It is L + R + 2 * C

  • C is the number of right-side rows that get joined
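A minimal sketch of where this formula comes from (plain Scala, not Spark code; it assumes both inputs are already sorted by the join key, which Spark's sort phase guarantees, and that left keys are distinct, as in our example). Each matched right row is touched once when advanced, once when added to the buffer, and once when re-iterated to emit output, which produces the 2 * C term:

```scala
// Hypothetical sketch of the sort-merge left outer join loop, tallying how
// often each side's data is touched. Inputs must be sorted by join key.
object SmjVisitCount {
  def run(left: Seq[Int], right: Seq[Int]): (Int, Int) = {
    var leftTouches = 0
    var rightTouches = 0
    var j = 0 // cursor into the right (buffered) side
    for (l <- left) {
      leftTouches += 1                           // advancedStreamed()
      while (j < right.length && right(j) < l) { // skip smaller right keys
        rightTouches += 1                        // advanced once, no match
        j += 1
      }
      var matches = 0
      while (j + matches < right.length && right(j + matches) == l) {
        rightTouches += 2                        // advanced once + buffered once
        matches += 1
      }
      rightTouches += matches                    // re-iterated from bufferedMatches
      j += matches
    }
    rightTouches += right.length - j             // drain the rest of the right side
    (leftTouches, rightTouches)
  }
}
```

With L = Seq(1, 4, 8) and R = Seq(4, 4, 7, 10) this yields 3 left touches and 8 right touches, i.e. 11 = L + R + 2 * C total visits with C = 2.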

Spark Optimization

If the right-side table has lots of rows that cannot be joined, Spark still has to waste time iterating over them. We can try:

  • Add more partitions to reduce the chance of hash collisions
  • Apply an appropriate filter before the join, to stop Spark from iterating over records that can never be joined
  • Use a Bloom filter when we are not sure how to write an exact filter
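To make the filtering idea concrete, here is a plain-Scala sketch (not Spark code; the data mirrors the unit test used later) showing how pruning unmatchable right-side rows shrinks R toward C, and with it the L + R + 2 * C cost:

```scala
// Hypothetical sketch: dropping right rows whose join key has no left-side
// partner reduces R, so the merge join touches less data overall.
object PreFilterSketch {
  val leftKeys: Set[Int] = Set(0x1, 0x4, 0x8)
  val right: Seq[(Int, Int)] =
    Seq(0x10 -> 0x1010, 0x4 -> 0x44, 0x4 -> 0x45, 0x7 -> 0x77)
  // In practice a Bloom filter stands in for leftKeys when the exact key set
  // is too big to materialize; it may pass false positives but never drops
  // a joinable row.
  val prunedRight: Seq[(Int, Int)] =
    right.filter { case (key, _) => leftKeys.contains(key) }
}
```

Here R shrinks from 4 to 2 (= C), so the cost drops from 3 + 4 + 2 * 2 = 11 to 3 + 2 + 2 * 2 = 9 data visits.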

Analysis

1. Download Spark source code

  • Make sure you have a Linux environment; mine is Ubuntu 22.04 LTS
  • Run mvn clean -DskipTests -T 20 package first to eliminate all the Maven compile issues, then refresh the Maven import
  • Spark version: 2.3, which means you need to switch to branch-2.3

2. Change a unit test

2.1 Details

To test the DataFrame processing, we need to hardcode the data inside a unit test (UT).

We pick the following UT to make this change:

  • Path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
  • Change:
    test("SPARK-20897: cached self-join should not fail") {
      withSQLConf((SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0"), // force planning a sort merge join
        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") // force Spark not to use codegen
      ) {
        // 0x literals are used because later we output using print(), which shows HEX values
        val df1 = Seq(0x1 -> 0x1, 0x4 -> 0x4, 0x8 -> 0x8).toDF("i", "j").as("t1").coalesce(1) // coalesce forces all the data into one partition
        val df2 = Seq(0x10 -> 0x1010, 0x4 -> 0x44, 0x4 -> 0x45, 0x7 -> 0x77)
          .toDF("i", "j").as("t2").coalesce(1)
        df1.join(df2, $"t1.i" === $"t2.i", "left_outer").explain(false)
        df1.join(df2, $"t1.i" === $"t2.i", "left_outer").show(100, false)
      }
    }

Then we add some log output to the Spark source code:

  • Path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
  • Change:
      protected override def doExecute(): RDD[InternalRow] = {

    /*... Ignore some unchanged code ...*/

    case LeftOuter =>
    val smjScanner = new SortMergeJoinScanner(
    streamedKeyGenerator = createLeftKeyGenerator(),
    bufferedKeyGenerator = createRightKeyGenerator(),
    keyOrdering,
    streamedIter = RowIterator.fromScala(leftIter),
    bufferedIter = RowIterator.fromScala(rightIter),
    inMemoryThreshold,
    spillThreshold
    )
    val rightNullRow = new GenericInternalRow(right.output.length)
    print("LeftOuter init in SparkPlan\n") // Indicate the Spark init the LeftOuterIterator
    new LeftOuterIterator(
    smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows).toScala

    /*... Ignore some unchanged code ...*/

    private[joins] class SortMergeJoinScanner(
    streamedKeyGenerator: Projection,
    bufferedKeyGenerator: Projection,
    keyOrdering: Ordering[InternalRow],
    streamedIter: RowIterator,
    bufferedIter: RowIterator,
    inMemoryThreshold: Int,
    spillThreshold: Int) {
    private[this] var streamedRow: InternalRow = _
    private[this] var streamedRowKey: InternalRow = _
    private[this] var bufferedRow: InternalRow = _
    private[this] var bufferedRowKey: InternalRow = _
    private[this] var matchJoinKey: InternalRow = _
    private[this] val bufferedMatches =
    new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)

    print(s"${this.hashCode()} SortMergeJoinScanner init\n") // We add this log here to observe that Spark reads the right table for the first time before it reads the left table
    advancedBufferedToRowWithNullFreeJoinKey()

    /*... Ignore some unchanged code ...*/

    final def findNextOuterJoinRows(): Boolean = { // This is the main driver function that drives the executor to iterate across the left and right tables
    print("findNextOuterJoinRows() \n") // We add a log to identify when execution enters this function
    if (!advancedStreamed()) {
    matchJoinKey = null

    /*... Ignore some unchanged code ...*/

    private def advancedStreamed(): Boolean = { // advancedStreamed() drives the left table's iterator to read the left table
    if (streamedIter.advanceNext()) {
    streamedRow = streamedIter.getRow
    streamedRowKey = streamedKeyGenerator(streamedRow)
    // Add logs to identify the left table reading
    print(s"${this.hashCode()} advancedStreamed() streamedIter.advanceNext()=true streamedRow: ${streamedRow}\n")
    true
    } else {
    streamedRow = null
    streamedRowKey = null
    // Add logs to identify the left table reading
    print(s"${this.hashCode()} advancedStreamed() streamedIter.advanceNext()=false streamedRow: ${streamedRow}\n")
    false
    }
    }

    private def advancedBufferedToRowWithNullFreeJoinKey(): Boolean = { // This function drives the right table's iterator to read the right table, skipping rows whose join key is null
    var foundRow: Boolean = false
    while (!foundRow && bufferedIter.advanceNext()) {
    bufferedRow = bufferedIter.getRow
    // Add logs to identify the right table reading
    print(s"${this.hashCode()} [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: ${bufferedRow}, ")
    bufferedRowKey = bufferedKeyGenerator(bufferedRow)
    foundRow = !bufferedRowKey.anyNull
    }
    if (!foundRow) {
    // Add logs to identify the right table reading
    print(s"${this.hashCode()} advancedBufferedToRowWithNullFreeJoinKey() not found Row, current is ${bufferedRow} ]\n")
    bufferedRow = null
    bufferedRowKey = null
    false
    } else {
    // Add logs to identify the right table reading
    print(s"${this.hashCode()} advancedBufferedToRowWithNullFreeJoinKey() found Row, current is ${bufferedRow} ]\n")
    true
    }
    }

    private def bufferMatchingRows(): Unit = { // This function does the actual "link" between the joined left data and the joined right data
    assert(streamedRowKey != null)
    assert(!streamedRowKey.anyNull)
    assert(bufferedRowKey != null)
    assert(!bufferedRowKey.anyNull)
    assert(keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
    matchJoinKey = streamedRowKey.copy()
    bufferedMatches.clear()
    do {
    // When a left record and a right record get joined, the right record is stored inside bufferedMatches
    // It will be re-iterated later
    print(s"${this.hashCode()} bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])\n")
    bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
    advancedBufferedToRowWithNullFreeJoinKey()
    } while (bufferedRow != null && keyOrdering.compare(streamedRowKey, bufferedRowKey) == 0)
    }

    private class LeftOuterIterator(
    smjScanner: SortMergeJoinScanner,
    rightNullRow: InternalRow,
    boundCondition: InternalRow => Boolean,
    resultProj: InternalRow => InternalRow,
    numOutputRows: SQLMetric)
    extends OneSideOuterIterator(
    smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows) {
    // We add a log here to verify that: 1. the LeftOuterIterator is activated,
    // and 2. to record its hashcode
    print(s"${this.hashCode()} LeftOuterIterator init\n")

    protected override def setStreamSideOutput(row: InternalRow): Unit = joinedRow.withLeft(row)
    protected override def setBufferedSideOutput(row: InternalRow): Unit = joinedRow.withRight(row)

    /*... Ignore some unchanged code ...*/

    private def advanceStream(): Boolean = {
    print(s"${this.hashCode()} OneSideOuterIterator.advanceStream()\n") // This advanceStream() is LeftOuterIterator's advanceStream()
    rightMatchesIterator = null
    if (smjScanner.findNextOuterJoinRows()) {
    setStreamSideOutput(smjScanner.getStreamedRow)
    if (smjScanner.getBufferedMatches.isEmpty) {
    setBufferedSideOutput(bufferedSideNullRow)
    } else {
    // Find the next row in the buffer that satisfied the bound condition
    if (!advanceBufferUntilBoundConditionSatisfied()) {
    setBufferedSideOutput(bufferedSideNullRow)
    }
    }
    true
    } else {
    // Stream has been exhausted
    false
    }
    }

    // This function iterates the bufferedMatches again, to build the real joined records and output them
    private def advanceBufferUntilBoundConditionSatisfied(): Boolean = {
    // Add log here to prove the advanceBufferUntilBoundConditionSatisfied will be called
    print(s"${this.hashCode()} OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()\n")
    var foundMatch: Boolean = false
    if (rightMatchesIterator == null) {
    rightMatchesIterator = smjScanner.getBufferedMatches.generateIterator()
    // Add log here to see when it get the bufferMatches
    print(s"${this.hashCode()} advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = ${rightMatchesIterator.hashCode()} \n")
    }

    while (!foundMatch && rightMatchesIterator.hasNext) {
    val nextMatch = rightMatchesIterator.next()
    // Add log here to see the content in bufferedMatches, to prove that bufferedMatches stores the joined right table records
    print(s"rightMatchesIterator ${rightMatchesIterator.hashCode()} has ${nextMatch}\n")
    setBufferedSideOutput(nextMatch)
    foundMatch = boundCondition(joinedRow)
    }
    foundMatch
    }

    // LeftOuterIterator's advanceNext() always tries to output the buffered joined records first, then drives its advanceStream(), which in turn drives SortMergeJoinScanner.findNextOuterJoinRows()
    override def advanceNext(): Boolean = {
    print(s"${this.hashCode()} OneSideOuterIterator.advanceNext()\n")
    val r = advanceBufferUntilBoundConditionSatisfied() || advanceStream()
    if (r) numOutputRows += 1
    r
    }
    override def getRow: InternalRow = resultProj(joinedRow)
    }

Please refer to this Commit:

3. Run and analyze the log

How the data visits map to the log trace (left table partition x joined with right table partition y):

  • Left 1 -> 1, trace /* 2. Left #1 */: LeftOuterIterator calls its advanceStream(), which drives findNextOuterJoinRows() to call the left iterator's advanceNext().
    Right 4 -> 44, traces /* 1. Right #1 */ and /* 6. Right #1 */: visited by advancedBufferedToRowWithNullFreeJoinKey() when SortMergeJoinScanner is initialized, and this record is later iterated AGAIN from inside the buffer.
  • Left 4 -> 4, trace /* 3. Left #2 */: after the 1st record cannot be joined, iteration continues, so LeftOuterIterator's advanceNext() is called, and the left side's advanceStream() is called as well.
    Right 4 -> 45, traces /* 4. Right #2 */ and /* 7. Right #2 */: visited by advancedBufferedToRowWithNullFreeJoinKey() while doing the join, and this record is later iterated AGAIN from inside the buffer.
  • Left 8 -> 8, trace /* 8. Left #3 */: advancedStreamed() is called, and the row is never joined.
    Right 7 -> 77, trace /* 5. Right #3 */: visited by advancedBufferedToRowWithNullFreeJoinKey() while doing the join, and never joined.
  • Right 10 -> 1010, trace /* 9. Right #4 */: visited by advancedBufferedToRowWithNullFreeJoinKey(), and never joined.
Object                  HashCode
SortMergeJoinScanner    104408849
LeftOuterIterator       165384710
/home/yzruntime/.jdks/corretto-1.8.0_402/bin/java -javaagent:/opt/idea-IC-233.14475.28/lib/idea_rt.jar=42213:/opt/idea-IC-233.14475.28/bin -Dfile.encoding=UTF-8 -classpath ... org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner -s org.apache.spark.sql.DataFrameSuite -testName "SPARK-20897: cached self-join should not fail" -showProgressMessages true
Testing started at 4:35 pm ...


01:35:55.103 WARN org.apache.spark.util.Utils: Your hostname, yzhost resolves to a loopback address: 127.0.1.1; using 192.168.50.125 instead (on interface wlp5s0)
01:35:55.104 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
01:35:55.262 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

== Physical Plan ==
SortMergeJoin [i#5], [i#16], LeftOuter
:- Sort [i#5 ASC NULLS FIRST], false, 0
: +- Coalesce 1
: +- LocalTableScan [i#5, j#6]
+- Sort [i#16 ASC NULLS FIRST], false, 0
+- Coalesce 1
+- LocalTableScan [i#16, j#17]
104408849 SortMergeJoinScanner init
/* 1. Right #1 */ 104408849 [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: [0,4,44], 104408849 advancedBufferedToRowWithNullFreeJoinKey() found Row, current is [0,4,44] ]
LeftOuter init in SparkPlan
165384710 LeftOuterIterator init
165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = 1252938438
165384710 OneSideOuterIterator.advanceStream()
findNextOuterJoinRows()
/* 2. Left #1 */ 104408849 advancedStreamed() streamedIter.advanceNext()=true streamedRow: [0,1,1]
165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = 2135241262
165384710 OneSideOuterIterator.advanceStream()
findNextOuterJoinRows()
/* 3. Left #2 */ 104408849 advancedStreamed() streamedIter.advanceNext()=true streamedRow: [0,4,4]
/* 3.1 Right #1 got joined, then it was put inside the buffer */ 104408849 bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
/* 4. Right #2 */ 104408849 [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: [0,4,45], 104408849 advancedBufferedToRowWithNullFreeJoinKey() found Row, current is [0,4,45] ]
/* 4.1 Right #2 got joined, then it was put inside the buffer */ 104408849 bufferedMatches.add(bufferedRow.asInstanceOf[UnsafeRow])
/* 5. Right #3 */ 104408849 [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: [0,7,77], 104408849 advancedBufferedToRowWithNullFreeJoinKey() found Row, current is [0,7,77] ]
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = 1621560499
/* 6. Right #1 */ rightMatchesIterator 1621560499 has [0,4,44]
165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
/* 7. Right #2 */ rightMatchesIterator 1621560499 has [0,4,45]
165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 OneSideOuterIterator.advanceStream()
findNextOuterJoinRows()
/* 8. Left #3 */ 104408849 advancedStreamed() streamedIter.advanceNext()=true streamedRow: [0,8,8]
/* 9. Right #4 */ 104408849 [advancedBufferedToRowWithNullFreeJoinKey() bufferedRow in while: [0,10,1010], 104408849 advancedBufferedToRowWithNullFreeJoinKey() found Row, current is [0,10,1010] ]
165384710 OneSideOuterIterator.advanceNext()
165384710 OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied()
165384710 advanceBufferUntilBoundConditionSatisfied() arightMatchesIterator = smjScanner.getBufferedMatches.generateIterator() = 2010570488
165384710 OneSideOuterIterator.advanceStream()
findNextOuterJoinRows()
104408849 advancedStreamed() streamedIter.advanceNext()=false streamedRow: null
+---+---+----+----+
|i |j |i |j |
+---+---+----+----+
|1 |1 |null|null|
|4 |4 |4 |68 |
|4 |4 |4 |69 |
|8 |8 |null|null|
+---+---+----+----+
Process finished with exit code 0
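The visit counting in the trace above can be reproduced with a minimal sketch of the scan loop. This is not Spark's actual code (the real logic lives in `SortMergeJoinScanner` and `LeftOuterIterator` inside `SortMergeJoinExec`); it is a simplified model over plain `Int` keys, where a "visit" is counted once per streamed-row advance (`advancedStreamed()`), once per buffered-row advance (`advancedBufferedToRowWithNullFreeJoinKey()`), once per `bufferedMatches.add`, and once per `rightMatchesIterator` emission. The object and method names here are illustrative assumptions, not Spark APIs.

```scala
object SmjVisitCount {
  // Sketch only: left-outer sort-merge join over two pre-sorted key vectors,
  // counting how many times each side's data is visited.
  def run(left: Vector[Int], right: Vector[Int]): (Int, Vector[(Int, Option[Int])]) = {
    var visits = 0
    var r = 0
    // Scanner init reads the first buffered (right) row, like step "1. Right #1".
    var rightRow: Option[Int] = if (right.nonEmpty) { visits += 1; Some(right(0)) } else None

    def advanceRight(): Unit = {
      r += 1
      rightRow = if (r < right.length) { visits += 1; Some(right(r)) } else None
    }

    val out = Vector.newBuilder[(Int, Option[Int])]
    for (l <- left) {
      visits += 1                                     // advancedStreamed(): one visit per left row
      while (rightRow.exists(_ < l)) advanceRight()   // skip buffered rows below the streamed key
      val matches = scala.collection.mutable.Buffer.empty[Int]
      while (rightRow.contains(l)) {                  // bufferedMatches.add(): one extra visit per match
        visits += 1; matches += rightRow.get; advanceRight()
      }
      if (matches.isEmpty) out += ((l, None))         // unmatched left row -> nulls on the right
      else matches.foreach { m => visits += 1; out += ((l, Some(m))) } // rightMatchesIterator emission
    }
    (visits, out.result())
  }

  def main(args: Array[String]): Unit = {
    // Same keys as the test data: L = {1, 4, 8}, R = {4, 4, 7, 10}, C = 2 joined right rows.
    val (visits, rows) = run(Vector(1, 4, 8), Vector(4, 4, 7, 10))
    println(s"visits = $visits")                      // L + R + 2C = 3 + 4 + 2*2 = 11
    rows.foreach(println)
  }
}
```

Running it gives `visits = 11`, matching L + R + 2C for this data, and the same output shape as the table above: `(1, None)`, two matches for key 4, and `(8, None)`.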

Reference