3 changes: 3 additions & 0 deletions changelog.d/disk_buffer_panic_if_corrupted_file.fix.md
@@ -0,0 +1,3 @@
Fix disk buffer panics when both reader and writer are on the last data file and it is corrupted. This scenario typically occurs when a node shuts down improperly, leaving the final data file in a corrupted state.

authors: anil-db
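
To make the failure mode concrete, here is a minimal sketch of the wait condition this PR changes (simplified names; the real check is in the `reader.rs` hunk below). Before the fix, the reader only waited when its file ID matched the writer's current one, so once the writer was marked to skip past a corrupted final data file, the reader could end up polling for a file the old condition never let it wait on:

```rust
// Simplified sketch of the reader's "wait for the writer" check. In the real
// code, the next writer file ID comes from the ledger.
fn reader_should_wait(reader_id: u16, writer_id: u16, next_writer_id: u16) -> bool {
    // Old condition: only `reader_id == writer_id`. After the fix, the reader
    // also waits when it has already advanced to the file the writer will
    // create once it skips the corrupted data file.
    reader_id == writer_id || reader_id == next_writer_id
}
```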
1 change: 0 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/ledger.rs
@@ -316,7 +316,6 @@ where
///
/// This is purely a future-looking operation, i.e. what the file ID would be if it
/// were incremented from its current value. It does not alter the current writer file ID.
#[cfg(test)]
pub fn get_next_writer_file_id(&self) -> u16 {
self.state().get_next_writer_file_id()
}
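
Dropping `#[cfg(test)]` makes this helper available to the reader at runtime. As a rough illustration only (the actual computation lives in the ledger state, and simple u16 wrap-around is an assumption here), the "future-looking" semantics look like:

```rust
// Hypothetical sketch: assumes the next writer file ID is the current ID plus
// one with u16 wrap-around; the real rule lives in the ledger state.
fn next_file_id(current: u16) -> u16 {
    current.wrapping_add(1)
}

#[test]
fn next_file_id_is_future_looking() {
    assert_eq!(next_file_id(5), 6); // does not mutate any writer state
    assert_eq!(next_file_id(u16::MAX), 0); // wraps around
}
```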
5 changes: 4 additions & 1 deletion lib/vector-buffers/src/variants/disk_v2/reader.rs
@@ -764,7 +764,10 @@ where
Ok(data_file) => data_file,
Err(e) => match e.kind() {
ErrorKind::NotFound => {
if reader_file_id == writer_file_id {
// The reader is waiting for the writer to create this file, which may be the
// current writer_file_id or the next one (if the writer has been marked to skip).
if reader_file_id == writer_file_id
|| reader_file_id == self.ledger.get_next_writer_file_id()
{
debug!(
data_file_path = data_file_path.to_string_lossy().as_ref(),
"Data file does not yet exist. Waiting for writer to create."
174 changes: 171 additions & 3 deletions lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
@@ -1,15 +1,17 @@
use std::{
io::{self, SeekFrom},
path::PathBuf,
sync::atomic::{AtomicU32, Ordering},
};

use bytes::{Buf, BufMut};
use memmap2::MmapMut;
use tokio::{
fs::OpenOptions,
io::{AsyncSeekExt, AsyncWriteExt},
time::{Duration, timeout},
};
use tracing::Instrument;
use tracing_fluent_assertions::{Assertion, AssertionRegistry};
use vector_common::{
byte_size_of::ByteSizeOf,
finalization::{AddBatchNotifier, BatchNotifier},
@@ -814,3 +816,169 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
})
.await;
}

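/// State captured by `write_two_records_and_read_all_then_drop` so the test below
/// can corrupt the final data file and verify how the reader and writer recover.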
struct ScrambledTestSetup {
marked_for_skip: Assertion,
data_file_path: PathBuf,
starting_writer_file_id: u16,
expected_final_writer_file_id: u16,
expected_final_write_data_file: PathBuf,
expected_data_file_len: u64,
}

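/// Writes two records, reads and acknowledges both, then closes the writer and drops
/// the buffer, leaving a fully read final data file on disk for the test to corrupt.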
async fn write_two_records_and_read_all_then_drop(
data_dir: PathBuf,
assertion_registry: &AssertionRegistry,
) -> ScrambledTestSetup {
let marked_for_skip = assertion_registry
.build()
.with_name("mark_for_skip")
.with_parent_name("writer_detects_when_last_record_has_scrambled_archive_data")
.was_entered()
.finalize();

let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;

let starting_writer_file_id = ledger.get_current_writer_file_id();
let expected_final_writer_file_id = ledger.get_next_writer_file_id();
let expected_final_write_data_file = ledger.get_next_writer_data_file_path();
assert_file_does_not_exist_async!(&expected_final_write_data_file);

let bytes_written_1 = writer
.write_record(SizedRecord::new(64))
.await
.expect("write failed");
let bytes_written_2 = writer
.write_record(SizedRecord::new(68))
.await
.expect("write failed");
writer.flush().await.expect("flush failed");
writer.close();

let expected_data_file_len = bytes_written_1 + bytes_written_2;

let first_read = reader
.next()
.await
.expect("read failed")
.expect("missing record");
assert_eq!(SizedRecord::new(64), first_read);
acknowledge(first_read).await;

let second_read = reader
.next()
.await
.expect("read failed")
.expect("missing record");
assert_eq!(SizedRecord::new(68), second_read);
acknowledge(second_read).await;

let third_read = reader.next().await.expect("read failed");
assert!(third_read.is_none());

ledger.flush().expect("flush failed");

ScrambledTestSetup {
marked_for_skip,
data_file_path: ledger.get_current_writer_data_file_path(),
starting_writer_file_id,
expected_final_writer_file_id,
expected_final_write_data_file,
expected_data_file_len: expected_data_file_len as u64,
}
}

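// Regression test for the changelog entry above: with both reader and writer on a
// corrupted final data file, the writer should mark a skip to the next file and the
// reader should wait for that file instead of panicking.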
#[tokio::test]
async fn writer_and_reader_handle_when_last_record_has_scrambled_archive_data() {
let assertion_registry = install_tracing_helpers();
let fut = with_temp_dir(|dir| {
let data_dir = dir.to_path_buf();

async move {
let ScrambledTestSetup {
marked_for_skip,
data_file_path,
starting_writer_file_id,
expected_final_writer_file_id,
expected_final_write_data_file,
expected_data_file_len,
} = write_two_records_and_read_all_then_drop(data_dir.clone(), &assertion_registry)
.await;

// We should not have seen a call to `mark_for_skip` yet.
assert!(!marked_for_skip.try_assert());

// Open the file and set the last eight bytes of the record to something clearly
// wrong/invalid, which should end up messing with the relative pointer stuff in the
// archive.
let mut data_file = OpenOptions::new()
.write(true)
.open(&data_file_path)
.await
.expect("open should not fail");

// Just to make sure the data file matches our expected state before futzing with it.
let metadata = data_file
.metadata()
.await
.expect("metadata should not fail");
assert_eq!(expected_data_file_len, metadata.len());

let target_pos = expected_data_file_len - 8;
let pos = data_file
.seek(SeekFrom::Start(target_pos))
.await
.expect("seek should not fail");
assert_eq!(target_pos, pos);
data_file
.write_all(&[0xd, 0xe, 0xa, 0xd, 0xb, 0xe, 0xe, 0xf])
.await
.expect("write should not fail");
data_file.flush().await.expect("flush should not fail");
data_file.sync_all().await.expect("sync should not fail");
drop(data_file);

// Now reopen the buffer, which should trigger a `Writer::mark_for_skip` call which
// instructs the writer to skip to the next data file, although this doesn't happen
// until the first write is attempted.
let (mut writer, mut reader, ledger) =
create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
marked_for_skip.assert();
// When the writer sees the last record as corrupted, it sets a flag to skip to the
// next file, while the reader moves on to the next file ID and waits for the writer
// to create it.
assert_reader_writer_v2_file_positions!(
ledger,
expected_final_writer_file_id,
starting_writer_file_id
);
assert_file_does_not_exist_async!(&expected_final_write_data_file);

// At this point the reader is waiting for the writer to create the next data file,
// so reader.next() should time out.
let result = timeout(Duration::from_millis(100), reader.next()).await;
assert!(result.is_err(), "expected reader.next() to time out");

// Do a simple write to ensure it opens the next data file.
let _bytes_written = writer
.write_record(SizedRecord::new(72))
.await
.expect("write should not fail");
writer.flush().await.expect("flush should not fail");
assert_reader_writer_v2_file_positions!(
ledger,
expected_final_writer_file_id,
expected_final_writer_file_id
);
assert_file_exists_async!(&expected_final_write_data_file);

let read = reader
.next()
.await
.expect("should not fail to read record")
.expect("should contain first record");
assert_eq!(SizedRecord::new(72), read);
acknowledge(read).await;
}
});

let parent = trace_span!("writer_detects_when_last_record_has_scrambled_archive_data");
fut.instrument(parent.or_current()).await;
}