240 changes: 239 additions & 1 deletion rust/segment/src/blockfile_metadata.rs
@@ -116,7 +116,20 @@ impl<'me> MetadataSegmentWriter<'me> {
if segment.r#type != SegmentType::BlockfileMetadata {
return Err(MetadataSegmentError::InvalidSegmentType);
}
-        let prefix_path = segment.construct_prefix_path(tenant, database_id);
+        // NOTE: We expect all blockfiles of the same collection to live under the same prefix.
+        // The implementation below implies that all collections in a fork tree share the same
+        // prefix for their blockfiles. Although this is not the desired behavior, as a temporary
+        // fix we create the sparse vector index blockfiles under the same prefix when any exist.
+        let prefix_path =
+            if let Some(existing_file_path) = segment.file_path.values().flatten().next() {
+                let (existing_prefix, _) = Segment::extract_prefix_and_id(existing_file_path)
+                    .map_err(|_| {
+                        MetadataSegmentError::UuidParseError(existing_file_path.to_string())
+                    })?;
+                existing_prefix.to_string()
+            } else {
+                segment.construct_prefix_path(tenant, database_id)
+            };
let pls_writer = match segment.file_path.get(FULL_TEXT_PLS) {
Some(pls_paths) => match pls_paths.first() {
Some(pls_path) => {
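
For intuition, here is a minimal sketch of the path split this fix relies on. `split_prefix_and_id` below is a hypothetical stand-in for `Segment::extract_prefix_and_id`, assuming blockfile paths take the shape `<prefix>/<uuid>` and that a legacy path is a bare UUID with an empty prefix; both assumptions are consistent with the tests added further down, and the sketch is not a statement of the real implementation.

    use uuid::Uuid;

    /// Hypothetical stand-in for `Segment::extract_prefix_and_id`: split a
    /// blockfile path of the assumed shape `<prefix>/<uuid>` into its parts.
    /// A bare UUID (legacy path) yields an empty prefix.
    fn split_prefix_and_id(path: &str) -> Result<(&str, Uuid), uuid::Error> {
        match path.rsplit_once('/') {
            Some((prefix, id)) => Ok((prefix, Uuid::parse_str(id)?)),
            None => Ok(("", Uuid::parse_str(path)?)),
        }
    }

    #[test]
    fn split_examples() {
        // Prefixed path: everything before the final component is the prefix.
        let (prefix, _) = split_prefix_and_id(
            "tenant/db/coll/11111111-1111-1111-1111-111111111111",
        )
        .unwrap();
        assert_eq!(prefix, "tenant/db/coll");

        // Legacy path: a bare UUID carries no prefix, matching the third
        // flush scenario exercised in the test below.
        let (prefix, _) =
            split_prefix_and_id("11111111-1111-1111-1111-111111111111").unwrap();
        assert!(prefix.is_empty());
    }
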
@@ -2653,4 +2666,229 @@ mod test {
);
}
}

#[tokio::test]
async fn test_sparse_index_recreated_with_existing_prefix() {
        // This test verifies that when sparse index files are missing (e.g., deleted)
        // and need to be recreated, they use the same prefix as the existing blockfiles.
        // This exercises the bug fix for incorrect blockfile paths.

let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let block_cache = new_cache_for_test();
let sparse_index_cache = new_cache_for_test();
let arrow_blockfile_provider = ArrowBlockfileProvider::new(
storage,
TEST_MAX_BLOCK_SIZE_BYTES,
block_cache,
sparse_index_cache,
BlockManagerConfig::default_num_concurrent_block_flushes(),
);
let blockfile_provider =
BlockfileProvider::ArrowBlockfileProvider(arrow_blockfile_provider);

let tenant = String::from("test_tenant");
let database_id = DatabaseUuid::new();

// Original collection ID
let original_collection_id =
CollectionUuid::from_str("00000000-0000-0000-0000-000000000001").expect("parse error");

let mut metadata_segment = chroma_types::Segment {
id: SegmentUuid::from_str("00000000-0000-0000-0000-000000000002").expect("parse error"),
r#type: chroma_types::SegmentType::BlockfileMetadata,
scope: chroma_types::SegmentScope::METADATA,
collection: original_collection_id,
metadata: None,
file_path: HashMap::new(),
};

// First flush: create initial blockfiles
{
let metadata_writer = MetadataSegmentWriter::from_segment(
&tenant,
&database_id,
&metadata_segment,
&blockfile_provider,
)
.await
.expect("Error creating metadata writer");

let metadata_flusher = metadata_writer
.commit()
.await
.expect("Error committing metadata");

metadata_segment.file_path = metadata_flusher
.flush()
.await
.expect("Error flushing metadata");
}

// Verify sparse index files were created
assert!(metadata_segment.file_path.contains_key(SPARSE_MAX));
assert!(metadata_segment.file_path.contains_key(SPARSE_OFFSET_VALUE));

// Extract the original prefix
let original_prefix = {
let existing_file_path = metadata_segment
.file_path
.values()
.next()
.and_then(|paths| paths.first())
.expect("Should have at least one blockfile");

let (prefix, _) = chroma_types::Segment::extract_prefix_and_id(existing_file_path)
.expect("Should be able to extract prefix");
prefix.to_string()
};

        // Simulate missing sparse index files (e.g., from an older version, or deleted)
metadata_segment.file_path.remove(SPARSE_MAX);
metadata_segment.file_path.remove(SPARSE_OFFSET_VALUE);

// Change collection ID to simulate a forked collection
let forked_collection_id =
CollectionUuid::from_str("00000000-0000-0000-0000-000000000003").expect("parse error");
metadata_segment.collection = forked_collection_id;

// Second flush: recreate sparse index files
// The bug fix ensures they use the existing prefix, not a new one
{
let metadata_writer = MetadataSegmentWriter::from_segment(
&tenant,
&database_id,
&metadata_segment,
&blockfile_provider,
)
.await
.expect("Error creating metadata writer");

let metadata_flusher = metadata_writer
.commit()
.await
.expect("Error committing metadata");

metadata_segment.file_path = metadata_flusher
.flush()
.await
.expect("Error flushing metadata");
}

// Verify sparse index files were recreated
assert!(
metadata_segment.file_path.contains_key(SPARSE_MAX),
"Sparse max should be recreated"
);
assert!(
metadata_segment.file_path.contains_key(SPARSE_OFFSET_VALUE),
"Sparse offset value should be recreated"
);

// Verify ALL blockfiles use the original prefix
for (key, paths) in &metadata_segment.file_path {
for path in paths {
let (prefix, _) = chroma_types::Segment::extract_prefix_and_id(path)
.expect("Should be able to extract prefix");
                assert_eq!(
                    prefix, original_prefix,
                    "All blockfiles should use the original prefix. Key: {}, Path: {}",
                    key, path
                );
// Verify the prefix contains the original collection ID, not the forked one
assert!(
prefix.contains(&original_collection_id.to_string()),
"Prefix should contain original collection ID"
);
assert!(
!prefix.contains(&forked_collection_id.to_string()),
"Prefix should NOT contain forked collection ID"
);
}
}

// Verify we can read from the segment with recreated sparse indices
{
let metadata_reader =
MetadataSegmentReader::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Should be able to read from segment with recreated sparse indices");

assert!(
metadata_reader.sparse_index_reader.is_some(),
"Sparse index reader should be created, verifying files exist and are readable"
);
}
        // Simulate legacy files without a prefix
metadata_segment.file_path.drain();
metadata_segment.file_path.insert(
"legacy_file".to_string(),
vec!["11111111-1111-1111-1111-111111111111".to_string()],
);

// Change collection ID to simulate a forked collection
let forked_collection_id =
CollectionUuid::from_str("00000000-0000-0000-0000-000000000004").expect("parse error");
metadata_segment.collection = forked_collection_id;

        // Third flush: recreate all index files
        // The bug fix ensures they reuse the existing (empty) legacy prefix, not a new one
{
let metadata_writer = MetadataSegmentWriter::from_segment(
&tenant,
&database_id,
&metadata_segment,
&blockfile_provider,
)
.await
.expect("Error creating metadata writer");

let metadata_flusher = metadata_writer
.commit()
.await
.expect("Error committing metadata");

metadata_segment.file_path = metadata_flusher
.flush()
.await
.expect("Error flushing metadata");
}

// Verify sparse index files were recreated
assert!(
metadata_segment.file_path.contains_key(SPARSE_MAX),
"Sparse max should be recreated"
);
assert!(
metadata_segment.file_path.contains_key(SPARSE_OFFSET_VALUE),
"Sparse offset value should be recreated"
);

        // Verify ALL blockfiles use the empty legacy prefix
for (key, paths) in &metadata_segment.file_path {
for path in paths {
let (prefix, _) = chroma_types::Segment::extract_prefix_and_id(path)
.expect("Should be able to extract prefix");
                assert!(
                    prefix.is_empty(),
                    "All blockfiles should use an empty prefix. Key: {}, Path: {}",
key,
path
);
}
}

// Verify we can read from the segment with recreated sparse indices
{
let metadata_reader =
MetadataSegmentReader::from_segment(&metadata_segment, &blockfile_provider)
.await
.expect("Should be able to read from segment with recreated sparse indices");

assert!(
metadata_reader.sparse_index_reader.is_some(),
"Sparse index reader should be created, verifying files exist and are readable"
);
}
}
}
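
Taken together, the selection rule this fix introduces can be restated outside the writer. The sketch below is a simplified paraphrase under the same assumptions as above (a `<prefix>/<uuid>` path shape); `choose_prefix` and its arguments are illustrative names, not the crate's API.

    use std::collections::HashMap;

    /// Simplified restatement of the fix: reuse the prefix of any existing
    /// blockfile so that recreated files (e.g., sparse index blockfiles for a
    /// forked collection) land next to the files they share a fork tree with;
    /// only fall back to a freshly constructed prefix when the segment has no
    /// blockfiles at all.
    fn choose_prefix(
        file_path: &HashMap<String, Vec<String>>,
        constructed_prefix: String, // what construct_prefix_path(tenant, database_id) would yield
    ) -> String {
        match file_path.values().flatten().next() {
            Some(existing) => existing
                .rsplit_once('/') // hypothetical split; stands in for extract_prefix_and_id
                .map(|(prefix, _)| prefix.to_string())
                .unwrap_or_default(), // bare legacy UUID -> empty prefix
            None => constructed_prefix,
        }
    }

Because `HashMap` iteration order is arbitrary, picking `values().flatten().next()` is only sound if every existing blockfile already shares one prefix, which is exactly the invariant the NOTE in the diff calls out.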