Skip to content

Commit 72ed4cb

Browse files
committed
Unconditionally enable reduce reduction
Removes the feature flag and the reduce collation implementation that is now unreachable. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent bab018e commit 72ed4cb

File tree

14 files changed

+54
-444
lines changed

14 files changed

+54
-444
lines changed

src/compute-types/src/explain/text.rs

Lines changed: 1 addition & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@ use crate::plan::join::delta_join::{DeltaPathPlan, DeltaStagePlan};
3737
use crate::plan::join::linear_join::LinearStagePlan;
3838
use crate::plan::join::{DeltaJoinPlan, JoinClosure, LinearJoinPlan};
3939
use crate::plan::reduce::{
40-
AccumulablePlan, BasicPlan, BucketedPlan, CollationPlan, HierarchicalPlan, MonotonicPlan,
41-
SingleBasicPlan,
40+
AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, MonotonicPlan, SingleBasicPlan,
4241
};
4342
use crate::plan::threshold::ThresholdPlan;
4443
use crate::plan::{AvailableCollections, LirId, Plan, PlanNode};
@@ -358,14 +357,6 @@ impl Plan {
358357
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
359358
ctx.indent.reset();
360359
}
361-
ReducePlan::Collation(plan) => {
362-
writeln!(
363-
f,
364-
"{}→Collated Multi-GroupAggregate{annotations}",
365-
ctx.indent
366-
)?;
367-
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
368-
}
369360
}
370361

371362
ctx.indented(|ctx| {
@@ -765,10 +756,6 @@ impl Plan {
765756
writeln!(f, "{}Reduce::Basic{}", ctx.indent, annotations)?;
766757
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
767758
}
768-
ReducePlan::Collation(plan) => {
769-
writeln!(f, "{}Reduce::Collation{}", ctx.indent, annotations)?;
770-
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
771-
}
772759
}
773760
ctx.indented(|ctx| {
774761
if let Some(key) = input_key {
@@ -1614,77 +1601,6 @@ impl BasicPlan {
16141601
}
16151602
}
16161603

1617-
impl DisplayText<PlanRenderingContext<'_, Plan>> for CollationPlan {
1618-
fn fmt_text(
1619-
&self,
1620-
f: &mut fmt::Formatter<'_>,
1621-
ctx: &mut PlanRenderingContext<'_, Plan>,
1622-
) -> fmt::Result {
1623-
if ctx.config.verbose_syntax {
1624-
self.fmt_verbose_text(f, ctx)
1625-
} else {
1626-
self.fmt_default_text(f, ctx)
1627-
}
1628-
}
1629-
}
1630-
1631-
impl CollationPlan {
1632-
#[allow(clippy::needless_pass_by_ref_mut)]
1633-
fn fmt_default_text(
1634-
&self,
1635-
f: &mut fmt::Formatter<'_>,
1636-
ctx: &mut PlanRenderingContext<'_, Plan>,
1637-
) -> fmt::Result {
1638-
if let Some(plan) = &self.accumulable {
1639-
writeln!(f, "{}Accumulable sub-aggregation", ctx.indent)?;
1640-
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1641-
}
1642-
if let Some(plan) = &self.hierarchical {
1643-
writeln!(f, "{}Hierarchical sub-aggregation", ctx.indent)?;
1644-
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1645-
}
1646-
if let Some(plan) = &self.basic {
1647-
writeln!(f, "{}Non-incremental sub-aggregation", ctx.indent)?;
1648-
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1649-
}
1650-
Ok(())
1651-
}
1652-
1653-
fn fmt_verbose_text(
1654-
&self,
1655-
f: &mut fmt::Formatter<'_>,
1656-
ctx: &mut PlanRenderingContext<'_, Plan>,
1657-
) -> fmt::Result {
1658-
{
1659-
use crate::plan::reduce::ReductionType;
1660-
let aggregate_types = &self
1661-
.aggregate_types
1662-
.iter()
1663-
.map(|reduction_type| match reduction_type {
1664-
ReductionType::Accumulable => "a".to_string(),
1665-
ReductionType::Hierarchical => "h".to_string(),
1666-
ReductionType::Basic => "b".to_string(),
1667-
})
1668-
.collect::<Vec<_>>();
1669-
let aggregate_types = separated(", ", aggregate_types);
1670-
writeln!(f, "{}aggregate_types=[{}]", ctx.indent, aggregate_types)?;
1671-
}
1672-
if let Some(plan) = &self.accumulable {
1673-
writeln!(f, "{}accumulable", ctx.indent)?;
1674-
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1675-
}
1676-
if let Some(plan) = &self.hierarchical {
1677-
writeln!(f, "{}hierarchical", ctx.indent)?;
1678-
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1679-
}
1680-
if let Some(plan) = &self.basic {
1681-
writeln!(f, "{}basic", ctx.indent)?;
1682-
ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1683-
}
1684-
Ok(())
1685-
}
1686-
}
1687-
16881604
/// Helper struct for rendering an arrangement.
16891605
struct Arrangement<'a> {
16901606
key: &'a Vec<MirScalarExpr>,

src/compute-types/src/plan.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -635,9 +635,6 @@ impl<T: timely::progress::Timestamp> Plan<T> {
635635
PlanNode::Reduce { plan, .. } => {
636636
// Upgrade non-monotonic hierarchical plans to monotonic with mandatory consolidation.
637637
match plan {
638-
ReducePlan::Collation(collation) => {
639-
collation.as_monotonic(true);
640-
}
641638
ReducePlan::Hierarchical(hierarchical) => {
642639
hierarchical.as_monotonic(true);
643640
}

src/compute-types/src/plan/reduce.proto

Whitespace-only changes.

src/compute-types/src/plan/reduce.rs

Lines changed: 16 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ use std::collections::BTreeMap;
6565
use mz_expr::{
6666
AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr, permutation_for_arrangement,
6767
};
68-
use mz_ore::{assert_none, soft_assert_or_log};
68+
use mz_ore::soft_assert_or_log;
6969
use serde::{Deserialize, Serialize};
7070

7171
use crate::plan::{AvailableCollections, bucketing_of_expected_group_size};
@@ -127,10 +127,6 @@ pub enum ReducePlan {
127127
Hierarchical(HierarchicalPlan),
128128
/// Plan for computing only basic aggregations.
129129
Basic(BasicPlan),
130-
/// Plan for computing a mix of different kinds of aggregations.
131-
/// We need to do extra work here to reassemble results back in the
132-
/// requested order.
133-
Collation(CollationPlan),
134130
}
135131

136132
/// Plan for computing a set of accumulable aggregations.
@@ -351,63 +347,21 @@ impl ReducePlan {
351347
}
352348

353349
// Convert each grouped list of reductions into a plan.
354-
let plan: Vec<_> = reduction_types
355-
.into_iter()
356-
.map(|(typ, aggregates_list)| {
357-
ReducePlan::create_inner(
358-
typ,
359-
aggregates_list,
360-
monotonic,
361-
expected_group_size,
362-
fused_unnest_list,
363-
)
364-
})
365-
.collect();
366-
367-
// If we only have a single type of aggregation present we can
368-
// render that directly
369-
if plan.len() == 1 {
370-
return plan[0].clone();
371-
}
372-
373-
// Otherwise, we have to stitch reductions together.
374-
375-
// First, lets sanity check that we don't have an impossible number
376-
// of reduction types.
377-
assert!(plan.len() <= 3);
378-
379-
let mut collation: CollationPlan = Default::default();
380-
381-
// Construct a mapping from output_position -> reduction that we can
382-
// use to reconstruct the output in the correct order.
383-
let aggregate_types = aggregates
384-
.iter()
385-
.map(|a| reduction_type(&a.func))
386-
.collect::<Vec<_>>();
387-
388-
collation.aggregate_types = aggregate_types;
389-
390-
for expr in plan.into_iter() {
391-
match expr {
392-
ReducePlan::Accumulable(e) => {
393-
assert_none!(collation.accumulable);
394-
collation.accumulable = Some(e);
395-
}
396-
ReducePlan::Hierarchical(e) => {
397-
assert_none!(collation.hierarchical);
398-
collation.hierarchical = Some(e);
399-
}
400-
ReducePlan::Basic(e) => {
401-
assert_none!(collation.basic);
402-
collation.basic = Some(e);
403-
}
404-
ReducePlan::Distinct | ReducePlan::Collation(_) => {
405-
panic!("Inner reduce plan was unsupported type!")
406-
}
407-
}
408-
}
409-
410-
ReducePlan::Collation(collation)
350+
let Some(plan) = reduction_types.pop_first().map(|(typ, aggregates_list)| {
351+
ReducePlan::create_inner(
352+
typ,
353+
aggregates_list,
354+
monotonic,
355+
expected_group_size,
356+
fused_unnest_list,
357+
)
358+
}) else {
359+
panic!(
360+
"Multiple reduction types ({reduction_types:?}) detected in ReducePlan::create_from. This indicates a bug in reduce reduction."
361+
);
362+
};
363+
364+
plan
411365
}
412366

413367
/// Generate a plan for computing the specified type of aggregations.

src/compute-types/src/plan/render_plan.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,6 @@ impl<'a, T> std::fmt::Display for RenderPlanExprHumanizer<'a, T> {
931931
write!(f, ")")
932932
}
933933
ReducePlan::Basic(..) => write!(f, "Non-incremental GroupAggregate`"),
934-
ReducePlan::Collation(..) => write!(f, "Collated Multi-GroupAggregate"),
935934
},
936935
TopK {
937936
input: _,

0 commit comments

Comments
 (0)