Skip to content

Conversation

@Tpt
Copy link
Contributor

@Tpt Tpt commented Oct 23, 2025

Rely on aggregate GroupValues abstraction to build a hash table of the emitted rows that is used to deduplicate

We might make things a bit more efficient by rewriting a hash table wrapper just for deduplication, but this implementation should give a fair baseline

Which issue does this PR close?

Rationale for this change

Implements deduplicating recursive CTE (i.e. UNION inside of WITH RECURSIVE) using a hash table. I reuse the one from aggregates to avoid rebuilding a full wrapper and specialization for types. Each time a batch is returned by the static or the recursive terms of the CTE, the hash table is used to remove already seen rows before emitting the rows and keeping them in memory for the next recursion step.

What changes are included in this PR?

Reusing GroupValues trait implementations inside of RecursiveQueryExec to get deduplication working.

Are these changes tested?

Yes, some sqllogictests have been added, including ones that would lead to infinite recursion is deduplication where disabled.

Are there any user-facing changes?

No

Rely on aggregate GroupValues abstraction to build a hash table of the emitted rows that is used to deduplicate

We might make things a bit more efficient by rewriting a hash table wrapper just for deduplication, but this implementation should give a fair baseline
@github-actions github-actions bot added logical-expr Logical plan and expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels Oct 23, 2025
@Tpt Tpt changed the title Deduplicating recursive CTE implementation feat: Deduplicating recursive CTE implementation Oct 23, 2025
query I
WITH RECURSIVE nodes AS (
SELECT 1 as id
UNION ALL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was meant to be UNION.

And the original test wasn't generating duplicate results with UNION ALL.
Maybe we should use a different query, I could think of some thing like this,
but I believe there should be a better one.

WITH RECURSIVE nodes AS (
    SELECT id from (VALUES (1), (2)) nodes(id)
    UNION
    SELECT id + 1 as id
    FROM nodes
    WHERE id < 4
)
SELECT * FROM nodes;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for spotting this! Your query is much better indeed. Added.

Copy link
Contributor

@tobixdev tobixdev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my perspective this is a very nice and concise solution to the problem.

Furthermore, from my understanding this should also correctly terminate the recursion as only each unique row is pushed into the WorkTable and at some point (as it can be seen in the closure example) this will reach a fix point.

What I am also thinking about is test coverage. My gut feeling says there should be some test cases in the SQLite test suite that cover distinct recursion. Would this cause the extended test suite to fail? Ideally, this solution passes all these test cases now! 🥳 However, I am a bit unsure how this is setup currently.

Thank you!

CAVEAT: I am by no means a DataFusion (nor recurisve query) expert so take my comments with a grain of salt.

mut batch: RecordBatch,
) -> Poll<Option<Result<RecordBatch>>> {
let baseline_metrics = self.baseline_metrics.clone();
if let Some(deduplicator) = &mut self.distinct_deduplicator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding metrics!

I think we could also move the metrics part to <RecursiveQueryStream as Stream>::poll_next as there is already a TODO for doing so. I believe it would be fine to update the time metrics there even if there is no deduplication going on but there might be different opinions.

Anyways, I think this is an improvement over the status quo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for prompting me on this! I removed the TODOs but have not moved the metric code to avoid duplicating it twice (once for the static stream and once for the recursive stream).

}

/// Return a mask, each element true if the value is greater than all previous ones and greater or equal than the min_value
fn are_increasing_mask(values: &[usize], mut min_value: usize) -> BooleanArray {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understood what this function does, but I had a hard time with min_value. Maybe we can be more explicit here. Just some suggestions:

input parameter: min_value -> highest_group_id

// Always update the min_value to do de-duplication within a record batch.
let mut min_value = highet_group_id;

May the integrating the comment in the doc comment for are_increasing_mask is also more than enough.

I think this assumes that the group ids are assigned in-order within the record batch but I think this is a valid assumption. Maybe someone more familiar with the aggregation infrastructure has more information on that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this assumes that the group ids are assigned in-order within the record batch

yes, this is part of the GroupValues trait documentation.

I have rephrased the doc comment. I hope it's clearer now.

I have not renamed min_value to highest_group_id, the function does not depends on any specific semantic outside of creating the mask from its inputs. But happy to do the rename if you feel strongly about it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's perfectly fine. Just a suggestion 👍

}

fn deduplicate(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
// We use the hash table to allocate new group ids.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can make a version of that comment the doc comment for DistinctDeduplicator::deduplicate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. I have moved the comment as a doc comment and rephrased it to be hopefully clearer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate logical-expr Logical plan and expressions physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support deduplicating UNION in recursive CTE

3 participants