Commit 4ecccde

feat: Add output_bytes to baseline metrics (#18268)
## Which issue does this PR close?

- Closes #16244

## Rationale for this change

Support `output_bytes` in `BaselineMetrics` (a common metrics set for almost all operators).

```
DataFusion CLI v50.3.0
> explain analyze select * from generate_series(1, 1000000) as t1(v1) order by v1 desc;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortExec: expr=[v1@0 DESC], preserve_partitioning=[false], metrics=[output_rows=1000000, elapsed_compute=96.421534ms, output_bytes=7.6 MB, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batches_split=0] |
|                   |   ProjectionExec: expr=[value@0 as v1], metrics=[output_rows=1000000, elapsed_compute=34.125µs, output_bytes=7.7 MB] |
|                   |     LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=1, end=1000000, batch_size=8192], metrics=[output_rows=1000000, elapsed_compute=2.262626ms, output_bytes=7.7 MB] |
|                   | |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.080 seconds.
```

Note that the metric may overestimate memory usage because of a well-known issue. See this snippet from the PR for details:

```rs
/// Memory usage of all output batches.
///
/// Note: This value may be overestimated. If multiple output `RecordBatch`
/// instances share underlying memory buffers, their sizes will be counted
/// multiple times.
/// Issue: <#16841>
output_bytes: Count,
```

I think this metric provides valuable insight, so it's better for it to overestimate than not exist at all.

## What changes are included in this PR?

1. Add `output_bytes` to `BaselineMetrics` at the `summary` analyze level (see the `datafusion.explain.analyze_level` config for details).
2. The metric is tracked automatically through the `record_poll()` API, a common interface that most operators use when a new output batch is generated.

## Are these changes tested?

Yes, with unit tests (UT).

## Are there any user-facing changes?
1 parent 8f396b8 commit 4ecccde

6 files changed: +82 -17 lines changed


datafusion/core/tests/sql/explain_analyze.rs

Lines changed: 23 additions & 0 deletions
@@ -63,36 +63,59 @@ async fn explain_analyze_baseline_metrics() {
         "AggregateExec: mode=Partial, gby=[]",
         "metrics=[output_rows=3, elapsed_compute="
     );
+    assert_metrics!(
+        &formatted,
+        "AggregateExec: mode=Partial, gby=[]",
+        "output_bytes="
+    );
     assert_metrics!(
         &formatted,
         "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
         "metrics=[output_rows=5, elapsed_compute="
     );
+    assert_metrics!(
+        &formatted,
+        "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]",
+        "output_bytes="
+    );
     assert_metrics!(
         &formatted,
         "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
         "metrics=[output_rows=99, elapsed_compute="
     );
+    assert_metrics!(
+        &formatted,
+        "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
+        "output_bytes="
+    );
     assert_metrics!(
         &formatted,
         "ProjectionExec: expr=[]",
         "metrics=[output_rows=5, elapsed_compute="
     );
+    assert_metrics!(&formatted, "ProjectionExec: expr=[]", "output_bytes=");
     assert_metrics!(
         &formatted,
         "CoalesceBatchesExec: target_batch_size=4096",
         "metrics=[output_rows=5, elapsed_compute"
     );
+    assert_metrics!(
+        &formatted,
+        "CoalesceBatchesExec: target_batch_size=4096",
+        "output_bytes="
+    );
     assert_metrics!(
         &formatted,
         "UnionExec",
         "metrics=[output_rows=3, elapsed_compute="
     );
+    assert_metrics!(&formatted, "UnionExec", "output_bytes=");
     assert_metrics!(
         &formatted,
         "WindowAggExec",
         "metrics=[output_rows=1, elapsed_compute="
     );
+    assert_metrics!(&formatted, "WindowAggExec", "output_bytes=");
 
     fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool {
         use datafusion::physical_plan;
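
The new assertions only check that an operator's plan line carries an `output_bytes=` entry, not a specific value. The definition of `assert_metrics!` is not part of this diff, so the following is only a sketch of the kind of check it appears to perform, under the assumption that it looks for a single line containing both strings. The helper name `plan_has_metric` is hypothetical.

```rust
/// Hypothetical helper: returns true when at least one line of the
/// formatted plan mentions both the operator and the metric fragment.
/// This approximates, but is not, the `assert_metrics!` macro.
pub fn plan_has_metric(formatted: &str, operator: &str, fragment: &str) -> bool {
    formatted
        .lines()
        .any(|line| line.contains(operator) && line.contains(fragment))
}
```

Matching on the `output_bytes=` prefix rather than a full value keeps such a test stable, since the reported size can vary with batch layout.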

datafusion/physical-plan/src/metrics/baseline.rs

Lines changed: 20 additions & 0 deletions
@@ -21,6 +21,8 @@ use std::task::Poll;
 
 use arrow::record_batch::RecordBatch;
 
+use crate::spill::get_record_batch_memory_size;
+
 use super::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time, Timestamp};
 use datafusion_common::Result;
 
@@ -53,6 +55,16 @@ pub struct BaselineMetrics {
 
     /// output rows: the total output rows
     output_rows: Count,
+
+    /// Memory usage of all output batches.
+    ///
+    /// Note: This value may be overestimated. If multiple output `RecordBatch`
+    /// instances share underlying memory buffers, their sizes will be counted
+    /// multiple times.
+    /// Issue: <https://github.com/apache/datafusion/issues/16841>
+    output_bytes: Count,
+    // Remember to update `docs/source/user-guide/metrics.md` when updating comments
+    // or adding new metrics
 }
 
 impl BaselineMetrics {
@@ -71,6 +83,9 @@ impl BaselineMetrics {
             output_rows: MetricBuilder::new(metrics)
                 .with_type(super::MetricType::SUMMARY)
                 .output_rows(partition),
+            output_bytes: MetricBuilder::new(metrics)
+                .with_type(super::MetricType::SUMMARY)
+                .output_bytes(partition),
         }
     }
 
@@ -84,6 +99,7 @@ impl BaselineMetrics {
             end_time: Default::default(),
             elapsed_compute: self.elapsed_compute.clone(),
             output_rows: Default::default(),
+            output_bytes: Default::default(),
         }
     }
 
@@ -211,13 +227,17 @@ impl RecordOutput for usize {
 impl RecordOutput for RecordBatch {
     fn record_output(self, bm: &BaselineMetrics) -> Self {
         bm.record_output(self.num_rows());
+        let n_bytes = get_record_batch_memory_size(&self);
+        bm.output_bytes.add(n_bytes);
         self
     }
 }
 
 impl RecordOutput for &RecordBatch {
     fn record_output(self, bm: &BaselineMetrics) -> Self {
         bm.record_output(self.num_rows());
+        let n_bytes = get_record_batch_memory_size(self);
+        bm.output_bytes.add(n_bytes);
         self
     }
 }
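
The `RecordOutput` change above makes every batch that passes through the baseline metrics add to both the row and byte counters. A minimal, dependency-free sketch of that pattern follows. The `Count`, `Baseline`, and `Batch` types here are simplified stand-ins invented for illustration (they are not DataFusion's or Arrow's types), and `memory_size` is an assumed substitute for `get_record_batch_memory_size`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified stand-in for a shared metrics counter.
#[derive(Clone, Default)]
pub struct Count(Arc<AtomicUsize>);

impl Count {
    pub fn add(&self, n: usize) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    pub fn value(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

/// Simplified stand-in for the baseline metrics set.
#[derive(Default)]
pub struct Baseline {
    pub output_rows: Count,
    pub output_bytes: Count,
}

/// Hypothetical batch: equal-length columns of 8-byte integers.
pub struct Batch {
    pub columns: Vec<Vec<i64>>,
}

impl Batch {
    pub fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, Vec::len)
    }
    /// Assumed size estimate: element count times element width.
    pub fn memory_size(&self) -> usize {
        self.columns
            .iter()
            .map(|c| c.len() * std::mem::size_of::<i64>())
            .sum()
    }
}

pub trait RecordOutput {
    fn record_output(self, bm: &Baseline) -> Self;
}

// Owned batches record rows and bytes, then pass through unchanged.
impl RecordOutput for Batch {
    fn record_output(self, bm: &Baseline) -> Self {
        bm.output_rows.add(self.num_rows());
        bm.output_bytes.add(self.memory_size());
        self
    }
}

// Borrowed batches record the same two values.
impl RecordOutput for &Batch {
    fn record_output(self, bm: &Baseline) -> Self {
        bm.output_rows.add(self.num_rows());
        bm.output_bytes.add(self.memory_size());
        self
    }
}
```

Because recording happens each time a batch is reported, passing the same batch through twice counts its bytes twice in this sketch, which mirrors the overestimation caveat in the doc comment.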

datafusion/physical-plan/src/metrics/builder.rs

Lines changed: 8 additions & 0 deletions
@@ -151,6 +151,14 @@ impl<'a> MetricBuilder<'a> {
         count
     }
 
+    /// Consume self and create a new counter for recording total output bytes
+    pub fn output_bytes(self, partition: usize) -> Count {
+        let count = Count::new();
+        self.with_partition(partition)
+            .build(MetricValue::OutputBytes(count.clone()));
+        count
+    }
+
     /// Consume self and create a new gauge for reporting current memory usage
     pub fn mem_used(self, partition: usize) -> Gauge {
         let gauge = Gauge::new();
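
The builder method above follows a register-and-return pattern: a fresh counter is created, a clone of it is registered with the metrics set, and the original handle is handed back to the operator. A rough sketch of that idea using only the standard library is shown here. `MetricsRegistry`, `Metric`, and this `Count` are hypothetical simplifications, not the real `MetricBuilder` or `ExecutionPlanMetricsSet`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Simplified shared counter; clones point at the same value.
#[derive(Clone, Default)]
pub struct Count(Arc<AtomicUsize>);

impl Count {
    pub fn add(&self, n: usize) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    pub fn value(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

/// Hypothetical registered metric: a name, a partition, and a counter.
pub struct Metric {
    pub name: &'static str,
    pub partition: usize,
    pub count: Count,
}

/// Hypothetical registry standing in for the operator's metrics set.
#[derive(Default)]
pub struct MetricsRegistry {
    metrics: Mutex<Vec<Metric>>,
}

impl MetricsRegistry {
    /// Register an `output_bytes` counter and return a handle to it.
    pub fn output_bytes(&self, partition: usize) -> Count {
        let count = Count::default();
        self.metrics.lock().unwrap().push(Metric {
            name: "output_bytes",
            partition,
            count: count.clone(), // the registry keeps a shared clone
        });
        count
    }

    /// Sum a metric across all partitions.
    pub fn total(&self, name: &str) -> usize {
        let metrics = self.metrics.lock().unwrap();
        metrics
            .iter()
            .filter(|m| m.name == name)
            .map(|m| m.count.value())
            .sum()
    }

    /// Read a metric for a single partition.
    pub fn partition_value(&self, name: &str, partition: usize) -> usize {
        let metrics = self.metrics.lock().unwrap();
        metrics
            .iter()
            .filter(|m| m.name == name && m.partition == partition)
            .map(|m| m.count.value())
            .sum()
    }
}
```

Since the returned handle and the registered clone share one atomic value, the operator can update its counter on the hot path without touching the registry lock.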

datafusion/physical-plan/src/metrics/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -296,6 +296,7 @@ impl MetricsSet {
                 MetricValue::ElapsedCompute(_) => false,
                 MetricValue::SpillCount(_) => false,
                 MetricValue::SpilledBytes(_) => false,
+                MetricValue::OutputBytes(_) => false,
                 MetricValue::SpilledRows(_) => false,
                 MetricValue::CurrentMemoryUsage(_) => false,
                 MetricValue::Gauge { name, .. } => name == metric_name,

datafusion/physical-plan/src/metrics/value.rs

Lines changed: 25 additions & 13 deletions
@@ -395,6 +395,8 @@ pub enum MetricValue {
     SpillCount(Count),
     /// Total size of spilled bytes produced: "spilled_bytes" metric
     SpilledBytes(Count),
+    /// Total size of output bytes produced: "output_bytes" metric
+    OutputBytes(Count),
     /// Total size of spilled rows produced: "spilled_rows" metric
     SpilledRows(Count),
     /// Current memory used
@@ -449,6 +451,9 @@ impl PartialEq for MetricValue {
             (MetricValue::SpilledBytes(count), MetricValue::SpilledBytes(other)) => {
                 count == other
             }
+            (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => {
+                count == other
+            }
             (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => {
                 count == other
             }
@@ -505,6 +510,7 @@ impl MetricValue {
             Self::OutputRows(_) => "output_rows",
             Self::SpillCount(_) => "spill_count",
             Self::SpilledBytes(_) => "spilled_bytes",
+            Self::OutputBytes(_) => "output_bytes",
             Self::SpilledRows(_) => "spilled_rows",
             Self::CurrentMemoryUsage(_) => "mem_used",
             Self::ElapsedCompute(_) => "elapsed_compute",
@@ -523,6 +529,7 @@ impl MetricValue {
             Self::OutputRows(count) => count.value(),
             Self::SpillCount(count) => count.value(),
             Self::SpilledBytes(bytes) => bytes.value(),
+            Self::OutputBytes(bytes) => bytes.value(),
             Self::SpilledRows(count) => count.value(),
             Self::CurrentMemoryUsage(used) => used.value(),
             Self::ElapsedCompute(time) => time.value(),
@@ -550,6 +557,7 @@ impl MetricValue {
             Self::OutputRows(_) => Self::OutputRows(Count::new()),
             Self::SpillCount(_) => Self::SpillCount(Count::new()),
             Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()),
+            Self::OutputBytes(_) => Self::OutputBytes(Count::new()),
             Self::SpilledRows(_) => Self::SpilledRows(Count::new()),
             Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()),
             Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()),
@@ -588,6 +596,7 @@ impl MetricValue {
             (Self::OutputRows(count), Self::OutputRows(other_count))
             | (Self::SpillCount(count), Self::SpillCount(other_count))
             | (Self::SpilledBytes(count), Self::SpilledBytes(other_count))
+            | (Self::OutputBytes(count), Self::OutputBytes(other_count))
             | (Self::SpilledRows(count), Self::SpilledRows(other_count))
             | (
                 Self::Count { count, .. },
@@ -638,18 +647,21 @@ impl MetricValue {
     /// numbers are "more useful" (and displayed first)
     pub fn display_sort_key(&self) -> u8 {
         match self {
-            Self::OutputRows(_) => 0, // show first
-            Self::ElapsedCompute(_) => 1, // show second
-            Self::SpillCount(_) => 2,
-            Self::SpilledBytes(_) => 3,
-            Self::SpilledRows(_) => 4,
-            Self::CurrentMemoryUsage(_) => 5,
-            Self::Count { .. } => 6,
-            Self::Gauge { .. } => 7,
-            Self::Time { .. } => 8,
-            Self::StartTimestamp(_) => 9, // show timestamps last
-            Self::EndTimestamp(_) => 10,
-            Self::Custom { .. } => 11,
+            // `BaselineMetrics` that is common for most operators
+            Self::OutputRows(_) => 0,
+            Self::ElapsedCompute(_) => 1,
+            Self::OutputBytes(_) => 2,
+            // Other metrics
+            Self::SpillCount(_) => 3,
+            Self::SpilledBytes(_) => 4,
+            Self::SpilledRows(_) => 5,
+            Self::CurrentMemoryUsage(_) => 6,
+            Self::Count { .. } => 7,
+            Self::Gauge { .. } => 8,
+            Self::Time { .. } => 9,
+            Self::StartTimestamp(_) => 10, // show timestamps last
+            Self::EndTimestamp(_) => 11,
+            Self::Custom { .. } => 12,
         }
     }
 
655667

@@ -669,7 +681,7 @@ impl Display for MetricValue {
669681
| Self::Count { count, .. } => {
670682
write!(f, "{count}")
671683
}
672-
Self::SpilledBytes(count) => {
684+
Self::SpilledBytes(count) | Self::OutputBytes(count) => {
673685
let readable_count = human_readable_size(count.value());
674686
write!(f, "{readable_count}")
675687
}
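
With this change `OutputBytes` is rendered through the same `human_readable_size` formatter as `SpilledBytes`, which is why the commit message shows values such as `7.6 MB` and `0.0 B`. The formatter itself is not part of this diff, so the sketch below is only an approximation: it assumes binary units and one decimal place, and both the function name `readable_size` and the "switch units at twice the unit size" threshold are assumptions rather than DataFusion's confirmed behavior.

```rust
/// Illustrative byte formatter (not DataFusion's `human_readable_size`).
pub fn readable_size(bytes: usize) -> String {
    const KB: f64 = 1024.0;
    const MB: f64 = KB * 1024.0;
    const GB: f64 = MB * 1024.0;

    let size = bytes as f64;
    // Assumption: move to a larger unit only once the value is at
    // least two of that unit, so small sizes keep more precision.
    let (value, unit) = if size >= 2.0 * GB {
        (size / GB, "GB")
    } else if size >= 2.0 * MB {
        (size / MB, "MB")
    } else if size >= 2.0 * KB {
        (size / KB, "KB")
    } else {
        (size, "B")
    };
    format!("{value:.1} {unit}")
}
```

Under these assumptions, 1,000,000 eight-byte values (8,000,000 bytes) format as `7.6 MB`, which is consistent with the `SortExec` line in the commit message.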

docs/source/user-guide/metrics.md

Lines changed: 5 additions & 4 deletions
@@ -27,10 +27,11 @@ DataFusion operators expose runtime metrics so you can understand where time is
 
 `BaselineMetrics` are available in most physical operators to capture common measurements.
 
-| Metric | Description |
-| --------------- | ------------------------------------------------------ |
-| elapsed_compute | CPU time the operator actively spends processing work. |
-| output_rows | Total number of rows the operator produces. |
+| Metric | Description |
+| --------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| elapsed_compute | CPU time the operator actively spends processing work. |
+| output_rows | Total number of rows the operator produces. |
+| output_bytes | Memory usage of all output batches. Note: This value may be overestimated. If multiple output `RecordBatch` instances share underlying memory buffers, their sizes will be counted multiple times. |
 
 ## Operator-specific Metrics
 
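
The overestimation caveat in the `output_bytes` description can be illustrated with a small model. The sketch below assumes that a batch's reported size is the size of the whole allocation it references, even when the batch only views part of it. `BatchView` and both summing functions are hypothetical and exist only to show the double counting; they are not Arrow or DataFusion APIs.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Hypothetical batch that views a range of a shared buffer.
pub struct BatchView {
    buffer: Arc<Vec<u8>>, // allocation shared between views
    offset: usize,
    len: usize,
}

impl BatchView {
    pub fn new(buffer: Arc<Vec<u8>>, offset: usize, len: usize) -> Self {
        assert!(offset + len <= buffer.len());
        Self { buffer, offset, len }
    }

    /// The bytes this view actually exposes.
    pub fn bytes(&self) -> &[u8] {
        &self.buffer[self.offset..self.offset + self.len]
    }

    /// Assumed accounting rule: report the whole underlying allocation.
    pub fn memory_size(&self) -> usize {
        self.buffer.len()
    }
}

/// Per-batch summation: shared allocations are counted once per view.
pub fn naive_output_bytes(batches: &[BatchView]) -> usize {
    batches.iter().map(BatchView::memory_size).sum()
}

/// Counts each distinct allocation once, keyed by its pointer.
pub fn deduplicated_output_bytes(batches: &[BatchView]) -> usize {
    let mut seen = HashSet::new();
    batches
        .iter()
        .filter(|b| seen.insert(Arc::as_ptr(&b.buffer)))
        .map(BatchView::memory_size)
        .sum()
}
```

Two views over one 1024-byte buffer sum to 2048 bytes with the per-batch approach and 1024 bytes with deduplication. The commit takes the per-batch approach, accepting the overestimate in exchange for a simple counter.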
