Skip to content

[SPARK-52873][SQL] Further restrict when SHJ semi/anti join can ignore duplicate keys on the build side #52067

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

bersprockets
Copy link
Contributor

@bersprockets bersprockets commented Aug 18, 2025

What changes were proposed in this pull request?

After e861b0d, shuffle hash join for left semi/anti/existence will ignore duplicate keys if the join condition is empty or refers to the same parent attributes as the join keys. This PR proposes that duplicate keys should be ignored only when the join condition has these properties:

  1. a subtree that is a semantic match to a build-side key, and/or
  2. all attributes, outside of any subtree that is a semantic match to a build-side join key, should be from the stream-side.

Why are the changes needed?

e861b0d causes a correctness issue when a column is transformed in the build-side join keys and also transformed, but differently, in a join condition. As an example:

create or replace temp view data(a) as values
("xxxx1111"),
("yyyy2222");

create or replace temp view lookup(k) as values
("xxxx22"),
("xxxx33"),
("xxxx11");

-- this returns one row
select *
from data
left semi join lookup
on substring(a, 1, 4) = substring(k, 1, 4)
and substring(a, 1, 6) >= k;

-- this is the same query as above, but with a shuffle hash join hint, and returns no rows
select /*+ SHUFFLE_HASH(lookup) */ *
from data
left semi join lookup
on substring(a, 1, 4) = substring(k, 1, 4)
and substring(a, 1, 6) >= k;

When the join uses broadcast hash join, the hashrelation of lookup has the following key -> values:

Key xxxx:
  xxxx11
  xxxx33
  xxxx22

The join condition matches on the build side row with the value xxxx11.

When the join uses shuffle hash join, on the other hand, the hash relation of lookup has the following key -> values:

Key xxxx:
  xxxx22

Because the keys must be unique, an arbitrary row is chosen to represent the key, and that row does not match the join condition.

After 1f35577, a similar issue happens with integer keys:

create or replace temp view data(a) as values
(10000),
(30000);

create or replace temp view lookup(k) as values
(1000),
(1001),
(1002),
(1003),
(1004);

-- this query returns one row
select * from data left semi join lookup on a/10000 = cast(k/1000 as int) and k >=  a/10 + 3;

-- this is the same query as above, but with a shuffle hash join hint, and returns no rows
select /*+ SHUFFLE_HASH(lookup) */ * from data left semi join lookup on a/10000 = cast(k/1000 as int) and k >=  a/10 + 3;

Does this PR introduce any user-facing change?

No, except for fixing the correctness issue.

How was this patch tested?

Modified an existing unit test.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Aug 18, 2025
@bersprockets bersprockets changed the title [SPARK-52873][SQL] Don't ignore duplicate keys in SHJ when there is a bound condition [SPARK-52873][SQL] SHJ shouldn't ignore duplicate keys when there is a bound condition Aug 18, 2025
// 2. Join condition only references streamed attributes and build join keys.
val streamedOutputAndBuildKeys = AttributeSet(streamedOutput ++ buildKeys)
condition.forall(_.references.subsetOf(streamedOutputAndBuildKeys))
case LeftExistence(_) if condition.isEmpty => true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this issue!

Instead of disable ignoreDuplicatedKey in this case, Is it possible to relax the requirement to that the references in conditions are all in streamedOutput ++ buildKeysThatAreAttributes ? E.g. diff:

-      val streamedOutputAndBuildKeys = AttributeSet(streamedOutput ++ buildKeys)
+      val attrBuildKeys = buildKeys.filter(_.isInstanceOf[Attribute])
+      val streamedOutputAndBuildKeys = AttributeSet(streamedOutput ++ attrBuildKeys)”

The current master branch allows buildKeys’ references in the condition (AttributeSet extracts references), while this diff limits it to streamedOutput and buildKeys that are Attributes

Copy link
Contributor

@peter-toth peter-toth Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe something like:

      val streamedOutputSet = AttributeSet(streamedOutput)
      val buildKeysSet = ExpressionSet(buildKeys)
      condition.forall { c =>
        var valid = true
        c.transformDownWithPruning(c => valid && !buildKeysSet.contains(c.asInstanceOf[Expression])) {
          case a: Attribute if !streamedOutputSet.contains(a) =>
            valid = false
            a
        }
        valid
      }

to allow full expressions from buildKeys to appear in condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter-toth

It's hard for me to grok that traversal, but I think the gist is:

  • Any subtree of condition that is not a semantic match to a build-side join key must be checked for naughty attributes.
  • Set valid to false when we hit a naughty attribute in such a subtree.
  • We can short circuit the traversal once valid is set to false.

I wish there was another way to express that, but I don't know how else to skip selected subtrees of condition.

Copy link
Contributor

@peter-toth peter-toth Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. But we can write a recursive function if that's easier to grasp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your's is nice and compact, but I'll try an explicit traversal and see how sprawling it gets.

Copy link
Contributor

@peter-toth peter-toth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good catch!
Fix looks good to me, but maybe we can improve it a bit.

@bersprockets bersprockets changed the title [SPARK-52873][SQL] SHJ shouldn't ignore duplicate keys when there is a bound condition [SPARK-52873][SQL] Further restrict when SHJ semi/anti join can ignore duplicate keys on the build side Aug 21, 2025
validCond0(cond, buildKeysSet, streamedOutputAttrs)
}

private def validCond0(cond: Expression,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to be a top level function, but can be nested under validCondForIgnoreDupKey() and you don't need to pass in buildKeysSet and streamedOutputAttrs that way, but this is just a nit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will fix that.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants