Skip to content
102 changes: 60 additions & 42 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,9 @@ async fn explain_analyze_baseline_metrics() {
assert_metrics!(
&formatted,
"AggregateExec: mode=Partial, gby=[]",
"metrics=[output_rows=3, elapsed_compute="
);
assert_metrics!(
&formatted,
"AggregateExec: mode=Partial, gby=[]",
"output_bytes="
"metrics=[output_rows=3, elapsed_compute=",
"output_bytes=",
"output_batches=3"
);

assert_metrics!(
Expand All @@ -75,59 +72,76 @@ async fn explain_analyze_baseline_metrics() {
"reduction_factor=5.1% (5/99)"
);

assert_metrics!(
&formatted,
"AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
"metrics=[output_rows=5, elapsed_compute="
);
assert_metrics!(
&formatted,
"AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
"output_bytes="
);
assert_metrics!(
&formatted,
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"metrics=[output_rows=99, elapsed_compute="
);
{
let expected_batch_count_after_repartition =
if cfg!(not(feature = "force_hash_collisions")) {
"output_batches=3"
} else {
"output_batches=1"
};

assert_metrics!(
&formatted,
"AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
"metrics=[output_rows=5, elapsed_compute=",
"output_bytes=",
expected_batch_count_after_repartition
);

assert_metrics!(
&formatted,
"RepartitionExec: partitioning=Hash([c1@0], 3), input_partitions=3",
"metrics=[output_rows=5, elapsed_compute=",
"output_bytes=",
expected_batch_count_after_repartition
);

assert_metrics!(
&formatted,
"ProjectionExec: expr=[]",
"metrics=[output_rows=5, elapsed_compute=",
"output_bytes=",
expected_batch_count_after_repartition
);

assert_metrics!(
&formatted,
"CoalesceBatchesExec: target_batch_size=4096",
"metrics=[output_rows=5, elapsed_compute",
"output_bytes=",
expected_batch_count_after_repartition
);
}

assert_metrics!(
&formatted,
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"output_bytes="
"metrics=[output_rows=99, elapsed_compute=",
"output_bytes=",
"output_batches=1"
);

assert_metrics!(
&formatted,
"FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
"selectivity=99% (99/100)"
);
assert_metrics!(
&formatted,
"ProjectionExec: expr=[]",
"metrics=[output_rows=5, elapsed_compute="
);
assert_metrics!(&formatted, "ProjectionExec: expr=[]", "output_bytes=");
assert_metrics!(
&formatted,
"CoalesceBatchesExec: target_batch_size=4096",
"metrics=[output_rows=5, elapsed_compute"
);
assert_metrics!(
&formatted,
"CoalesceBatchesExec: target_batch_size=4096",
"output_bytes="
);

assert_metrics!(
&formatted,
"UnionExec",
"metrics=[output_rows=3, elapsed_compute="
"metrics=[output_rows=3, elapsed_compute=",
"output_bytes=",
"output_batches=3"
);
assert_metrics!(&formatted, "UnionExec", "output_bytes=");

assert_metrics!(
&formatted,
"WindowAggExec",
"metrics=[output_rows=1, elapsed_compute="
"metrics=[output_rows=1, elapsed_compute=",
"output_bytes=",
"output_batches=1"
);
assert_metrics!(&formatted, "WindowAggExec", "output_bytes=");

fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
use datafusion::physical_plan;
Expand Down Expand Up @@ -228,9 +242,13 @@ async fn explain_analyze_level() {

for (level, needle, should_contain) in [
(ExplainAnalyzeLevel::Summary, "spill_count", false),
(ExplainAnalyzeLevel::Summary, "output_batches", false),
(ExplainAnalyzeLevel::Summary, "output_rows", true),
(ExplainAnalyzeLevel::Summary, "output_bytes", true),
(ExplainAnalyzeLevel::Dev, "spill_count", true),
(ExplainAnalyzeLevel::Dev, "output_rows", true),
(ExplainAnalyzeLevel::Dev, "output_bytes", true),
(ExplainAnalyzeLevel::Dev, "output_batches", true),
] {
let plan = collect_plan(sql, level).await;
assert_eq!(
Expand Down
18 changes: 12 additions & 6 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,24 @@ use std::io::Write;
use std::path::PathBuf;
use tempfile::TempDir;

/// A macro to assert that some particular line contains the given substrings
///
/// Usage: `assert_metrics!(actual, operator_name, metrics_1, metrics_2, ...)`
///
/// Succeeds when at least one line of `actual` contains `operator_name`
/// AND every one of the metric substrings; panics with a message listing
/// the expected metrics otherwise.
macro_rules! assert_metrics {
    ($ACTUAL: expr, $OPERATOR_NAME: expr, $($METRICS: expr),+) => {
        // A line matches only if it names the operator and contains
        // every expected metric substring.
        let found = $ACTUAL
            .lines()
            .any(|line| line.contains($OPERATOR_NAME) $( && line.contains($METRICS))+);

        // Build a human-readable list of the expected metric substrings
        // for the failure message.
        let mut metrics = String::new();
        $(metrics.push_str(format!(" '{}',", $METRICS).as_str());)+
        // remove the last `,` from the string
        metrics.pop();

        assert!(
            found,
            "Cannot find a line with operator name '{}' and metrics containing values {} in :\n\n{}",
            $OPERATOR_NAME, metrics, $ACTUAL
        );
    };
}
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,6 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
self.left_index += 1;
}

self.join_metrics.output_batches.add(1);
return Ok(StatefulStreamResult::Ready(Some(batch)));
}
}
Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ impl HashJoinStream {
&self.column_indices,
self.join_type,
)?;
self.join_metrics.output_batches.add(1);
timer.done();

self.state = HashJoinStreamState::FetchProbeBatch;
Expand Down Expand Up @@ -597,7 +596,6 @@ impl HashJoinStream {
)?
};

self.join_metrics.output_batches.add(1);
timer.done();

if next_offset.is_none() {
Expand Down Expand Up @@ -653,8 +651,6 @@ impl HashJoinStream {
if let Ok(ref batch) = result {
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(batch.num_rows());

self.join_metrics.output_batches.add(1);
}
timer.done();

Expand Down
4 changes: 0 additions & 4 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1483,10 +1483,6 @@ impl NestedLoopJoinStream {
fn maybe_flush_ready_batch(&mut self) -> Option<Poll<Option<Result<RecordBatch>>>> {
if self.output_buffer.has_completed_batch() {
if let Some(batch) = self.output_buffer.next_completed_batch() {
// HACK: this is not part of `BaselineMetrics` yet, so update it
// manually
self.metrics.join_metrics.output_batches.add(1);

// Update output rows for selectivity metric
let output_rows = batch.num_rows();
self.metrics.selectivity.add_part(output_rows);
Expand Down
8 changes: 0 additions & 8 deletions datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ pub(super) struct SortMergeJoinMetrics {
input_batches: Count,
/// Number of rows consumed by this operator
input_rows: Count,
/// Number of batches produced by this operator
output_batches: Count,
/// Execution metrics
baseline_metrics: BaselineMetrics,
/// Peak memory used for buffered data.
Expand All @@ -49,8 +47,6 @@ impl SortMergeJoinMetrics {
let input_batches =
MetricBuilder::new(metrics).counter("input_batches", partition);
let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
let output_batches =
MetricBuilder::new(metrics).counter("output_batches", partition);
let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition);
let spill_metrics = SpillMetrics::new(metrics, partition);

Expand All @@ -60,7 +56,6 @@ impl SortMergeJoinMetrics {
join_time,
input_batches,
input_rows,
output_batches,
baseline_metrics,
peak_mem_used,
spill_metrics,
Expand All @@ -82,9 +77,6 @@ impl SortMergeJoinMetrics {
pub fn input_rows(&self) -> Count {
self.input_rows.clone()
}
pub fn output_batches(&self) -> Count {
self.output_batches.clone()
}

pub fn peak_mem_used(&self) -> Gauge {
self.peak_mem_used.clone()
Expand Down
6 changes: 2 additions & 4 deletions datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use std::task::{Context, Poll};

use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics;
use crate::joins::utils::{compare_join_arrays, JoinFilter};
use crate::metrics::RecordOutput;
use crate::spill::spill_manager::SpillManager;
use crate::{PhysicalExpr, RecordBatchStream, SendableRecordBatchStream};

Expand Down Expand Up @@ -1462,10 +1463,7 @@ impl SortMergeJoinStream {
fn output_record_batch_and_reset(&mut self) -> Result<RecordBatch> {
let record_batch =
concat_batches(&self.schema, &self.staging_output_record_batches.batches)?;
self.join_metrics.output_batches().add(1);
self.join_metrics
.baseline_metrics()
.record_output(record_batch.num_rows());
(&record_batch).record_output(&self.join_metrics.baseline_metrics());
// If join filter exists, `self.output_size` is not accurate as we don't know the exact
// number of rows in the output record batch. If streamed row joined with buffered rows,
// once join filter is applied, the number of output rows may be more than 1.
Expand Down
6 changes: 0 additions & 6 deletions datafusion/physical-plan/src/joins/stream_join_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,8 +682,6 @@ pub struct StreamJoinMetrics {
pub(crate) right: StreamJoinSideMetrics,
/// Memory used by sides in bytes
pub(crate) stream_memory_usage: metrics::Gauge,
/// Number of batches produced by this operator
pub(crate) output_batches: metrics::Count,
/// Number of rows produced by this operator
pub(crate) baseline_metrics: BaselineMetrics,
}
Expand All @@ -709,13 +707,9 @@ impl StreamJoinMetrics {
let stream_memory_usage =
MetricBuilder::new(metrics).gauge("stream_memory_usage", partition);

let output_batches =
MetricBuilder::new(metrics).counter("output_batches", partition);

Self {
left,
right,
output_batches,
stream_memory_usage,
baseline_metrics: BaselineMetrics::new(metrics, partition),
}
Expand Down
1 change: 0 additions & 1 deletion datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,6 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
}
}
Some((batch, _)) => {
self.metrics.output_batches.add(1);
return self
.metrics
.baseline_metrics
Expand Down
6 changes: 0 additions & 6 deletions datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1327,8 +1327,6 @@ pub(crate) struct BuildProbeJoinMetrics {
pub(crate) input_batches: metrics::Count,
/// Number of rows consumed by probe-side this operator
pub(crate) input_rows: metrics::Count,
/// Number of batches produced by this operator
pub(crate) output_batches: metrics::Count,
}

// This Drop implementation updates the elapsed compute part of the metrics.
Expand Down Expand Up @@ -1372,9 +1370,6 @@ impl BuildProbeJoinMetrics {

let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);

let output_batches =
MetricBuilder::new(metrics).counter("output_batches", partition);

Self {
build_time,
build_input_batches,
Expand All @@ -1383,7 +1378,6 @@ impl BuildProbeJoinMetrics {
join_time,
input_batches,
input_rows,
output_batches,
baseline,
}
}
Expand Down
14 changes: 14 additions & 0 deletions datafusion/physical-plan/src/metrics/baseline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub struct BaselineMetrics {
/// multiple times.
/// Issue: <https://github.com/apache/datafusion/issues/16841>
output_bytes: Count,

/// output batches: the total output batch count
output_batches: Count,
// Remember to update `docs/source/user-guide/metrics.md` when updating comments
// or adding new metrics
}
Expand All @@ -86,6 +89,9 @@ impl BaselineMetrics {
output_bytes: MetricBuilder::new(metrics)
.with_type(super::MetricType::SUMMARY)
.output_bytes(partition),
output_batches: MetricBuilder::new(metrics)
.with_type(super::MetricType::DEV)
.output_batches(partition),
}
}

Expand All @@ -100,6 +106,7 @@ impl BaselineMetrics {
elapsed_compute: self.elapsed_compute.clone(),
output_rows: Default::default(),
output_bytes: Default::default(),
output_batches: Default::default(),
}
}

Expand All @@ -113,6 +120,11 @@ impl BaselineMetrics {
&self.output_rows
}

/// return the metric for the total number of output batches produced
pub fn output_batches(&self) -> &Count {
&self.output_batches
}

/// Records the fact that this operator's execution is complete
/// (recording the `end_time` metric).
///
Expand Down Expand Up @@ -229,6 +241,7 @@ impl RecordOutput for RecordBatch {
bm.record_output(self.num_rows());
let n_bytes = get_record_batch_memory_size(&self);
bm.output_bytes.add(n_bytes);
bm.output_batches.add(1);
self
}
}
Expand All @@ -238,6 +251,7 @@ impl RecordOutput for &RecordBatch {
bm.record_output(self.num_rows());
let n_bytes = get_record_batch_memory_size(self);
bm.output_bytes.add(n_bytes);
bm.output_batches.add(1);
self
}
}
Expand Down
Loading