Skip to content

Commit 4ee2596

Browse files
committed
ring buffer: store logs as proto instead of fb
For logs with many fields, the fact that fb is padded and doesn't use varint encoding leads to substantial overhead. This removes that overhead. As part of this change I went a bit over the top and also implemented a custom encoder for the Log proto which allows for zero copy into the ring buffer. This is better than the previous fb implementation which encoded and then did a memcpy into the buffer. Signed-off-by: Matt Klein <[email protected]>
1 parent a72ff48 commit 4ee2596

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+867
-883
lines changed

Cargo.lock

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api

bd-api/src/api_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use bd_client_stats_store::Collector;
2020
use bd_client_stats_store::test::StatsHelper;
2121
use bd_grpc_codec::code::Code;
2222
use bd_grpc_codec::{Decompression, Encoder, OptimizeFor};
23-
use bd_internal_logging::{LogFields, LogLevel, LogType};
23+
use bd_internal_logging::{LogFields, LogLevel};
2424
use bd_key_value::Store;
2525
use bd_metadata::{Metadata, Platform};
2626
use bd_network_quality::{NetworkQuality, NetworkQualityResolver as _};
@@ -37,6 +37,7 @@ use bd_proto::protos::client::api::{
3737
RuntimeUpdate,
3838
StatsUploadRequest,
3939
};
40+
use bd_proto::protos::logging::payload::LogType;
4041
use bd_runtime::runtime::{ConfigLoader, FeatureFlag};
4142
use bd_stats_common::labels;
4243
use bd_test_helpers::make_mut;

bd-buffer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ bd-client-common.path = "../bd-client-common"
1515
bd-client-stats-store.path = "../bd-client-stats-store"
1616
bd-completion.path = "../bd-completion"
1717
bd-error-reporter.path = "../bd-error-reporter"
18+
bd-log-primitives.path = "../bd-log-primitives"
1819
bd-proto.path = "../bd-proto"
1920
bd-runtime.path = "../bd-runtime"
2021
bd-stats-common.path = "../bd-stats-common"

bd-buffer/src/buffer/aggregate_ring_buffer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ use super::{
2424
RingBufferProducer,
2525
RingBufferStats,
2626
VolatileRingBuffer,
27-
to_u32,
2827
};
2928
#[cfg(test)]
3029
use crate::buffer::test::thread_synchronizer::ThreadSynchronizer;
3130
use crate::{AbslCode, Error, Result};
31+
use bd_log_primitives::LossyIntToU32;
3232
use parking_lot::Mutex;
3333
#[cfg(test)]
3434
use std::any::Any;
@@ -76,7 +76,7 @@ impl SharedData {
7676
// the volatile buffer.
7777
match producer
7878
.as_mut()
79-
.reserve(to_u32(read_reservation.len()), true)
79+
.reserve(read_reservation.len().to_u32(), true)
8080
{
8181
Ok(write_reservation) => {
8282
write_reservation.copy_from_slice(read_reservation);
@@ -138,8 +138,8 @@ impl RingBufferImpl {
138138
// For aggregate buffers, the size of the file (after subtracting header space) must be >= the
139139
// size of RAM. This is to avoid situations in which we accept a record into RAM but cannot ever
140140
// write it to disk.
141-
if non_volatile_size < to_u32(std::mem::size_of::<FileHeader>())
142-
|| volatile_size > (non_volatile_size - to_u32(std::mem::size_of::<FileHeader>()))
141+
if non_volatile_size < std::mem::size_of::<FileHeader>().to_u32()
142+
|| volatile_size > (non_volatile_size - std::mem::size_of::<FileHeader>().to_u32())
143143
{
144144
log::error!(
145145
"file size '{}' not big enough for header size '{}' or file size (minus header) not \

bd-buffer/src/buffer/aggregate_ring_buffer_test.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ use crate::buffer::test::{
1818
read_and_verify,
1919
start_read_and_verify,
2020
};
21-
use crate::buffer::{RingBuffer, RingBufferStats, StatsTestHelper, to_u32};
21+
use crate::buffer::{RingBuffer, RingBufferStats, StatsTestHelper};
2222
use bd_client_stats_store::Collector;
23+
use bd_log_primitives::LossyIntToU32;
2324
use futures::poll;
2425
use std::sync::Arc;
2526
use tempfile::TempDir;
@@ -48,7 +49,7 @@ impl Helper {
4849
"test",
4950
volatile_size,
5051
temp_dir.path().join("buffer"),
51-
non_volatile_size + to_u32(std::mem::size_of::<FileHeader>()),
52+
non_volatile_size + std::mem::size_of::<FileHeader>().to_u32(),
5253
PerRecordCrc32Check::Yes,
5354
allow_overwrite,
5455
Arc::new(RingBufferStats::default()),
@@ -80,7 +81,7 @@ impl Helper {
8081
"test",
8182
self.volatile_size,
8283
self.temp_dir.path().join("buffer"),
83-
self.non_volatile_size + to_u32(std::mem::size_of::<FileHeader>()),
84+
self.non_volatile_size + std::mem::size_of::<FileHeader>().to_u32(),
8485
PerRecordCrc32Check::Yes,
8586
self.allow_overwrite,
8687
Arc::new(RingBufferStats::default()),

bd-buffer/src/buffer/common_ring_buffer.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
#[path = "./common_ring_buffer_test.rs"]
1010
mod common_ring_buffer_test;
1111

12+
use super::RingBufferStats;
1213
#[cfg(test)]
1314
use super::test::thread_synchronizer::ThreadSynchronizer;
14-
use super::{RingBufferStats, to_u32};
1515
use crate::{AbslCode, Error, Result};
1616
use bd_client_common::error::InvariantError;
17+
use bd_log_primitives::LossyIntToU32;
1718
use parking_lot::{Condvar, Mutex, MutexGuard};
1819
use std::fmt::Display;
1920
use std::ptr::NonNull;
@@ -229,14 +230,14 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
229230
// part of the record. The extra space is always fixed and is the value stored in
230231
// extra_bytes_per_record_.
231232
fn record_size_offset(&self, start: u32) -> u32 {
232-
debug_assert!(self.extra_bytes_per_record >= to_u32(std::mem::size_of::<u32>()));
233-
start + (self.extra_bytes_per_record - to_u32(std::mem::size_of::<u32>()))
233+
debug_assert!(self.extra_bytes_per_record >= std::mem::size_of::<u32>().to_u32());
234+
start + (self.extra_bytes_per_record - std::mem::size_of::<u32>().to_u32())
234235
}
235236

236237
// Returns any extra space at the beginning of the reservation. See recordSizeOffset() for more
237238
// information on the record layout.
238239
fn extra_data(&mut self, start: u32) -> &mut [u8] {
239-
debug_assert!(self.extra_bytes_per_record >= to_u32(std::mem::size_of::<u32>()));
240+
debug_assert!(self.extra_bytes_per_record >= std::mem::size_of::<u32>().to_u32());
240241
let start = start as usize;
241242
let extra_bytes_per_record = self.extra_bytes_per_record as usize;
242243
&mut self.memory()[start .. start + extra_bytes_per_record - std::mem::size_of::<u32>()]
@@ -299,7 +300,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
299300
// space. This can overflow but as long as we are within the memory space crc checks will catch
300301
// further corruption.
301302
let size_index = self.record_size_offset(next_read_start_to_use);
302-
if size_index + to_u32(std::mem::size_of::<u32>()) > to_u32(self.memory.0.len()) {
303+
if size_index + std::mem::size_of::<u32>().to_u32() > self.memory.0.len().to_u32() {
303304
return Err(Error::AbslStatus(
304305
AbslCode::DataLoss,
305306
"corrupted record size index".to_string(),
@@ -318,7 +319,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
318319
// space crc checks will catch further corruption.
319320
if size == 0
320321
|| Self::overflow_add(&[next_read_start_to_use, self.extra_bytes_per_record, size])
321-
.is_none_or(|next_read_start_index| next_read_start_index > to_u32(self.memory.0.len()))
322+
.is_none_or(|next_read_start_index| next_read_start_index > self.memory.0.len().to_u32())
322323
{
323324
return Err(Error::AbslStatus(
324325
AbslCode::DataLoss,
@@ -343,7 +344,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
343344
.last_write_end_before_wrap
344345
.ok_or(InvariantError::Invariant)?
345346
+ 1,
346-
size: to_u32(self.memory.0.len())
347+
size: self.memory.0.len().to_u32()
347348
- (reservation_data
348349
.last_write_end_before_wrap
349350
.ok_or(InvariantError::Invariant)?
@@ -469,7 +470,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
469470
let mut reservation_data = TempReservationData::default();
470471
let next_write_start = *self.next_write_start();
471472

472-
if next_write_start + write_size <= to_u32(self.memory.0.len()) {
473+
if next_write_start + write_size <= self.memory.0.len().to_u32() {
473474
// It fits in the remainder without wrapping.
474475
reservation_data.range.start = next_write_start;
475476
reservation_data.next_write_start = next_write_start + write_size;
@@ -529,7 +530,7 @@ impl<ExtraLockedData> LockedData<ExtraLockedData> {
529530
};
530531

531532
// If size is 0 or > then the ring buffer size we can't do anything.
532-
if size == 0 || actual_size > to_u32(self.memory.0.len()) {
533+
if size == 0 || actual_size > self.memory.0.len().to_u32() {
533534
log::trace!("({}) invalid reservation size: {}", self.name, size);
534535
// Note that we don't record the bytes lost in this case, since a very large number is likely
535536
// to cause overflow.
@@ -996,7 +997,7 @@ impl<ExtraLockedData> CommonRingBuffer<ExtraLockedData> {
996997
lock_count: AtomicU32::default().into(),
997998
shutdown_lock: None,
998999
stats,
999-
extra_bytes_per_record: extra_bytes_per_record + to_u32(std::mem::size_of::<u32>()),
1000+
extra_bytes_per_record: extra_bytes_per_record + std::mem::size_of::<u32>().to_u32(),
10001001
extra_locked_data,
10011002
allow_overwrite,
10021003
on_total_data_loss_cb: Box::new(on_total_data_loss_cb),

bd-buffer/src/buffer/common_ring_buffer_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ use crate::buffer::{
2222
RingBufferStats,
2323
StatsTestHelper,
2424
VolatileRingBuffer,
25-
to_u32,
2625
};
2726
use crate::{AbslCode, Error};
2827
use assert_matches::assert_matches;
2928
use bd_client_stats_store::Collector;
29+
use bd_log_primitives::LossyIntToU32;
3030
use parameterized::parameterized;
3131
use std::any::Any;
3232
use std::sync::Arc;
@@ -78,7 +78,7 @@ impl Helper {
7878
TestType::NonVolatile => NonVolatileRingBuffer::new(
7979
"test".to_string(),
8080
temp_dir.path().join("buffer"),
81-
size + to_u32(std::mem::size_of::<FileHeader>()),
81+
size + std::mem::size_of::<FileHeader>().to_u32(),
8282
AllowOverwrite::Yes,
8383
BlockWhenReservingIntoConcurrentRead::No,
8484
PerRecordCrc32Check::No,
@@ -89,7 +89,7 @@ impl Helper {
8989
"test",
9090
size,
9191
temp_dir.path().join("buffer"),
92-
size + to_u32(std::mem::size_of::<FileHeader>()),
92+
size + std::mem::size_of::<FileHeader>().to_u32(),
9393
PerRecordCrc32Check::No,
9494
AllowOverwrite::Yes,
9595
Arc::new(RingBufferStats::default()),

bd-buffer/src/buffer/mod.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,13 @@ mod volatile_ring_buffer;
1616
mod test;
1717

1818
use bd_client_stats_store::Counter;
19+
use bd_log_primitives::LossyIntToU32;
1920
#[cfg(test)]
2021
use std::any::Any;
2122
use std::sync::Arc;
2223
#[cfg(test)]
2324
use test::thread_synchronizer::ThreadSynchronizer;
2425

25-
// Wrapper that allows converting a usize to a u32. This avoids verbosity in places we know this is
26-
// safe.
27-
#[allow(clippy::cast_possible_truncation)]
28-
#[must_use]
29-
pub const fn to_u32(value: usize) -> u32 {
30-
value as u32
31-
}
32-
3326
//
3427
// RingBufferStats
3528
//
@@ -101,7 +94,7 @@ pub trait RingBufferProducer: Send {
10194

10295
// Writes a single record to the buffer by copying the provided data into the record reservation.
10396
fn write(&mut self, data: &[u8]) -> Result<()> {
104-
let reserved = self.reserve(to_u32(data.len()), true)?;
97+
let reserved = self.reserve(data.len().to_u32(), true)?;
10598
reserved.copy_from_slice(data);
10699
self.commit()
107100
}

bd-buffer/src/buffer/non_volatile_ring_buffer.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ use super::{
2929
RingBufferCursorConsumer,
3030
RingBufferProducer,
3131
RingBufferStats,
32-
to_u32,
3332
};
3433
use crate::{AbslCode, Error, Result};
3534
use bd_client_common::error::InvariantError;
35+
use bd_log_primitives::LossyIntToU32;
3636
use crc32fast::Hasher;
3737
use fs2::FileExt;
3838
use intrusive_collections::offset_of;
@@ -632,7 +632,9 @@ pub struct FileHeader {
632632
crc32: u32,
633633
}
634634

635-
const FILE_HEADER_CURRENT_VERSION: u32 = 1;
635+
// Version 1: Original version.
636+
// Version 2: Switched to protobuf encoding for records.
637+
const FILE_HEADER_CURRENT_VERSION: u32 = 2;
636638

637639
//
638640
// ConsumerType
@@ -738,7 +740,7 @@ impl RingBufferImpl {
738740
const_assert_eq!(offset_of!(FileHeader, next_read_start), 28);
739741
const_assert_eq!(offset_of!(FileHeader, crc32), 36);
740742

741-
if size < to_u32(std::mem::size_of::<FileHeader>()) {
743+
if size < std::mem::size_of::<FileHeader>().to_u32() {
742744
log::error!(
743745
"({name}) file size '{}' not big enough for header size '{}'",
744746
size,

0 commit comments

Comments
 (0)