Skip to content

Commit 5421825

Browse files
authored
feat: add multi level merge sort that will always fit in memory (#15700)
* feat: add multi level merge sort that will always fit in memory * test: add fuzz test for aggregate * update * add more tests * fix test * update tests * added more aggregate fuzz * align with add fuzz tests * add sort fuzz * fix lints and formatting * moved spill in memory constrained envs to separate test * rename `StreamExec` to `OnceExec` * added comment on the usize in the `in_progress_spill_file` inside ExternalSorter * rename buffer_size to buffer_len * reuse code in spill fuzz * double the amount of memory needed to sort * add diagram for explaining the overview * update based on code review * fix test based on new memory calculation * remove get_size in favor of get_sliced_size * change to result
1 parent b7c014a commit 5421825

File tree

10 files changed

+1458
-54
lines changed

10 files changed

+1458
-54
lines changed

datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,21 @@ use datafusion::prelude::{DataFrame, SessionConfig, SessionContext};
3838
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
3939
use datafusion_common::{HashMap, Result};
4040
use datafusion_common_runtime::JoinSet;
41+
use datafusion_functions_aggregate::sum::sum_udaf;
42+
use datafusion_physical_expr::expressions::{col, lit, Column};
43+
use datafusion_physical_expr::PhysicalSortExpr;
44+
use datafusion_physical_plan::InputOrderMode;
45+
use test_utils::{add_empty_batches, StringBatchGenerator};
46+
4147
use datafusion_execution::memory_pool::FairSpillPool;
4248
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
4349
use datafusion_execution::TaskContext;
44-
use datafusion_functions_aggregate::sum::sum_udaf;
4550
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
46-
use datafusion_physical_expr::expressions::{col, lit, Column};
47-
use datafusion_physical_expr::PhysicalSortExpr;
4851
use datafusion_physical_plan::aggregates::{
4952
AggregateExec, AggregateMode, PhysicalGroupBy,
5053
};
5154
use datafusion_physical_plan::metrics::MetricValue;
52-
use datafusion_physical_plan::InputOrderMode;
5355
use datafusion_physical_plan::{collect, displayable, ExecutionPlan};
54-
use test_utils::{add_empty_batches, StringBatchGenerator};
55-
5656
use rand::rngs::StdRng;
5757
use rand::{random, rng, Rng, SeedableRng};
5858

@@ -632,8 +632,11 @@ fn extract_result_counts(results: Vec<RecordBatch>) -> HashMap<Option<String>, i
632632
output
633633
}
634634

635-
fn assert_spill_count_metric(expect_spill: bool, single_aggregate: Arc<AggregateExec>) {
636-
if let Some(metrics_set) = single_aggregate.metrics() {
635+
pub(crate) fn assert_spill_count_metric(
636+
expect_spill: bool,
637+
plan_that_spills: Arc<dyn ExecutionPlan>,
638+
) -> usize {
639+
if let Some(metrics_set) = plan_that_spills.metrics() {
637640
let mut spill_count = 0;
638641

639642
// Inspect metrics for SpillCount
@@ -649,14 +652,16 @@ fn assert_spill_count_metric(expect_spill: bool, single_aggregate: Arc<Aggregate
649652
} else if !expect_spill && spill_count > 0 {
650653
panic!("Expected no spill but found SpillCount metric with value greater than 0.");
651654
}
655+
656+
spill_count
652657
} else {
653658
panic!("No metrics returned from the operator; cannot verify spilling.");
654659
}
655660
}
656661

657662
// Fix for https://github.com/apache/datafusion/issues/15530
658663
#[tokio::test]
659-
async fn test_single_mode_aggregate_with_spill() -> Result<()> {
664+
async fn test_single_mode_aggregate_single_mode_aggregate_with_spill() -> Result<()> {
660665
let scan_schema = Arc::new(Schema::new(vec![
661666
Field::new("col_0", DataType::Int64, true),
662667
Field::new("col_1", DataType::Utf8, true),

datafusion/core/tests/fuzz_cases/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,6 @@ mod sort_preserving_repartition_fuzz;
3333
mod window_fuzz;
3434

3535
// Utility modules
36+
mod once_exec;
3637
mod record_batch_generator;
38+
mod spilling_fuzz_in_memory_constrained_env;
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_schema::SchemaRef;
19+
use datafusion_common::DataFusionError;
20+
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
21+
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
22+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
23+
use datafusion_physical_plan::{
24+
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
25+
};
26+
use std::any::Any;
27+
use std::fmt::{Debug, Formatter};
28+
use std::sync::{Arc, Mutex};
29+
30+
/// Execution plan that return the stream on the call to `execute`. further calls to `execute` will
31+
/// return an error
32+
pub struct OnceExec {
33+
/// the results to send back
34+
stream: Mutex<Option<SendableRecordBatchStream>>,
35+
cache: PlanProperties,
36+
}
37+
38+
impl Debug for OnceExec {
39+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40+
write!(f, "OnceExec")
41+
}
42+
}
43+
44+
impl OnceExec {
45+
pub fn new(stream: SendableRecordBatchStream) -> Self {
46+
let cache = Self::compute_properties(stream.schema());
47+
Self {
48+
stream: Mutex::new(Some(stream)),
49+
cache,
50+
}
51+
}
52+
53+
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
54+
fn compute_properties(schema: SchemaRef) -> PlanProperties {
55+
PlanProperties::new(
56+
EquivalenceProperties::new(schema),
57+
Partitioning::UnknownPartitioning(1),
58+
EmissionType::Incremental,
59+
Boundedness::Bounded,
60+
)
61+
}
62+
}
63+
64+
impl DisplayAs for OnceExec {
65+
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
66+
match t {
67+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
68+
write!(f, "OnceExec:")
69+
}
70+
DisplayFormatType::TreeRender => {
71+
write!(f, "")
72+
}
73+
}
74+
}
75+
}
76+
77+
impl ExecutionPlan for OnceExec {
78+
fn name(&self) -> &'static str {
79+
Self::static_name()
80+
}
81+
82+
fn as_any(&self) -> &dyn Any {
83+
self
84+
}
85+
86+
fn properties(&self) -> &PlanProperties {
87+
&self.cache
88+
}
89+
90+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
91+
vec![]
92+
}
93+
94+
fn with_new_children(
95+
self: Arc<Self>,
96+
_: Vec<Arc<dyn ExecutionPlan>>,
97+
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
98+
unimplemented!()
99+
}
100+
101+
/// Returns a stream which yields data
102+
fn execute(
103+
&self,
104+
partition: usize,
105+
_context: Arc<TaskContext>,
106+
) -> datafusion_common::Result<SendableRecordBatchStream> {
107+
assert_eq!(partition, 0);
108+
109+
let stream = self.stream.lock().unwrap().take();
110+
111+
stream.ok_or(DataFusionError::Internal(
112+
"Stream already consumed".to_string(),
113+
))
114+
}
115+
}

0 commit comments

Comments
 (0)