
Commit a0c8f00

anil-db, thomasqueirozb, and pront authored and committed
fix(buffers): fix panic in disk buffer when dealing with corrupted file (vectordotdev#23617)
---------

Co-authored-by: Thomas <[email protected]>
Co-authored-by: Pavlos Rontidis <[email protected]>
1 parent c66c35d commit a0c8f00

File tree

4 files changed: +195 −25 lines changed
Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+Fix disk buffer panics when both reader and writer are on the last data file and it is corrupted. This scenario typically occurs when a node shuts down improperly, leaving the final data file in a corrupted state.
+
+authors: anil-db

lib/vector-buffers/src/variants/disk_v2/ledger.rs

Lines changed: 0 additions & 1 deletion

@@ -314,7 +314,6 @@ where
     ///
     /// This is purely a future-looking operation i.e. what would the file ID be if it was
     /// incremented from its current value. It does not alter the current writer file ID.
-    #[cfg(test)]
     pub fn get_next_writer_file_id(&self) -> u16 {
         self.state().get_next_writer_file_id()
     }
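
Removing the `#[cfg(test)]` gate is what allows the reader (below) to call `get_next_writer_file_id()` outside of test builds. As a minimal sketch of the semantics the doc comment describes, assuming plain `u16` wraparound purely for illustration (the real ledger's ID arithmetic and state layout may differ):

    // Hypothetical, simplified ledger state for illustration; the actual disk_v2
    // ledger tracks far more than a single counter.
    struct LedgerState {
        writer_current_file_id: u16,
    }

    impl LedgerState {
        /// Purely future-looking: reports what the writer file ID would become if
        /// incremented, without altering the current writer file ID.
        fn get_next_writer_file_id(&self) -> u16 {
            // Assumed: simple wraparound at u16::MAX for this sketch.
            self.writer_current_file_id.wrapping_add(1)
        }
    }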

lib/vector-buffers/src/variants/disk_v2/reader.rs

Lines changed: 4 additions & 1 deletion

@@ -764,7 +764,10 @@ where
             Ok(data_file) => data_file,
             Err(e) => match e.kind() {
                 ErrorKind::NotFound => {
-                    if reader_file_id == writer_file_id {
+                    // The reader is waiting for the writer to create the file, which can be the current writer_file_id or the next writer_file_id (if the writer has marked for skip).
+                    if reader_file_id == writer_file_id
+                        || reader_file_id == self.ledger.get_next_writer_file_id()
+                    {
                         debug!(
                             data_file_path = data_file_path.to_string_lossy().as_ref(),
                             "Data file does not yet exist. Waiting for writer to create."

lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs

Lines changed: 188 additions & 23 deletions

@@ -1,28 +1,41 @@
+use bytes::{Buf, BufMut};
+use memmap2::MmapMut;
 use std::{
     io::{self, SeekFrom},
+    path::PathBuf,
     sync::atomic::{AtomicU32, Ordering},
 };
-
-use bytes::{Buf, BufMut};
-use memmap2::MmapMut;
 use tokio::{
     fs::OpenOptions,
     io::{AsyncSeekExt, AsyncWriteExt},
+    time::{Duration, timeout},
 };
 use tracing::Instrument;
-use vector_common::byte_size_of::ByteSizeOf;
-use vector_common::finalization::{AddBatchNotifier, BatchNotifier};
+use tracing_fluent_assertions::{Assertion, AssertionRegistry};
+use vector_common::{
+    byte_size_of::ByteSizeOf,
+    finalization::{AddBatchNotifier, BatchNotifier},
+};

 use super::{create_buffer_v2_with_max_data_file_size, create_default_buffer_v2};
 use crate::{
-    assert_buffer_size, assert_enough_bytes_written, assert_file_does_not_exist_async,
+    EventCount, assert_buffer_size, assert_enough_bytes_written, assert_file_does_not_exist_async,
     assert_file_exists_async, assert_reader_writer_v2_file_positions, await_timeout,
     encoding::{AsMetadata, Encodable},
-    test::{acknowledge, install_tracing_helpers, with_temp_dir, SizedRecord, UndecodableRecord},
-    variants::disk_v2::{backed_archive::BackedArchive, record::Record, ReaderError},
-    EventCount,
+    test::{SizedRecord, UndecodableRecord, acknowledge, install_tracing_helpers, with_temp_dir},
+    variants::disk_v2::{ReaderError, backed_archive::BackedArchive, record::Record},
 };

+impl AsMetadata for u32 {
+    fn into_u32(self) -> u32 {
+        self
+    }
+
+    fn from_u32(value: u32) -> Option<Self> {
+        if value < 32 { Some(value) } else { None }
+    }
+}
+
 #[tokio::test]
 async fn reader_throws_error_when_record_length_delimiter_is_zero() {
     with_temp_dir(|dir| {

@@ -686,20 +699,6 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
     static GET_METADATA_VALUE: AtomicU32 = AtomicU32::new(0);
     static CAN_DECODE_VALUE: AtomicU32 = AtomicU32::new(0);

-    impl AsMetadata for u32 {
-        fn into_u32(self) -> u32 {
-            self
-        }
-
-        fn from_u32(value: u32) -> Option<Self> {
-            if value < 32 {
-                Some(value)
-            } else {
-                None
-            }
-        }
-    }
-
     #[derive(Debug)]
     struct ControllableRecord(u8);

@@ -817,3 +816,169 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
     })
     .await;
 }
+
+struct ScrambledTestSetup {
+    marked_for_skip: Assertion,
+    data_file_path: PathBuf,
+    starting_writer_file_id: u16,
+    expected_final_writer_file_id: u16,
+    expected_final_write_data_file: PathBuf,
+    expected_data_file_len: u64,
+}
+
+async fn write_two_records_and_read_all_then_drop(
+    data_dir: PathBuf,
+    assertion_registry: &AssertionRegistry,
+) -> ScrambledTestSetup {
+    let marked_for_skip = assertion_registry
+        .build()
+        .with_name("mark_for_skip")
+        .with_parent_name("writer_detects_when_last_record_has_scrambled_archive_data")
+        .was_entered()
+        .finalize();
+
+    let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;
+
+    let starting_writer_file_id = ledger.get_current_writer_file_id();
+    let expected_final_writer_file_id = ledger.get_next_writer_file_id();
+    let expected_final_write_data_file = ledger.get_next_writer_data_file_path();
+    assert_file_does_not_exist_async!(&expected_final_write_data_file);
+
+    let bytes_written_1 = writer
+        .write_record(SizedRecord::new(64))
+        .await
+        .expect("write failed");
+    let bytes_written_2 = writer
+        .write_record(SizedRecord::new(68))
+        .await
+        .expect("write failed");
+    writer.flush().await.expect("flush failed");
+    writer.close();
+
+    let expected_data_file_len = bytes_written_1 + bytes_written_2;
+
+    let first_read = reader
+        .next()
+        .await
+        .expect("read failed")
+        .expect("missing record");
+    assert_eq!(SizedRecord::new(64), first_read);
+    acknowledge(first_read).await;
+
+    let second_read = reader
+        .next()
+        .await
+        .expect("read failed")
+        .expect("missing record");
+    assert_eq!(SizedRecord::new(68), second_read);
+    acknowledge(second_read).await;
+
+    let third_read = reader.next().await.expect("read failed");
+    assert!(third_read.is_none());
+
+    ledger.flush().expect("flush failed");
+
+    ScrambledTestSetup {
+        marked_for_skip,
+        data_file_path: ledger.get_current_writer_data_file_path(),
+        starting_writer_file_id,
+        expected_final_writer_file_id,
+        expected_final_write_data_file,
+        expected_data_file_len: expected_data_file_len as u64,
+    }
+}
+
+#[tokio::test]
+async fn writer_and_reader_handle_when_last_record_has_scrambled_archive_data() {
+    let assertion_registry = install_tracing_helpers();
+    let fut = with_temp_dir(|dir| {
+        let data_dir = dir.to_path_buf();
+
+        async move {
+            let ScrambledTestSetup {
+                marked_for_skip,
+                data_file_path,
+                starting_writer_file_id,
+                expected_final_writer_file_id,
+                expected_final_write_data_file,
+                expected_data_file_len,
+            } = write_two_records_and_read_all_then_drop(data_dir.clone(), &assertion_registry)
+                .await;
+
+            // We should not have seen a call to `mark_for_skip` yet.
+            assert!(!marked_for_skip.try_assert());
+
+            // Open the file and set the last eight bytes of the record to something clearly
+            // wrong/invalid, which should end up messing with the relative pointer stuff in the
+            // archive.
+            let mut data_file = OpenOptions::new()
+                .write(true)
+                .open(&data_file_path)
+                .await
+                .expect("open should not fail");
+
+            // Just to make sure the data file matches our expected state before futzing with it.
+            let metadata = data_file
+                .metadata()
+                .await
+                .expect("metadata should not fail");
+            assert_eq!(expected_data_file_len, metadata.len());
+
+            let target_pos = expected_data_file_len - 8;
+            let pos = data_file
+                .seek(SeekFrom::Start(target_pos))
+                .await
+                .expect("seek should not fail");
+            assert_eq!(target_pos, pos);
+            data_file
+                .write_all(&[0xd, 0xe, 0xa, 0xd, 0xb, 0xe, 0xe, 0xf])
+                .await
+                .expect("write should not fail");
+            data_file.flush().await.expect("flush should not fail");
+            data_file.sync_all().await.expect("sync should not fail");
+            drop(data_file);
+
+            // Now reopen the buffer, which should trigger a `Writer::mark_for_skip` call which
+            // instructs the writer to skip to the next data file, although this doesn't happen
+            // until the first write is attempted.
+            let (mut writer, mut reader, ledger) =
+                create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
+            marked_for_skip.assert();
+            // When the writer sees the last record as corrupted, it sets a flag to skip to the next file, but the reader has already moved to the next file ID and waits for the writer to create it.
+            assert_reader_writer_v2_file_positions!(
+                ledger,
+                expected_final_writer_file_id,
+                starting_writer_file_id
+            );
+            assert_file_does_not_exist_async!(&expected_final_write_data_file);
+
+            // At this point the reader is waiting for the writer to create the next data file, so we can verify that `reader.next()` times out.
+            let result = timeout(Duration::from_millis(100), reader.next()).await;
+            assert!(result.is_err(), "expected reader.next() to time out");
+
+            // Do a simple write to ensure it opens the next data file.
+            let _bytes_written = writer
+                .write_record(SizedRecord::new(72))
+                .await
+                .expect("write should not fail");
+            writer.flush().await.expect("flush should not fail");
+            assert_reader_writer_v2_file_positions!(
+                ledger,
+                expected_final_writer_file_id,
+                expected_final_writer_file_id
+            );
+            assert_file_exists_async!(&expected_final_write_data_file);
+
+            let read = reader
+                .next()
+                .await
+                .expect("should not fail to read record")
+                .expect("should contain first record");
+            assert_eq!(SizedRecord::new(72), read);
+            acknowledge(read).await;
+        }
+    });
+
+    let parent = trace_span!("writer_detects_when_last_record_has_scrambled_archive_data");
+    fut.instrument(parent.or_current()).await;
+}
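
To run just this regression test locally, an invocation along these lines should work (the package name is taken from the file path above; the exact command is an assumption, not part of the change):

    # Assumed invocation; adjust package/filter names to the workspace layout.
    cargo test -p vector-buffers writer_and_reader_handle_when_last_record_has_scrambled_archive_data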
