@@ -116,7 +116,20 @@ impl<'me> MetadataSegmentWriter<'me> {
116
116
if segment. r#type != SegmentType :: BlockfileMetadata {
117
117
return Err ( MetadataSegmentError :: InvalidSegmentType ) ;
118
118
}
119
- let prefix_path = segment. construct_prefix_path ( tenant, database_id) ;
119
+ // NOTE: We hope that all blockfiles of the same collection should live under the same prefix.
120
+ // The implementation below implies all collections in the fork tree share the same prefix for
121
+ // blockfiles. Although this is not a desired behavior, as a temporary fix we create the sparse
122
+ // vector index blockfiles under the same prefix as other blockfiles if they are present.
123
+ let prefix_path =
124
+ if let Some ( existing_file_path) = segment. file_path . values ( ) . flatten ( ) . next ( ) {
125
+ let ( existing_prefix, _) = Segment :: extract_prefix_and_id ( existing_file_path)
126
+ . map_err ( |_| {
127
+ MetadataSegmentError :: UuidParseError ( existing_file_path. to_string ( ) )
128
+ } ) ?;
129
+ existing_prefix. to_string ( )
130
+ } else {
131
+ segment. construct_prefix_path ( tenant, database_id)
132
+ } ;
120
133
let pls_writer = match segment. file_path . get ( FULL_TEXT_PLS ) {
121
134
Some ( pls_paths) => match pls_paths. first ( ) {
122
135
Some ( pls_path) => {
@@ -2653,4 +2666,229 @@ mod test {
2653
2666
) ;
2654
2667
}
2655
2668
}
2669
+
2670
    /// Integration test for the prefix-reuse fix in `MetadataSegmentWriter::from_segment`.
    ///
    /// When some blockfiles already exist but the sparse index files are missing
    /// (e.g. deleted, or written by an older version), newly created blockfiles must
    /// reuse the prefix of the existing files rather than deriving a fresh prefix
    /// from the (possibly forked) collection ID. The test exercises three phases:
    ///   1. initial flush — all blockfiles created under the original collection's prefix;
    ///   2. sparse index files removed + collection ID changed (simulated fork) —
    ///      re-flush must recreate them under the ORIGINAL prefix;
    ///   3. file map replaced with a single legacy path that has no prefix —
    ///      re-flush must produce blockfiles with an EMPTY prefix.
    #[tokio::test]
    async fn test_sparse_index_recreated_with_existing_prefix() {
        // This test verifies that when sparse index files are missing (e.g., deleted)
        // and need to be recreated, they use the same prefix as existing blockfiles.
        // This tests the bug fix for incorrect blockfile paths.

        // Local on-disk storage backed by a temp dir; dropped (and cleaned up) at end of test.
        let tmp_dir = tempfile::tempdir().unwrap();
        let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
        let block_cache = new_cache_for_test();
        let sparse_index_cache = new_cache_for_test();
        let arrow_blockfile_provider = ArrowBlockfileProvider::new(
            storage,
            TEST_MAX_BLOCK_SIZE_BYTES,
            block_cache,
            sparse_index_cache,
            BlockManagerConfig::default_num_concurrent_block_flushes(),
        );
        let blockfile_provider = BlockfileProvider::ArrowBlockfileProvider(arrow_blockfile_provider);

        let tenant = String::from("test_tenant");
        let database_id = DatabaseUuid::new();

        // Original collection ID — its string form is expected to appear in the
        // prefix of every blockfile created in phase 1.
        let original_collection_id =
            CollectionUuid::from_str("00000000-0000-0000-0000-000000000001").expect("parse error");

        // Fresh metadata segment with an empty file map: the first flush has no
        // existing paths to inherit a prefix from, so it constructs one.
        let mut metadata_segment = chroma_types::Segment {
            id: SegmentUuid::from_str("00000000-0000-0000-0000-000000000002").expect("parse error"),
            r#type: chroma_types::SegmentType::BlockfileMetadata,
            scope: chroma_types::SegmentScope::METADATA,
            collection: original_collection_id,
            metadata: None,
            file_path: HashMap::new(),
        };

        // First flush: create initial blockfiles
        {
            let metadata_writer = MetadataSegmentWriter::from_segment(
                &tenant,
                &database_id,
                &metadata_segment,
                &blockfile_provider,
            )
            .await
            .expect("Error creating metadata writer");

            let metadata_flusher = metadata_writer
                .commit()
                .await
                .expect("Error committing metadata");

            // Capture the flushed file map so later phases see the real on-disk paths.
            metadata_segment.file_path = metadata_flusher
                .flush()
                .await
                .expect("Error flushing metadata");
        }

        // Verify sparse index files were created
        assert!(metadata_segment.file_path.contains_key(SPARSE_MAX));
        assert!(metadata_segment.file_path.contains_key(SPARSE_OFFSET_VALUE));

        // Extract the original prefix from an arbitrary blockfile path.
        // NOTE(review): this assumes all phase-1 blockfiles share one prefix —
        // the later loop asserts exactly that, so any mismatch still fails.
        let original_prefix = {
            let existing_file_path = metadata_segment
                .file_path
                .values()
                .next()
                .and_then(|paths| paths.first())
                .expect("Should have at least one blockfile");

            let (prefix, _) = chroma_types::Segment::extract_prefix_and_id(existing_file_path)
                .expect("Should be able to extract prefix");
            prefix.to_string()
        };

        // Simulate missing sparse index files (e.g., from older version or deleted)
        metadata_segment.file_path.remove(SPARSE_MAX);
        metadata_segment.file_path.remove(SPARSE_OFFSET_VALUE);

        // Change collection ID to simulate a forked collection. Without the fix,
        // recreated files would be placed under a prefix derived from this new ID.
        let forked_collection_id =
            CollectionUuid::from_str("00000000-0000-0000-0000-000000000003").expect("parse error");
        metadata_segment.collection = forked_collection_id;

        // Second flush: recreate sparse index files.
        // The bug fix ensures they use the existing prefix, not a new one.
        {
            let metadata_writer = MetadataSegmentWriter::from_segment(
                &tenant,
                &database_id,
                &metadata_segment,
                &blockfile_provider,
            )
            .await
            .expect("Error creating metadata writer");

            let metadata_flusher = metadata_writer
                .commit()
                .await
                .expect("Error committing metadata");

            metadata_segment.file_path = metadata_flusher
                .flush()
                .await
                .expect("Error flushing metadata");
        }

        // Verify sparse index files were recreated
        assert!(
            metadata_segment.file_path.contains_key(SPARSE_MAX),
            "Sparse max should be recreated"
        );
        assert!(
            metadata_segment.file_path.contains_key(SPARSE_OFFSET_VALUE),
            "Sparse offset value should be recreated"
        );

        // Verify ALL blockfiles use the original prefix
        for (key, paths) in &metadata_segment.file_path {
            for path in paths {
                let (prefix, _) = chroma_types::Segment::extract_prefix_and_id(path)
                    .expect("Should be able to extract prefix");
                assert_eq!(
                    prefix, original_prefix,
                    "All blockfiles should use original prefix. Key: {}, Path: {}",
                    key, path
                );
                // Verify the prefix contains the original collection ID, not the forked one
                assert!(
                    prefix.contains(&original_collection_id.to_string()),
                    "Prefix should contain original collection ID"
                );
                assert!(
                    !prefix.contains(&forked_collection_id.to_string()),
                    "Prefix should NOT contain forked collection ID"
                );
            }
        }

        // Verify we can read from the segment with recreated sparse indices
        {
            let metadata_reader =
                MetadataSegmentReader::from_segment(&metadata_segment, &blockfile_provider)
                    .await
                    .expect("Should be able to read from segment with recreated sparse indices");

            assert!(
                metadata_reader.sparse_index_reader.is_some(),
                "Sparse index reader should be created, verifying files exist and are readable"
            );
        }

        // Phase 3: simulate legacy files without prefix — the single stored path is
        // a bare UUID, so extract_prefix_and_id should yield an empty prefix.
        metadata_segment.file_path.drain();
        metadata_segment.file_path.insert(
            "legacy_file".to_string(),
            vec!["11111111-1111-1111-1111-111111111111".to_string()],
        );

        // Change collection ID to simulate a forked collection
        let forked_collection_id =
            CollectionUuid::from_str("00000000-0000-0000-0000-000000000004").expect("parse error");
        metadata_segment.collection = forked_collection_id;

        // Third flush: recreate all index files.
        // The bug fix ensures they use the existing (empty) prefix, not a new one.
        {
            let metadata_writer = MetadataSegmentWriter::from_segment(
                &tenant,
                &database_id,
                &metadata_segment,
                &blockfile_provider,
            )
            .await
            .expect("Error creating metadata writer");

            let metadata_flusher = metadata_writer
                .commit()
                .await
                .expect("Error committing metadata");

            metadata_segment.file_path = metadata_flusher
                .flush()
                .await
                .expect("Error flushing metadata");
        }

        // Verify sparse index files were recreated
        assert!(
            metadata_segment.file_path.contains_key(SPARSE_MAX),
            "Sparse max should be recreated"
        );
        assert!(
            metadata_segment.file_path.contains_key(SPARSE_OFFSET_VALUE),
            "Sparse offset value should be recreated"
        );

        // Verify ALL blockfiles inherited the legacy (empty) prefix
        for (key, paths) in &metadata_segment.file_path {
            for path in paths {
                let (prefix, _) = chroma_types::Segment::extract_prefix_and_id(path)
                    .expect("Should be able to extract prefix");
                assert!(
                    prefix.is_empty(),
                    "All blockfiles should use empty prefix. Key: {}, Path: {}",
                    key,
                    path
                );
            }
        }

        // Verify we can read from the segment with recreated sparse indices
        {
            let metadata_reader =
                MetadataSegmentReader::from_segment(&metadata_segment, &blockfile_provider)
                    .await
                    .expect("Should be able to read from segment with recreated sparse indices");

            assert!(
                metadata_reader.sparse_index_reader.is_some(),
                "Sparse index reader should be created, verifying files exist and are readable"
            );
        }
    }
2656
2894
}
0 commit comments