Skip to content

Commit cefd6fe

Browse files
committed
ring buffer: store logs as proto instead of fb
For logs with many fields, the fact that fb is padded and doesn't use varint encoding leads to substantial overhead. This removes that overhead. As part of this change I went a bit over the top and also implemented a custom encoder for the Log proto which allows for zero copy into the ring buffer. This is better than the previous fb implementation which encoded and then did a memcpy into the buffer. Signed-off-by: Matt Klein <[email protected]>
1 parent 4608094 commit cefd6fe

File tree

80 files changed

+1578
-1063
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+1578
-1063
lines changed

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bd-api/src/api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ impl Api {
590590
.iter()
591591
.map(|(k, v)| {
592592
(
593-
k.to_string(),
593+
k.clone(),
594594
ProtoData {
595595
data_type: Some(Data_type::StringData(v.clone())),
596596
..Default::default()

bd-api/src/api_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use bd_client_stats_store::Collector;
2020
use bd_client_stats_store::test::StatsHelper;
2121
use bd_grpc_codec::code::Code;
2222
use bd_grpc_codec::{Decompression, Encoder, OptimizeFor};
23-
use bd_internal_logging::{LogFields, LogLevel, LogType};
23+
use bd_internal_logging::{LogFields, LogLevel};
2424
use bd_key_value::Store;
2525
use bd_metadata::{Metadata, Platform};
2626
use bd_network_quality::{NetworkQuality, NetworkQualityResolver as _};
@@ -37,6 +37,7 @@ use bd_proto::protos::client::api::{
3737
RuntimeUpdate,
3838
StatsUploadRequest,
3939
};
40+
use bd_proto::protos::logging::payload::LogType;
4041
use bd_runtime::runtime::{ConfigLoader, FeatureFlag};
4142
use bd_stats_common::labels;
4243
use bd_test_helpers::make_mut;

bd-artifact-upload/src/uploader.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -590,11 +590,7 @@ impl Uploader {
590590

591591
#[cfg(test)]
592592
if let Some(hooks) = &self.test_hooks {
593-
hooks
594-
.entry_received_tx
595-
.send(uuid.to_string())
596-
.await
597-
.unwrap();
593+
hooks.entry_received_tx.send(uuid.clone()).await.unwrap();
598594
}
599595
return;
600596
},
@@ -606,11 +602,7 @@ impl Uploader {
606602

607603
#[cfg(test)]
608604
if let Some(hooks) = &self.test_hooks {
609-
hooks
610-
.entry_received_tx
611-
.send(uuid.to_string())
612-
.await
613-
.unwrap();
605+
hooks.entry_received_tx.send(uuid.clone()).await.unwrap();
614606
}
615607
return;
616608
}
@@ -668,11 +660,7 @@ impl Uploader {
668660

669661
#[cfg(test)]
670662
if let Some(hooks) = &self.test_hooks {
671-
hooks
672-
.entry_received_tx
673-
.send(uuid.to_string())
674-
.await
675-
.unwrap();
663+
hooks.entry_received_tx.send(uuid.clone()).await.unwrap();
676664
}
677665
}
678666

@@ -766,8 +754,8 @@ impl Uploader {
766754
upload_uuid.clone(),
767755
UploadArtifactIntentRequest {
768756
type_id: "client_report".to_string(),
769-
artifact_id: id.to_string(),
770-
intent_uuid: upload_uuid.to_string(),
757+
artifact_id: id.clone(),
758+
intent_uuid: upload_uuid.clone(),
771759
time: timestamp.into_proto(),
772760
// TODO(snowp): Figure out how to send relevant metadata about the artifact here.
773761
metadata: vec![],

bd-bonjson/src/decoder_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1053,7 +1053,7 @@ fn test_value_none_removed_partial_errors() {
10531053

10541054
#[test]
10551055
fn test_all_variants() {
1056-
let all_value_types = vec![
1056+
let all_value_types = [
10571057
Value::Null,
10581058
Value::Bool(true),
10591059
Value::Float(std::f64::consts::PI),

bd-buffer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ bd-client-common.path = "../bd-client-common"
1515
bd-client-stats-store.path = "../bd-client-stats-store"
1616
bd-completion.path = "../bd-completion"
1717
bd-error-reporter.path = "../bd-error-reporter"
18+
bd-log-primitives.path = "../bd-log-primitives"
1819
bd-proto.path = "../bd-proto"
1920
bd-runtime.path = "../bd-runtime"
2021
bd-stats-common.path = "../bd-stats-common"

bd-buffer/src/buffer/aggregate_ring_buffer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ use super::{
2424
RingBufferProducer,
2525
RingBufferStats,
2626
VolatileRingBuffer,
27-
to_u32,
2827
};
2928
#[cfg(test)]
3029
use crate::buffer::test::thread_synchronizer::ThreadSynchronizer;
3130
use crate::{AbslCode, Error, Result};
31+
use bd_log_primitives::LossyIntToU32;
3232
use parking_lot::Mutex;
3333
#[cfg(test)]
3434
use std::any::Any;
@@ -76,7 +76,7 @@ impl SharedData {
7676
// the volatile buffer.
7777
match producer
7878
.as_mut()
79-
.reserve(to_u32(read_reservation.len()), true)
79+
.reserve(read_reservation.len().to_u32(), true)
8080
{
8181
Ok(write_reservation) => {
8282
write_reservation.copy_from_slice(read_reservation);
@@ -138,8 +138,8 @@ impl RingBufferImpl {
138138
// For aggregate buffers, the size of the file (after subtracting header space) must be >= the
139139
// size of RAM. This is to avoid situations in which we accept a record into RAM but cannot ever
140140
// write it to disk.
141-
if non_volatile_size < to_u32(std::mem::size_of::<FileHeader>())
142-
|| volatile_size > (non_volatile_size - to_u32(std::mem::size_of::<FileHeader>()))
141+
if non_volatile_size < std::mem::size_of::<FileHeader>().to_u32()
142+
|| volatile_size > (non_volatile_size - std::mem::size_of::<FileHeader>().to_u32())
143143
{
144144
log::error!(
145145
"file size '{}' not big enough for header size '{}' or file size (minus header) not \

bd-buffer/src/buffer/aggregate_ring_buffer_test.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ use crate::buffer::test::{
1818
read_and_verify,
1919
start_read_and_verify,
2020
};
21-
use crate::buffer::{RingBuffer, RingBufferStats, StatsTestHelper, to_u32};
21+
use crate::buffer::{RingBuffer, RingBufferStats, StatsTestHelper};
2222
use bd_client_stats_store::Collector;
23+
use bd_log_primitives::LossyIntToU32;
2324
use futures::poll;
2425
use std::sync::Arc;
2526
use tempfile::TempDir;
@@ -48,7 +49,7 @@ impl Helper {
4849
"test",
4950
volatile_size,
5051
temp_dir.path().join("buffer"),
51-
non_volatile_size + to_u32(std::mem::size_of::<FileHeader>()),
52+
non_volatile_size + std::mem::size_of::<FileHeader>().to_u32(),
5253
PerRecordCrc32Check::Yes,
5354
allow_overwrite,
5455
Arc::new(RingBufferStats::default()),
@@ -80,7 +81,7 @@ impl Helper {
8081
"test",
8182
self.volatile_size,
8283
self.temp_dir.path().join("buffer"),
83-
self.non_volatile_size + to_u32(std::mem::size_of::<FileHeader>()),
84+
self.non_volatile_size + std::mem::size_of::<FileHeader>().to_u32(),
8485
PerRecordCrc32Check::Yes,
8586
self.allow_overwrite,
8687
Arc::new(RingBufferStats::default()),

bd-buffer/src/buffer/common_ring_buffer.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
#[path = "./common_ring_buffer_test.rs"]
1010
mod common_ring_buffer_test;
1111

12+
use super::RingBufferStats;
1213
#[cfg(test)]
1314
use super::test::thread_synchronizer::ThreadSynchronizer;
14-
use super::{RingBufferStats, to_u32};
1515
use crate::{AbslCode, Error, Result};
1616
use bd_client_common::error::InvariantError;
17+
use bd_log_primitives::LossyIntToU32;
1718
use parking_lot::{Condvar, Mutex, MutexGuard};
1819
use std::fmt::Display;
1920
use std::ptr::NonNull;
@@ -229,14 +230,14 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
229230
// part of the record. The extra space is always fixed and is the value stored in
230231
// extra_bytes_per_record_.
231232
fn record_size_offset(&self, start: u32) -> u32 {
232-
debug_assert!(self.extra_bytes_per_record >= to_u32(std::mem::size_of::<u32>()));
233-
start + (self.extra_bytes_per_record - to_u32(std::mem::size_of::<u32>()))
233+
debug_assert!(self.extra_bytes_per_record >= std::mem::size_of::<u32>().to_u32());
234+
start + (self.extra_bytes_per_record - std::mem::size_of::<u32>().to_u32())
234235
}
235236

236237
// Returns any extra space at the beginning of the reservation. See recordSizeOffset() for more
237238
// information on the record layout.
238239
fn extra_data(&mut self, start: u32) -> &mut [u8] {
239-
debug_assert!(self.extra_bytes_per_record >= to_u32(std::mem::size_of::<u32>()));
240+
debug_assert!(self.extra_bytes_per_record >= std::mem::size_of::<u32>().to_u32());
240241
let start = start as usize;
241242
let extra_bytes_per_record = self.extra_bytes_per_record as usize;
242243
&mut self.memory()[start .. start + extra_bytes_per_record - std::mem::size_of::<u32>()]
@@ -299,7 +300,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
299300
// space. This can overflow but as long as we are within the memory space crc checks will catch
300301
// further corruption.
301302
let size_index = self.record_size_offset(next_read_start_to_use);
302-
if size_index + to_u32(std::mem::size_of::<u32>()) > to_u32(self.memory.0.len()) {
303+
if size_index + std::mem::size_of::<u32>().to_u32() > self.memory.0.len().to_u32() {
303304
return Err(Error::AbslStatus(
304305
AbslCode::DataLoss,
305306
"corrupted record size index".to_string(),
@@ -318,7 +319,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
318319
// space crc checks will catch further corruption.
319320
if size == 0
320321
|| Self::overflow_add(&[next_read_start_to_use, self.extra_bytes_per_record, size])
321-
.is_none_or(|next_read_start_index| next_read_start_index > to_u32(self.memory.0.len()))
322+
.is_none_or(|next_read_start_index| next_read_start_index > self.memory.0.len().to_u32())
322323
{
323324
return Err(Error::AbslStatus(
324325
AbslCode::DataLoss,
@@ -343,7 +344,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
343344
.last_write_end_before_wrap
344345
.ok_or(InvariantError::Invariant)?
345346
+ 1,
346-
size: to_u32(self.memory.0.len())
347+
size: self.memory.0.len().to_u32()
347348
- (reservation_data
348349
.last_write_end_before_wrap
349350
.ok_or(InvariantError::Invariant)?
@@ -469,7 +470,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
469470
let mut reservation_data = TempReservationData::default();
470471
let next_write_start = *self.next_write_start();
471472

472-
if next_write_start + write_size <= to_u32(self.memory.0.len()) {
473+
if next_write_start + write_size <= self.memory.0.len().to_u32() {
473474
// It fits in the remainder without wrapping.
474475
reservation_data.range.start = next_write_start;
475476
reservation_data.next_write_start = next_write_start + write_size;
@@ -529,7 +530,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
529530
};
530531

531532
// If size is 0 or > then the ring buffer size we can't do anything.
532-
if size == 0 || actual_size > to_u32(self.memory.0.len()) {
533+
if size == 0 || actual_size > self.memory.0.len().to_u32() {
533534
log::trace!("({}) invalid reservation size: {}", self.name, size);
534535
// Note that we don't record the bytes lost in this case, since a very large number is likely
535536
// to cause overflow.
@@ -996,7 +997,7 @@ impl<ExtraLockedData> CommonRingBuffer<ExtraLockedData> {
996997
lock_count: AtomicU32::default().into(),
997998
shutdown_lock: None,
998999
stats,
999-
extra_bytes_per_record: extra_bytes_per_record + to_u32(std::mem::size_of::<u32>()),
1000+
extra_bytes_per_record: extra_bytes_per_record + std::mem::size_of::<u32>().to_u32(),
10001001
extra_locked_data,
10011002
allow_overwrite,
10021003
on_total_data_loss_cb: Box::new(on_total_data_loss_cb),

0 commit comments

Comments
 (0)