Skip to content

Commit 967f2b6

Browse files
mihailoale-dbcloud-fan
authored andcommitted
[SPARK-53308][SQL] Don't remove aliases in RemoveRedundantAliases that would cause duplicates
### What changes were proposed in this pull request? In case a `Project`, `Aggregate` or `Window` is a child of `Union`, we don't remove an `Alias` in case it is on top of an `Attribute` which exists in the output set of the operator. This is needed because otherwise, we end up having an operator with duplicates in its output. When that happens, `Union` is not resolved and we fail (but we shouldn't). Consider this example: ``` SELECT col1 FROM values(1) WHERE 100 IN (SELECT col1 UNION SELECT col1); ``` In this PR I propose that we fix above query (it should pass, now it fails) by not removing `Alias`es under `Union` that would cause duplicates. ### Why are the changes needed? To fix a query pattern. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52060 from mihailoale-db/unionduplicates. Authored-by: mihailoale-db <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 549c30a commit 967f2b6

File tree

3 files changed

+92
-10
lines changed

3 files changed

+92
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -640,16 +640,23 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
640640
case u: Union =>
641641
var first = true
642642
plan.mapChildren { child =>
643-
if (first) {
644-
first = false
645-
// `Union` inherits its first child's outputs. We don't remove those aliases from the
646-
// first child's tree that prevent aliased attributes to appear multiple times in the
647-
// `Union`'s output. A parent projection node on the top of an `Union` with non-unique
648-
// output attributes could return incorrect result.
649-
removeRedundantAliases(child, excluded ++ child.outputSet)
643+
if (!conf.unionIsResolvedWhenDuplicatesPerChildResolved || shouldRemoveAliasesUnderUnion(
644+
child
645+
)) {
646+
if (first) {
647+
first = false
648+
// `Union` inherits its first child's outputs. We don't remove those aliases from the
649+
// first child's tree that prevent aliased attributes to appear multiple times in the
650+
// `Union`'s output. A parent projection node on the top of an `Union` with
651+
// non-unique output attributes could return incorrect result.
652+
removeRedundantAliases(child, excluded ++ child.outputSet)
653+
} else {
654+
// We don't need to exclude those attributes that `Union` inherits from its first
655+
// child.
656+
removeRedundantAliases(child, excluded -- u.children.head.outputSet)
657+
}
650658
} else {
651-
// We don't need to exclude those attributes that `Union` inherits from its first child.
652-
removeRedundantAliases(child, excluded -- u.children.head.outputSet)
659+
child
653660
}
654661
}
655662

@@ -694,6 +701,44 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
694701
}
695702
}
696703

704+
/**
705+
* In case a [[Project]], [[Aggregate]] or [[Window]] is a child of [[Union]], we don't remove an
706+
* [[Alias]] in case it is on top of an [[Attribute]] which exists in the output set of the
707+
* operator. This is needed because otherwise, we end up having an operator with duplicates in
708+
* its output. When that happens, [[Union]] is not resolved, and we fail (but we shouldn't).
709+
* In this example:
710+
*
711+
* {{{ SELECT col1 FROM values(1) WHERE 100 IN (SELECT col1 UNION SELECT col1); }}}
712+
*
713+
* Without `shouldRemoveAliasesUnderUnion` check, we would remove the [[Alias]] introduced in
714+
* [[DeduplicateRelations]] rule (in a [[Project]] tagged as
715+
* `PROJECT_FOR_EXPRESSION_ID_DEDUPLICATION`), the result is unresolved [[Union]] which causes the
716+
* failure. With the check, [[Alias]] stays, and we resolve the plan properly.
717+
*/
718+
private def shouldRemoveAliasesUnderUnion(operator: LogicalPlan): Boolean = {
719+
operator match {
720+
case project: Project =>
721+
project.projectList.forall {
722+
case Alias(attribute: Attribute, _) =>
723+
!project.outputSet.contains(attribute)
724+
case _ => true
725+
}
726+
case aggregate: Aggregate =>
727+
aggregate.aggregateExpressions.forall {
728+
case Alias(attribute: Attribute, _) =>
729+
!aggregate.outputSet.contains(attribute)
730+
case _ => true
731+
}
732+
case window: Window =>
733+
window.windowExpressions.forall {
734+
case Alias(attribute: Attribute, _) =>
735+
!window.outputSet.contains(attribute)
736+
case _ => true
737+
}
738+
case other => true
739+
}
740+
}
741+
697742
def apply(plan: LogicalPlan): LogicalPlan = removeRedundantAliases(plan, AttributeSet.empty)
698743
}
699744

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
2424
import org.apache.spark.sql.catalyst.plans.logical._
2525
import org.apache.spark.sql.catalyst.rules._
2626
import org.apache.spark.sql.internal.SQLConf
27-
import org.apache.spark.sql.types.MetadataBuilder
27+
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder}
2828

2929
class RemoveRedundantAliasAndProjectSuite extends PlanTest {
3030

@@ -238,4 +238,35 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest {
238238
comparePlans(optimized, expectedWhenNotEnabled)
239239
}
240240
}
241+
242+
test("SPARK-53308: Don't remove aliases in RemoveRedundantAliases that would cause duplicates") {
243+
val exprId = NamedExpression.newExprId
244+
val attribute = AttributeReference("attr", IntegerType)(exprId = exprId)
245+
val project = Project(
246+
Seq(
247+
Alias(attribute, "attr")(),
248+
attribute
249+
),
250+
LocalRelation(attribute)
251+
)
252+
val projectWithoutAlias =
253+
Project(
254+
Seq(
255+
attribute,
256+
attribute
257+
),
258+
LocalRelation(attribute)
259+
)
260+
val union = Union(Seq(project, project))
261+
262+
withSQLConf(SQLConf.UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED.key -> "true") {
263+
val optimized = Optimize.execute(union)
264+
comparePlans(union, optimized)
265+
}
266+
267+
withSQLConf(SQLConf.UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED.key -> "false") {
268+
val optimized = Optimize.execute(union)
269+
comparePlans(optimized, Union(Seq(project, projectWithoutAlias)))
270+
}
271+
}
241272
}

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5073,6 +5073,12 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
50735073

50745074
checkAnswer(df, Seq(Row(null, null, 820), Row(null, "east", 420), Row("a", null, 370)))
50755075
}
5076+
5077+
test("SPARK-53308: Don't remove aliases in RemoveRedundantAliases that would cause duplicates") {
5078+
val df = sql("SELECT col1 FROM values(1) WHERE 1 IN (SELECT col1 UNION SELECT col1);")
5079+
5080+
checkAnswer(df, Row(1))
5081+
}
50765082
}
50775083

50785084
case class Foo(bar: Option[String])

0 commit comments

Comments
 (0)