diff --git a/rust/segment/src/blockfile_metadata.rs b/rust/segment/src/blockfile_metadata.rs
index b3ee6f8300b..4b506e731c1 100644
--- a/rust/segment/src/blockfile_metadata.rs
+++ b/rust/segment/src/blockfile_metadata.rs
@@ -116,7 +116,20 @@ impl<'me> MetadataSegmentWriter<'me> {
         if segment.r#type != SegmentType::BlockfileMetadata {
             return Err(MetadataSegmentError::InvalidSegmentType);
         }
-        let prefix_path = segment.construct_prefix_path(tenant, database_id);
+        // NOTE: We expect all blockfiles of the same collection to live under the same prefix.
+        // The implementation below implies that all collections in a fork tree share the same prefix
+        // for blockfiles. Although this is not the desired behavior, as a temporary fix we create the
+        // sparse vector index blockfiles under the same prefix as the other blockfiles, if any are present.
+        let prefix_path =
+            if let Some(existing_file_path) = segment.file_path.values().flatten().next() {
+                let (existing_prefix, _) = Segment::extract_prefix_and_id(existing_file_path)
+                    .map_err(|_| {
+                        MetadataSegmentError::UuidParseError(existing_file_path.to_string())
+                    })?;
+                existing_prefix.to_string()
+            } else {
+                segment.construct_prefix_path(tenant, database_id)
+            };
         let pls_writer = match segment.file_path.get(FULL_TEXT_PLS) {
             Some(pls_paths) => match pls_paths.first() {
                 Some(pls_path) => {
@@ -2653,4 +2666,229 @@ mod test {
             );
         }
     }
+
+    #[tokio::test]
+    async fn test_sparse_index_recreated_with_existing_prefix() {
+        // This test verifies that when sparse index files are missing (e.g., deleted)
+        // and need to be recreated, they are recreated under the same prefix as the
+        // existing blockfiles. This tests the bug fix for incorrect blockfile paths.
+
+        let tmp_dir = tempfile::tempdir().unwrap();
+        let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
+        let block_cache = new_cache_for_test();
+        let sparse_index_cache = new_cache_for_test();
+        let arrow_blockfile_provider = ArrowBlockfileProvider::new(
+            storage,
+            TEST_MAX_BLOCK_SIZE_BYTES,
+            block_cache,
+            sparse_index_cache,
+            BlockManagerConfig::default_num_concurrent_block_flushes(),
+        );
+        let blockfile_provider =
+            BlockfileProvider::ArrowBlockfileProvider(arrow_blockfile_provider);
+
+        let tenant = String::from("test_tenant");
+        let database_id = DatabaseUuid::new();
+
+        // Original collection ID
+        let original_collection_id =
+            CollectionUuid::from_str("00000000-0000-0000-0000-000000000001").expect("parse error");
+
+        let mut metadata_segment = chroma_types::Segment {
+            id: SegmentUuid::from_str("00000000-0000-0000-0000-000000000002").expect("parse error"),
+            r#type: chroma_types::SegmentType::BlockfileMetadata,
+            scope: chroma_types::SegmentScope::METADATA,
+            collection: original_collection_id,
+            metadata: None,
+            file_path: HashMap::new(),
+        };
+
+        // First flush: create initial blockfiles
+        {
+            let metadata_writer = MetadataSegmentWriter::from_segment(
+                &tenant,
+                &database_id,
+                &metadata_segment,
+                &blockfile_provider,
+            )
+            .await
+            .expect("Error creating metadata writer");
+
+            let metadata_flusher = metadata_writer
+                .commit()
+                .await
+                .expect("Error committing metadata");
+
+            metadata_segment.file_path = metadata_flusher
+                .flush()
+                .await
+                .expect("Error flushing metadata");
+        }
+
+        // Verify sparse index files were created
+        assert!(metadata_segment.file_path.contains_key(SPARSE_MAX));
+        assert!(metadata_segment.file_path.contains_key(SPARSE_OFFSET_VALUE));
+
+        // Extract the original prefix
+        let original_prefix = {
+            let existing_file_path = metadata_segment
+                .file_path
+                .values()
+                .next()
+                .and_then(|paths| paths.first())
+                .expect("Should have at least one blockfile");
+
+            let (prefix, _) = chroma_types::Segment::extract_prefix_and_id(existing_file_path)
+                .expect("Should be able to extract prefix");
+            prefix.to_string()
+        };
+
+        // Simulate missing sparse index files (e.g., from an older version, or deleted)
+        metadata_segment.file_path.remove(SPARSE_MAX);
+        metadata_segment.file_path.remove(SPARSE_OFFSET_VALUE);
+
+        // Change the collection ID to simulate a forked collection
+        let forked_collection_id =
+            CollectionUuid::from_str("00000000-0000-0000-0000-000000000003").expect("parse error");
+        metadata_segment.collection = forked_collection_id;
+
+        // Second flush: recreate the sparse index files.
+        // The bug fix ensures they use the existing prefix, not a new one.
+        {
+            let metadata_writer = MetadataSegmentWriter::from_segment(
+                &tenant,
+                &database_id,
+                &metadata_segment,
+                &blockfile_provider,
+            )
+            .await
+            .expect("Error creating metadata writer");
+
+            let metadata_flusher = metadata_writer
+                .commit()
+                .await
+                .expect("Error committing metadata");
+
+            metadata_segment.file_path = metadata_flusher
+                .flush()
+                .await
+                .expect("Error flushing metadata");
+        }
+
+        // Verify sparse index files were recreated
+        assert!(
+            metadata_segment.file_path.contains_key(SPARSE_MAX),
+            "Sparse max should be recreated"
+        );
+        assert!(
+            metadata_segment.file_path.contains_key(SPARSE_OFFSET_VALUE),
+            "Sparse offset value should be recreated"
+        );
+
+        // Verify ALL blockfiles use the original prefix
+        for (key, paths) in &metadata_segment.file_path {
+            for path in paths {
+                let (prefix, _) = chroma_types::Segment::extract_prefix_and_id(path)
+                    .expect("Should be able to extract prefix");
+                assert_eq!(
+                    prefix, original_prefix,
+                    "All blockfiles should use the original prefix. Key: {}, Path: {}",
+                    key, path
+                );
+                // Verify the prefix contains the original collection ID, not the forked one
+                assert!(
+                    prefix.contains(&original_collection_id.to_string()),
+                    "Prefix should contain the original collection ID"
+                );
+                assert!(
+                    !prefix.contains(&forked_collection_id.to_string()),
+                    "Prefix should NOT contain the forked collection ID"
+                );
+            }
+        }
+
+        // Verify we can read from the segment with recreated sparse indices
+        {
+            let metadata_reader =
+                MetadataSegmentReader::from_segment(&metadata_segment, &blockfile_provider)
+                    .await
+                    .expect("Should be able to read from segment with recreated sparse indices");
+
+            assert!(
+                metadata_reader.sparse_index_reader.is_some(),
+                "Sparse index reader should be created, verifying files exist and are readable"
+            );
+        }
+
+        // Simulate legacy files without a prefix
+        metadata_segment.file_path.drain();
+        metadata_segment.file_path.insert(
+            "legacy_file".to_string(),
+            vec!["11111111-1111-1111-1111-111111111111".to_string()],
+        );
+
+        // Change the collection ID to simulate a forked collection
+        let forked_collection_id =
+            CollectionUuid::from_str("00000000-0000-0000-0000-000000000004").expect("parse error");
+        metadata_segment.collection = forked_collection_id;
+
+        // Third flush: recreate all index files.
+        // The bug fix ensures they use the existing (empty) prefix, not a new one.
+        {
+            let metadata_writer = MetadataSegmentWriter::from_segment(
+                &tenant,
+                &database_id,
+                &metadata_segment,
+                &blockfile_provider,
+            )
+            .await
+            .expect("Error creating metadata writer");
+
+            let metadata_flusher = metadata_writer
+                .commit()
+                .await
+                .expect("Error committing metadata");
+
+            metadata_segment.file_path = metadata_flusher
+                .flush()
+                .await
+                .expect("Error flushing metadata");
+        }
+
+        // Verify sparse index files were recreated
+        assert!(
+            metadata_segment.file_path.contains_key(SPARSE_MAX),
+            "Sparse max should be recreated"
+        );
+        assert!(
+            metadata_segment.file_path.contains_key(SPARSE_OFFSET_VALUE),
+            "Sparse offset value should be recreated"
+        );
+
+        // Verify ALL blockfiles use the legacy (empty) prefix
+        for (key, paths) in &metadata_segment.file_path {
+            for path in paths {
+                let (prefix, _) = chroma_types::Segment::extract_prefix_and_id(path)
+                    .expect("Should be able to extract prefix");
+                assert!(
+                    prefix.is_empty(),
+                    "All blockfiles should use an empty prefix. Key: {}, Path: {}",
+                    key,
+                    path
+                );
+            }
+        }
+
+        // Verify we can read from the segment with recreated sparse indices
+        {
+            let metadata_reader =
+                MetadataSegmentReader::from_segment(&metadata_segment, &blockfile_provider)
+                    .await
+                    .expect("Should be able to read from segment with recreated sparse indices");
+
+            assert!(
+                metadata_reader.sparse_index_reader.is_some(),
+                "Sparse index reader should be created, verifying files exist and are readable"
+            );
+        }
+    }
 }