From e403542ced82e6a4f921d36f304f4d132374d512 Mon Sep 17 00:00:00 2001
From: Anil Gupta <90670783+anil-db@users.noreply.github.com>
Date: Tue, 19 Aug 2025 18:28:09 -0700
Subject: [PATCH 1/5] fix panic in disk buffer when dealing with corrupted file

---
 ...disk_buffer_panic_if_corrupted_file.fix.md |   4 +
 .../src/variants/disk_v2/ledger.rs            |   1 -
 .../src/variants/disk_v2/reader.rs            |   5 +-
 .../variants/disk_v2/tests/known_errors.rs    | 147 ++++++++++++++++++
 4 files changed, 155 insertions(+), 2 deletions(-)
 create mode 100644 changelog.d/disk_buffer_panic_if_corrupted_file.fix.md

diff --git a/changelog.d/disk_buffer_panic_if_corrupted_file.fix.md b/changelog.d/disk_buffer_panic_if_corrupted_file.fix.md
new file mode 100644
index 0000000000000..40db302a542c8
--- /dev/null
+++ b/changelog.d/disk_buffer_panic_if_corrupted_file.fix.md
@@ -0,0 +1,4 @@
+fix panics in disk buffer when both reader and writer are on the at last data file and last data file is corrupted.
+which are generally the case when node shutdown improperly.
+
+authors: anil-db
diff --git a/lib/vector-buffers/src/variants/disk_v2/ledger.rs b/lib/vector-buffers/src/variants/disk_v2/ledger.rs
index 7f56f30b343d5..d22accf5db81a 100644
--- a/lib/vector-buffers/src/variants/disk_v2/ledger.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/ledger.rs
@@ -314,7 +314,6 @@ where
     ///
     /// This is purely a future-looking operation i.e. what would the file ID be if it was
    /// incremented from its current value. It does not alter the current writer file ID.
-    #[cfg(test)]
     pub fn get_next_writer_file_id(&self) -> u16 {
         self.state().get_next_writer_file_id()
     }
diff --git a/lib/vector-buffers/src/variants/disk_v2/reader.rs b/lib/vector-buffers/src/variants/disk_v2/reader.rs
index a584af5d38d6f..d3833fb9b2612 100644
--- a/lib/vector-buffers/src/variants/disk_v2/reader.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/reader.rs
@@ -764,7 +764,10 @@ where
             Ok(data_file) => data_file,
             Err(e) => match e.kind() {
                 ErrorKind::NotFound => {
-                    if reader_file_id == writer_file_id {
+                    // reader is either waiting for writer to create the file which can be current writer_file_id or next writer_file_id (if writer has marked for skip)
+                    if reader_file_id == writer_file_id
+                        || reader_file_id == self.ledger.get_next_writer_file_id()
+                    {
                         debug!(
                             data_file_path = data_file_path.to_string_lossy().as_ref(),
                             "Data file does not yet exist. Waiting for writer to create."
diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
index f613a14373c72..7fb7bc57af829 100644
--- a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
@@ -1,5 +1,6 @@
 use std::{
     io::{self, SeekFrom},
+    ops::Add,
     sync::atomic::{AtomicU32, Ordering},
 };
 
@@ -8,6 +9,7 @@ use memmap2::MmapMut;
 use tokio::{
     fs::OpenOptions,
     io::{AsyncSeekExt, AsyncWriteExt},
+    time::{timeout, Duration},
 };
 use tracing::Instrument;
 use vector_common::byte_size_of::ByteSizeOf;
@@ -817,3 +819,148 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
     })
     .await;
 }
+
+#[tokio::test]
+async fn writer_and_reader_handle_when_last_record_has_scrambled_archive_data() {
+    let assertion_registry = install_tracing_helpers();
+    let fut = with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+
+        async move {
+            let marked_for_skip = assertion_registry
+                .build()
+                .with_name("mark_for_skip")
+                .with_parent_name("writer_detects_when_last_record_has_scrambled_archive_data")
+                .was_entered()
+                .finalize();
+
+            // Create a regular buffer, no customizations required.
+            let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;
+            let starting_writer_file_id = ledger.get_current_writer_file_id();
+            let expected_final_writer_file_id = ledger.get_next_writer_file_id();
+            let expected_final_write_data_file = ledger.get_next_writer_data_file_path();
+            assert_file_does_not_exist_async!(&expected_final_write_data_file);
+
+            // Write a `SizedRecord` record that we can scramble. Since it will be the last record
+            // in the data file, the writer should detect this error when the buffer is recreated,
+            // even though it doesn't actually _emit_ anything we can observe when creating the
+            // buffer... but it should trigger a call to `reset`, which we _can_ observe with
+            // tracing assertions.
+            let bytes_written_1 = writer
+                .write_record(SizedRecord::new(64))
+                .await
+                .expect("write should not fail");
+            let bytes_written_2 = writer
+                .write_record(SizedRecord::new(68))
+                .await
+                .expect("write should not fail");
+            writer.flush().await.expect("flush should not fail");
+            writer.close();
+            let expected_data_file_len = bytes_written_1.add(bytes_written_2) as u64;
+
+            let first_read = reader
+                .next()
+                .await
+                .expect("should not fail to read record")
+                .expect("should contain first record");
+            assert_eq!(SizedRecord::new(64), first_read);
+            acknowledge(first_read).await;
+
+            let second_read = reader
+                .next()
+                .await
+                .expect("should not fail to read record")
+                .expect("should contain first record");
+            assert_eq!(SizedRecord::new(68), second_read);
+            acknowledge(second_read).await;
+
+            let third_read = reader.next().await.expect("should not fail to read record");
+            assert!(third_read.is_none());
+
+            ledger.flush().expect("should not fail to flush ledger");
+
+            // Grab the current writer data file path, and then drop the writer/reader. Once the
+            // buffer is closed, we'll purposefully scramble the archived data -- but not the length
+            // delimiter -- which should trigger `rkyv` to throw an error when we check the data.
+            let data_file_path = ledger.get_current_writer_data_file_path();
+            drop(writer);
+            drop(reader);
+            drop(ledger);
+
+            // We should not have seen a call to `mark_for_skip` yet.
+            assert!(!marked_for_skip.try_assert());
+
+            // Open the file and set the last eight bytes of the record to something clearly
+            // wrong/invalid, which should end up messing with the relative pointer stuff in the
+            // archive.
+            let mut data_file = OpenOptions::new()
+                .write(true)
+                .open(&data_file_path)
+                .await
+                .expect("open should not fail");
+
+            // Just to make sure the data file matches our expected state before futzing with it.
+            let metadata = data_file
+                .metadata()
+                .await
+                .expect("metadata should not fail");
+            assert_eq!(expected_data_file_len, metadata.len());
+
+            let target_pos = expected_data_file_len - 8;
+            let pos = data_file
+                .seek(SeekFrom::Start(target_pos))
+                .await
+                .expect("seek should not fail");
+            assert_eq!(target_pos, pos);
+            data_file
+                .write_all(&[0xd, 0xe, 0xa, 0xd, 0xb, 0xe, 0xe, 0xf])
+                .await
+                .expect("write should not fail");
+            data_file.flush().await.expect("flush should not fail");
+            data_file.sync_all().await.expect("sync should not fail");
+            drop(data_file);
+
+            // Now reopen the buffer, which should trigger a `Writer::mark_for_skip` call which
+            // instructs the writer to skip to the next data file, although this doesn't happen
+            // until the first write is attempted.
+            let (mut writer, mut reader, ledger) =
+                create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
+            marked_for_skip.assert();
+            // When writer see last record as corrupted set flag to skip to next file but reader moves to next file id and wait for writer to create it.
+            assert_reader_writer_v2_file_positions!(
+                ledger,
+                expected_final_writer_file_id,
+                starting_writer_file_id
+            );
+            assert_file_does_not_exist_async!(&expected_final_write_data_file);
+
+            // At this point reader is waiting for writer to create next data file, so we can test that reader.next() times out.
+            let result = timeout(Duration::from_millis(100), reader.next()).await;
+            assert!(result.is_err(), "expected reader.next() to time out");
+
+            // Do a simple write to ensure it opens the next data file.
+            let _bytes_written = writer
+                .write_record(SizedRecord::new(72))
+                .await
+                .expect("write should not fail");
+            writer.flush().await.expect("flush should not fail");
+            assert_reader_writer_v2_file_positions!(
+                ledger,
+                expected_final_writer_file_id,
+                expected_final_writer_file_id
+            );
+            assert_file_exists_async!(&expected_final_write_data_file);
+
+            let read = reader
+                .next()
+                .await
+                .expect("should not fail to read record")
+                .expect("should contain first record");
+            assert_eq!(SizedRecord::new(72), read);
+            acknowledge(read).await;
+        }
+    });
+
+    let parent = trace_span!("writer_detects_when_last_record_has_scrambled_archive_data");
+    fut.instrument(parent.or_current()).await;
+}

From 6a1dc9a7a3fbc740c7e42f46ac483cdca80b04fb Mon Sep 17 00:00:00 2001
From: Thomas
Date: Wed, 10 Sep 2025 11:31:46 -0400
Subject: [PATCH 2/5] Allow clippy too many lines in test

---
 lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
index 7fb7bc57af829..a6d63c56ffa14 100644
--- a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
@@ -820,6 +820,7 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
     .await;
 }
 
+#[allow(clippy::too_many_lines)]
 #[tokio::test]
 async fn writer_and_reader_handle_when_last_record_has_scrambled_archive_data() {
     let assertion_registry = install_tracing_helpers();

From d053155f89cb669c7527a8e04e3f57f1651306be Mon Sep 17 00:00:00 2001
From: Pavlos Rontidis
Date: Tue, 16 Sep 2025 11:43:52 -0400
Subject: [PATCH 3/5] cargo fmt

---
 lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
index d57225d4d8a69..809b8092351e7 100644
--- a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
@@ -9,7 +9,7 @@ use memmap2::MmapMut;
 use tokio::{
     fs::OpenOptions,
     io::{AsyncSeekExt, AsyncWriteExt},
-    time::{timeout, Duration},
+    time::{Duration, timeout},
 };
 use tracing::Instrument;
 use vector_common::{

From 73b3326d646963a18bdc7917792d1bcc1b4de609 Mon Sep 17 00:00:00 2001
From: Pavlos Rontidis
Date: Tue, 16 Sep 2025 12:09:47 -0400
Subject: [PATCH 4/5] simplify test

---
 .../variants/disk_v2/tests/known_errors.rs   | 148 ++++++++++--------
 1 file changed, 84 insertions(+), 64 deletions(-)

diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
index 809b8092351e7..9f5ec073c00a7 100644
--- a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
@@ -1,17 +1,17 @@
+use bytes::{Buf, BufMut};
+use memmap2::MmapMut;
 use std::{
     io::{self, SeekFrom},
-    ops::Add,
+    path::PathBuf,
     sync::atomic::{AtomicU32, Ordering},
 };
-
-use bytes::{Buf, BufMut};
-use memmap2::MmapMut;
 use tokio::{
     fs::OpenOptions,
     io::{AsyncSeekExt, AsyncWriteExt},
     time::{Duration, timeout},
 };
 use tracing::Instrument;
+use tracing_fluent_assertions::{Assertion, AssertionRegistry};
 use vector_common::{
     byte_size_of::ByteSizeOf,
     finalization::{AddBatchNotifier, BatchNotifier},
@@ -817,7 +817,77 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
     .await;
 }
 
-#[allow(clippy::too_many_lines)]
+struct ScrambledTestSetup {
+    marked_for_skip: Assertion,
+    data_file_path: PathBuf,
+    starting_writer_file_id: u16,
+    expected_final_writer_file_id: u16,
+    expected_final_write_data_file: PathBuf,
+    expected_data_file_len: u64,
+}
+
+async fn write_two_records_and_read_all_then_drop(
+    data_dir: PathBuf,
+    assertion_registry: &AssertionRegistry,
+) -> ScrambledTestSetup {
+    let marked_for_skip = assertion_registry
+        .build()
+        .with_name("mark_for_skip")
+        .with_parent_name("writer_detects_when_last_record_has_scrambled_archive_data")
+        .was_entered()
+        .finalize();
+
+    let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;
+
+    let starting_writer_file_id = ledger.get_current_writer_file_id();
+    let expected_final_writer_file_id = ledger.get_next_writer_file_id();
+    let expected_final_write_data_file = ledger.get_next_writer_data_file_path();
+    assert_file_does_not_exist_async!(&expected_final_write_data_file);
+
+    let bytes_written_1 = writer
+        .write_record(SizedRecord::new(64))
+        .await
+        .expect("write failed");
+    let bytes_written_2 = writer
+        .write_record(SizedRecord::new(68))
+        .await
+        .expect("write failed");
+    writer.flush().await.expect("flush failed");
+    writer.close();
+
+    let expected_data_file_len = bytes_written_1 + bytes_written_2;
+
+    let first_read = reader
+        .next()
+        .await
+        .expect("read failed")
+        .expect("missing record");
+    assert_eq!(SizedRecord::new(64), first_read);
+    acknowledge(first_read).await;
+
+    let second_read = reader
+        .next()
+        .await
+        .expect("read failed")
+        .expect("missing record");
+    assert_eq!(SizedRecord::new(68), second_read);
+    acknowledge(second_read).await;
+
+    let third_read = reader.next().await.expect("read failed");
+    assert!(third_read.is_none());
+
+    ledger.flush().expect("flush failed");
+
+    ScrambledTestSetup {
+        marked_for_skip,
+        data_file_path: ledger.get_current_writer_data_file_path(),
+        starting_writer_file_id,
+        expected_final_writer_file_id,
+        expected_final_write_data_file,
+        expected_data_file_len: expected_data_file_len as u64,
+    }
+}
+
 #[tokio::test]
 async fn writer_and_reader_handle_when_last_record_has_scrambled_archive_data() {
     let assertion_registry = install_tracing_helpers();
@@ -825,65 +895,15 @@ async fn writer_and_reader_handle_when_last_record_has_scrambled_archive_data()
         let data_dir = dir.to_path_buf();
 
         async move {
-            let marked_for_skip = assertion_registry
-                .build()
-                .with_name("mark_for_skip")
-                .with_parent_name("writer_detects_when_last_record_has_scrambled_archive_data")
-                .was_entered()
-                .finalize();
-
-            // Create a regular buffer, no customizations required.
-            let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;
-            let starting_writer_file_id = ledger.get_current_writer_file_id();
-            let expected_final_writer_file_id = ledger.get_next_writer_file_id();
-            let expected_final_write_data_file = ledger.get_next_writer_data_file_path();
-            assert_file_does_not_exist_async!(&expected_final_write_data_file);
-
-            // Write a `SizedRecord` record that we can scramble. Since it will be the last record
-            // in the data file, the writer should detect this error when the buffer is recreated,
-            // even though it doesn't actually _emit_ anything we can observe when creating the
-            // buffer... but it should trigger a call to `reset`, which we _can_ observe with
-            // tracing assertions.
-            let bytes_written_1 = writer
-                .write_record(SizedRecord::new(64))
-                .await
-                .expect("write should not fail");
-            let bytes_written_2 = writer
-                .write_record(SizedRecord::new(68))
-                .await
-                .expect("write should not fail");
-            writer.flush().await.expect("flush should not fail");
-            writer.close();
-            let expected_data_file_len = bytes_written_1.add(bytes_written_2) as u64;
-
-            let first_read = reader
-                .next()
-                .await
-                .expect("should not fail to read record")
-                .expect("should contain first record");
-            assert_eq!(SizedRecord::new(64), first_read);
-            acknowledge(first_read).await;
-
-            let second_read = reader
-                .next()
-                .await
-                .expect("should not fail to read record")
-                .expect("should contain first record");
-            assert_eq!(SizedRecord::new(68), second_read);
-            acknowledge(second_read).await;
-
-            let third_read = reader.next().await.expect("should not fail to read record");
-            assert!(third_read.is_none());
-
-            ledger.flush().expect("should not fail to flush ledger");
-
-            // Grab the current writer data file path, and then drop the writer/reader. Once the
-            // buffer is closed, we'll purposefully scramble the archived data -- but not the length
-            // delimiter -- which should trigger `rkyv` to throw an error when we check the data.
-            let data_file_path = ledger.get_current_writer_data_file_path();
-            drop(writer);
-            drop(reader);
-            drop(ledger);
+            let ScrambledTestSetup {
+                marked_for_skip,
+                data_file_path,
+                starting_writer_file_id,
+                expected_final_writer_file_id,
+                expected_final_write_data_file,
+                expected_data_file_len,
+            } = write_two_records_and_read_all_then_drop(data_dir.clone(), &assertion_registry)
+                .await;
 
             // We should not have seen a call to `mark_for_skip` yet.
             assert!(!marked_for_skip.try_assert());

From 6fa68582daee0fc0140da33dc8cd69142b2ffebb Mon Sep 17 00:00:00 2001
From: Pavlos Rontidis
Date: Tue, 16 Sep 2025 12:24:15 -0400
Subject: [PATCH 5/5] Update changelog.d/disk_buffer_panic_if_corrupted_file.fix.md

---
 changelog.d/disk_buffer_panic_if_corrupted_file.fix.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/changelog.d/disk_buffer_panic_if_corrupted_file.fix.md b/changelog.d/disk_buffer_panic_if_corrupted_file.fix.md
index 40db302a542c8..9e586836e2b80 100644
--- a/changelog.d/disk_buffer_panic_if_corrupted_file.fix.md
+++ b/changelog.d/disk_buffer_panic_if_corrupted_file.fix.md
@@ -1,4 +1,3 @@
-fix panics in disk buffer when both reader and writer are on the at last data file and last data file is corrupted.
-which are generally the case when node shutdown improperly.
+Fix disk buffer panics when both reader and writer are on the last data file and it is corrupted. This scenario typically occurs when a node shuts down improperly, leaving the final data file in a corrupted state.
 
 authors: anil-db