From cdc571781c807125cf25b522ad4ccf90267248a8 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Tue, 29 Jul 2025 18:32:30 -0700 Subject: [PATCH 1/5] Add test --- .../test/scala/org/apache/spark/sql/JoinSuite.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 59508d7fc1016..cd908d6a6590d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1771,6 +1771,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan cached.unpersist() } } + + test("SPARK-52873") { + val query = + """select /*+ SHUFFLE_HASH(r) */ * from + |testData2 l + |left semi join testData2 r + |on cast(l.a / 100 as long) + 1 = cast(r.a / 100 as int) + 1 + |and r.a >= cast(l.a/1000 as int) + 3""".stripMargin + val df = sql(query) + val expected = Row(1, 1) :: Row(1, 2) :: Row(2, 1) :: Row(2, 2) :: + Row (3, 1) :: Row(3, 2) :: Nil; + checkAnswer(df, expected) + } } class ThreadLeakInSortMergeJoinSuite From b4ea794a7d35f9d7053b6c44ac538057904feec3 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Mon, 18 Aug 2025 08:37:39 -0700 Subject: [PATCH 2/5] Update --- .../spark/sql/execution/joins/ShuffledHashJoinExec.scala | 7 +------ .../src/test/scala/org/apache/spark/sql/JoinSuite.scala | 9 +-------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 974f6f9e50c2e..71046e02bfbe8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -67,12 +67,7 @@ case class ShuffledHashJoinExec( // Exposed for testing 
@transient lazy val ignoreDuplicatedKey = joinType match { - case LeftExistence(_) => - // For building hash relation, ignore duplicated rows with same join keys if: - // 1. Join condition is empty, or - // 2. Join condition only references streamed attributes and build join keys. - val streamedOutputAndBuildKeys = AttributeSet(streamedOutput ++ buildKeys) - condition.forall(_.references.subsetOf(streamedOutputAndBuildKeys)) + case LeftExistence(_) if condition.isEmpty => true case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index cd908d6a6590d..9602fbbeece58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1575,18 +1575,11 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan // No join condition, ignore duplicated key. (s"SELECT /*+ SHUFFLE_HASH(t2) */ t1.c1 FROM t1 LEFT SEMI JOIN t2 ON t1.c1 = t2.c1", true), - // Have join condition on build join key only, ignore duplicated key. + // SPARK-52873: Have any join condition at all, do not ignore duplicated key. (s""" |SELECT /*+ SHUFFLE_HASH(t2) */ t1.c1 FROM t1 LEFT SEMI JOIN t2 |ON t1.c1 = t2.c1 AND CAST(t1.c2 * 2 AS STRING) != t2.c1 """.stripMargin, - true), - // Have join condition on other build attribute beside join key, do not ignore - // duplicated key. 
- (s""" - |SELECT /*+ SHUFFLE_HASH(t2) */ t1.c1 FROM t1 LEFT SEMI JOIN t2 - |ON t1.c1 = t2.c1 AND t1.c2 * 100 != t2.c2 - """.stripMargin, false) ) semiJoinQueries.foreach { From e6c2b39b011aa3b242663020f4e4b6b89faad08c Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Mon, 18 Aug 2025 12:06:25 -0700 Subject: [PATCH 3/5] Update test name --- sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 9602fbbeece58..004c6cdfaa96c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1765,7 +1765,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } } - test("SPARK-52873") { + test("SPARK-52873: Don't ignore duplicate keys in SHJ when there is a bound condition") { val query = """select /*+ SHUFFLE_HASH(r) */ * from |testData2 l From cf7afcd2853293fff15c144998d1f325ebfa271e Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Thu, 21 Aug 2025 12:08:49 -0700 Subject: [PATCH 4/5] Update --- .../joins/ShuffledHashJoinExec.scala | 29 +++++++++- .../org/apache/spark/sql/JoinSuite.scala | 57 ++++++++++++------- 2 files changed, 66 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 71046e02bfbe8..ba899c16e99b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -65,9 +65,36 @@ case class ShuffledHashJoinExec( case _ => super.outputOrdering } + private def validCondForIgnoreDupKey(cond: Expression): Boolean = { + // to ignore duplicate keys on 
the build side, the join condition must + // have the following properties: + // 1) a subtree that is a semantic match to a build-side key, and/or + // 2) outside any subtree that is a semantic match to a build-side key, + // all attributes should be from the stream-side. + val buildKeysSet = ExpressionSet(buildKeys) + val streamedOutputAttrs = AttributeSet(streamedOutput) + validCond0(cond, buildKeysSet, streamedOutputAttrs) + } + + private def validCond0(cond: Expression, + buildKeysSet: ExpressionSet, + streamedOutputAttrs: AttributeSet): Boolean = { + cond match { + // don't bother traversing any subtree that has a semantic match to a build key + case e: Expression if buildKeysSet.contains(e) => true + // all attributes (outside any subtree that matches a build key) should be + // from the stream side + case a: Attribute if !streamedOutputAttrs.contains(a) => false + case e: Expression => + e.children.forall(validCond0(_, buildKeysSet, streamedOutputAttrs)) + case _ => true + } + } + // Exposed for testing @transient lazy val ignoreDuplicatedKey = joinType match { - case LeftExistence(_) if condition.isEmpty => true + case LeftExistence(_) => + condition.forall(validCondForIgnoreDupKey(_)) case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 004c6cdfaa96c..02d37502504d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1571,23 +1571,55 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan spark.range(10).map(i => (i.toString, i + 1)).toDF("c1", "c2").write.saveAsTable("t1") spark.range(10).map(i => ((i % 5).toString, i % 3)).toDF("c1", "c2").write.saveAsTable("t2") + val semiExpected1 = Seq(Row("0"), Row("1"), Row("2"), Row("3"), Row("4")) + val antiExpected1 = Seq(Row("5"), Row("6"), Row("7"), Row("8"), Row("9")) + val 
semiExpected2 = Seq(Row("0")) + val antiExpected2 = Seq.tabulate(9) { x => Row((x + 1).toString) } + val semiJoinQueries = Seq( // No join condition, ignore duplicated key. (s"SELECT /*+ SHUFFLE_HASH(t2) */ t1.c1 FROM t1 LEFT SEMI JOIN t2 ON t1.c1 = t2.c1", - true), - // SPARK-52873: Have any join condition at all, do not ignore duplicated key. + true, semiExpected1, antiExpected1), + // Have join condition on build join key only, ignore duplicated key. (s""" |SELECT /*+ SHUFFLE_HASH(t2) */ t1.c1 FROM t1 LEFT SEMI JOIN t2 |ON t1.c1 = t2.c1 AND CAST(t1.c2 * 2 AS STRING) != t2.c1 """.stripMargin, - false) + true, semiExpected1, antiExpected1), + // Have join condition on other build attribute beside join key, do not ignore + // duplicated key. + (s""" + |SELECT /*+ SHUFFLE_HASH(t2) */ t1.c1 FROM t1 LEFT SEMI JOIN t2 + |ON t1.c1 = t2.c1 AND t1.c2 * 100 != t2.c2 + """.stripMargin, + false, semiExpected1, antiExpected1), + // SPARK-52873: Have a join condition that references attributes from the build-side + // join key, but those attributes are contained by a different expression than that + // used as the build-side join key (that is, CAST((t2.c2+10000)/1000 AS INT) is not + // the same as t2.c2). In this case, ignoreDuplicatedKey should be false + ( + s""" + |SELECT /*+ SHUFFLE_HASH(t2) */ t1.c1 FROM t1 LEFT SEMI JOIN t2 + |ON CAST((t1.c2+10000)/1000 AS INT) = CAST((t2.c2+10000)/1000 AS INT) + |AND t2.c2 >= t1.c2 + 1 + |""".stripMargin, + false, semiExpected2, antiExpected2), + // SPARK-52873: Have a join condition that contains the same expression as the + // build-side join key, and does not violate any other rules for the join condition.
+ // In this case, ignoreDuplicatedKey should be true + ( + s""" + |SELECT /*+ SHUFFLE_HASH(t2) */ t1.c1 FROM t1 LEFT SEMI JOIN t2 + |ON t1.c1 * 10000 = t2.c1 * 1000 AND t2.c1 * 1000 >= t1.c1 + |""".stripMargin, + true, semiExpected2, antiExpected2) ) semiJoinQueries.foreach { - case (query, ignoreDuplicatedKey) => + case (query, ignoreDuplicatedKey, semiExpected, antiExpected) => val semiJoinDF = sql(query) val antiJoinDF = sql(query.replaceAll("SEMI", "ANTI")) - checkAnswer(semiJoinDF, Seq(Row("0"), Row("1"), Row("2"), Row("3"), Row("4"))) - checkAnswer(antiJoinDF, Seq(Row("5"), Row("6"), Row("7"), Row("8"), Row("9"))) + checkAnswer(semiJoinDF, semiExpected) + checkAnswer(antiJoinDF, antiExpected) Seq(semiJoinDF, antiJoinDF).foreach { df => assert(collect(df.queryExecution.executedPlan) { case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true @@ -1764,19 +1796,6 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan cached.unpersist() } } - - test("SPARK-52873: Don't ignore duplicate keys in SHJ when there is a bound condition") { - val query = - """select /*+ SHUFFLE_HASH(r) */ * from - |testData2 l - |left semi join testData2 r - |on cast(l.a / 100 as long) + 1 = cast(r.a / 100 as int) + 1 - |and r.a >= cast(l.a/1000 as int) + 3""".stripMargin - val df = sql(query) - val expected = Row(1, 1) :: Row(1, 2) :: Row(2, 1) :: Row(2, 2) :: - Row (3, 1) :: Row(3, 2) :: Nil; - checkAnswer(df, expected) - } } class ThreadLeakInSortMergeJoinSuite From 5933402bd7acf0d31269bf1bec28064331a31cd3 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Thu, 21 Aug 2025 14:33:30 -0700 Subject: [PATCH 5/5] Update --- .../joins/ShuffledHashJoinExec.scala | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 
ba899c16e99b6..97ca74aee30c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -73,22 +73,21 @@ case class ShuffledHashJoinExec( // all attributes should be from the stream-side. val buildKeysSet = ExpressionSet(buildKeys) val streamedOutputAttrs = AttributeSet(streamedOutput) - validCond0(cond, buildKeysSet, streamedOutputAttrs) - } - private def validCond0(cond: Expression, - buildKeysSet: ExpressionSet, - streamedOutputAttrs: AttributeSet): Boolean = { - cond match { - // don't bother traversing any subtree that has a semantic match to a build key - case e: Expression if buildKeysSet.contains(e) => true - // all attributes (outside any subtree that matches a build key) should be - // from the stream side - case a: Attribute if !streamedOutputAttrs.contains(a) => false - case e: Expression => - e.children.forall(validCond0(_, buildKeysSet, streamedOutputAttrs)) - case _ => true + def validCond(cond: Expression): Boolean = { + cond match { + // don't bother traversing any subtree that has a semantic match to a build key + case e: Expression if buildKeysSet.contains(e) => true + // all attributes (outside any subtree that matches a build key) should be + // from the stream side + case a: Attribute if !streamedOutputAttrs.contains(a) => false + case e: Expression => + e.children.forall(validCond(_)) + case _ => true + } } + + validCond(cond) } // Exposed for testing