Skip to content

Commit 3c752a1

Browse files
anil-db, thomasqueirozb, and pront
authored and committed
fix(buffers): fix panic in disk buffer when dealing with corrupted file (vectordotdev#23617)
* fix panic in disk buffer when dealing with corrupted file * Allow clippy too many lines in test * cargo fmt * simplify test * Update changelog.d/disk_buffer_panic_if_corrupted_file.fix.md --------- Co-authored-by: Thomas <[email protected]> Co-authored-by: Pavlos Rontidis <[email protected]>
1 parent bba8cd0 commit 3c752a1

File tree

4 files changed

+182
-7
lines changed

4 files changed

+182
-7
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fix disk buffer panics when both reader and writer are on the last data file and it is corrupted. This scenario typically occurs when a node shuts down improperly, leaving the final data file in a corrupted state.
2+
3+
authors: anil-db

lib/vector-buffers/src/variants/disk_v2/ledger.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,6 @@ where
314314
///
315315
/// This is purely a future-looking operation i.e. what would the file ID be if it was
316316
/// incremented from its current value. It does not alter the current writer file ID.
317-
#[cfg(test)]
318317
pub fn get_next_writer_file_id(&self) -> u16 {
319318
self.state().get_next_writer_file_id()
320319
}

lib/vector-buffers/src/variants/disk_v2/reader.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,10 @@ where
764764
Ok(data_file) => data_file,
765765
Err(e) => match e.kind() {
766766
ErrorKind::NotFound => {
767-
if reader_file_id == writer_file_id {
767+
// The reader is waiting for the writer to create the file, which can be either the current writer_file_id or the next writer_file_id (if the writer has marked the current file for skip).
768+
if reader_file_id == writer_file_id
769+
|| reader_file_id == self.ledger.get_next_writer_file_id()
770+
{
768771
debug!(
769772
data_file_path = data_file_path.to_string_lossy().as_ref(),
770773
"Data file does not yet exist. Waiting for writer to create."

lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs

Lines changed: 175 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
1+
use bytes::{Buf, BufMut};
2+
use memmap2::MmapMut;
13
use std::{
24
io::{self, SeekFrom},
5+
path::PathBuf,
36
sync::atomic::{AtomicU32, Ordering},
47
};
5-
6-
use bytes::{Buf, BufMut};
7-
use memmap2::MmapMut;
88
use tokio::{
99
fs::OpenOptions,
1010
io::{AsyncSeekExt, AsyncWriteExt},
11+
time::{timeout, Duration},
1112
};
1213
use tracing::Instrument;
13-
use vector_common::byte_size_of::ByteSizeOf;
14-
use vector_common::finalization::{AddBatchNotifier, BatchNotifier};
14+
use tracing_fluent_assertions::{Assertion, AssertionRegistry};
15+
use vector_common::{
16+
byte_size_of::ByteSizeOf,
17+
finalization::{AddBatchNotifier, BatchNotifier},
18+
};
1519

1620
use super::{create_buffer_v2_with_max_data_file_size, create_default_buffer_v2};
1721
use crate::{
@@ -817,3 +821,169 @@ async fn reader_throws_error_when_record_is_undecodable_via_metadata() {
817821
})
818822
.await;
819823
}
824+
825+
struct ScrambledTestSetup {
826+
marked_for_skip: Assertion,
827+
data_file_path: PathBuf,
828+
starting_writer_file_id: u16,
829+
expected_final_writer_file_id: u16,
830+
expected_final_write_data_file: PathBuf,
831+
expected_data_file_len: u64,
832+
}
833+
834+
async fn write_two_records_and_read_all_then_drop(
835+
data_dir: PathBuf,
836+
assertion_registry: &AssertionRegistry,
837+
) -> ScrambledTestSetup {
838+
let marked_for_skip = assertion_registry
839+
.build()
840+
.with_name("mark_for_skip")
841+
.with_parent_name("writer_detects_when_last_record_has_scrambled_archive_data")
842+
.was_entered()
843+
.finalize();
844+
845+
let (mut writer, mut reader, ledger) = create_default_buffer_v2(data_dir.clone()).await;
846+
847+
let starting_writer_file_id = ledger.get_current_writer_file_id();
848+
let expected_final_writer_file_id = ledger.get_next_writer_file_id();
849+
let expected_final_write_data_file = ledger.get_next_writer_data_file_path();
850+
assert_file_does_not_exist_async!(&expected_final_write_data_file);
851+
852+
let bytes_written_1 = writer
853+
.write_record(SizedRecord::new(64))
854+
.await
855+
.expect("write failed");
856+
let bytes_written_2 = writer
857+
.write_record(SizedRecord::new(68))
858+
.await
859+
.expect("write failed");
860+
writer.flush().await.expect("flush failed");
861+
writer.close();
862+
863+
let expected_data_file_len = bytes_written_1 + bytes_written_2;
864+
865+
let first_read = reader
866+
.next()
867+
.await
868+
.expect("read failed")
869+
.expect("missing record");
870+
assert_eq!(SizedRecord::new(64), first_read);
871+
acknowledge(first_read).await;
872+
873+
let second_read = reader
874+
.next()
875+
.await
876+
.expect("read failed")
877+
.expect("missing record");
878+
assert_eq!(SizedRecord::new(68), second_read);
879+
acknowledge(second_read).await;
880+
881+
let third_read = reader.next().await.expect("read failed");
882+
assert!(third_read.is_none());
883+
884+
ledger.flush().expect("flush failed");
885+
886+
ScrambledTestSetup {
887+
marked_for_skip,
888+
data_file_path: ledger.get_current_writer_data_file_path(),
889+
starting_writer_file_id,
890+
expected_final_writer_file_id,
891+
expected_final_write_data_file,
892+
expected_data_file_len: expected_data_file_len as u64,
893+
}
894+
}
895+
896+
#[tokio::test]
897+
async fn writer_and_reader_handle_when_last_record_has_scrambled_archive_data() {
898+
let assertion_registry = install_tracing_helpers();
899+
let fut = with_temp_dir(|dir| {
900+
let data_dir = dir.to_path_buf();
901+
902+
async move {
903+
let ScrambledTestSetup {
904+
marked_for_skip,
905+
data_file_path,
906+
starting_writer_file_id,
907+
expected_final_writer_file_id,
908+
expected_final_write_data_file,
909+
expected_data_file_len,
910+
} = write_two_records_and_read_all_then_drop(data_dir.clone(), &assertion_registry)
911+
.await;
912+
913+
// We should not have seen a call to `mark_for_skip` yet.
914+
assert!(!marked_for_skip.try_assert());
915+
916+
// Open the file and set the last eight bytes of the record to something clearly
917+
// wrong/invalid, which should end up messing with the relative pointer stuff in the
918+
// archive.
919+
let mut data_file = OpenOptions::new()
920+
.write(true)
921+
.open(&data_file_path)
922+
.await
923+
.expect("open should not fail");
924+
925+
// Just to make sure the data file matches our expected state before futzing with it.
926+
let metadata = data_file
927+
.metadata()
928+
.await
929+
.expect("metadata should not fail");
930+
assert_eq!(expected_data_file_len, metadata.len());
931+
932+
let target_pos = expected_data_file_len - 8;
933+
let pos = data_file
934+
.seek(SeekFrom::Start(target_pos))
935+
.await
936+
.expect("seek should not fail");
937+
assert_eq!(target_pos, pos);
938+
data_file
939+
.write_all(&[0xd, 0xe, 0xa, 0xd, 0xb, 0xe, 0xe, 0xf])
940+
.await
941+
.expect("write should not fail");
942+
data_file.flush().await.expect("flush should not fail");
943+
data_file.sync_all().await.expect("sync should not fail");
944+
drop(data_file);
945+
946+
// Now reopen the buffer, which should trigger a `Writer::mark_for_skip` call which
947+
// instructs the writer to skip to the next data file, although this doesn't happen
948+
// until the first write is attempted.
949+
let (mut writer, mut reader, ledger) =
950+
create_default_buffer_v2::<_, SizedRecord>(data_dir).await;
951+
marked_for_skip.assert();
952+
// When the writer sees the last record as corrupted, it sets a flag to skip to the next file, but the reader moves to the next file ID and waits for the writer to create it.
953+
assert_reader_writer_v2_file_positions!(
954+
ledger,
955+
expected_final_writer_file_id,
956+
starting_writer_file_id
957+
);
958+
assert_file_does_not_exist_async!(&expected_final_write_data_file);
959+
960+
// At this point the reader is waiting for the writer to create the next data file, so we can verify that reader.next() times out.
961+
let result = timeout(Duration::from_millis(100), reader.next()).await;
962+
assert!(result.is_err(), "expected reader.next() to time out");
963+
964+
// Do a simple write to ensure it opens the next data file.
965+
let _bytes_written = writer
966+
.write_record(SizedRecord::new(72))
967+
.await
968+
.expect("write should not fail");
969+
writer.flush().await.expect("flush should not fail");
970+
assert_reader_writer_v2_file_positions!(
971+
ledger,
972+
expected_final_writer_file_id,
973+
expected_final_writer_file_id
974+
);
975+
assert_file_exists_async!(&expected_final_write_data_file);
976+
977+
let read = reader
978+
.next()
979+
.await
980+
.expect("should not fail to read record")
981+
.expect("should contain first record");
982+
assert_eq!(SizedRecord::new(72), read);
983+
acknowledge(read).await;
984+
}
985+
});
986+
987+
let parent = trace_span!("writer_detects_when_last_record_has_scrambled_archive_data");
988+
fut.instrument(parent.or_current()).await;
989+
}

0 commit comments

Comments
 (0)