@@ -1000,134 +1000,179 @@ impl CanSplitDoBetter {
10001000 }
10011001 }
10021002
1003- /// Optimize the order in which splits will get processed based on how it can skip the most
1004- /// splits.
1005- ///
1006- /// The leaf search code contains some logic that makes it possible to skip entire splits
1007- /// when we are confident they won't make it into top K.
1008- /// To make this optimization as potent as possible, we sort the splits so that the first splits
1009- /// are the most likely to fill our Top K.
1010- /// In the future, as split get more metadata per column, we may be able to do this more than
1011- /// just for timestamp and "unsorted" request.
1012- fn optimize_split_order ( & self , splits : & mut [ SplitIdAndFooterOffsets ] ) {
1013- match self {
1014- CanSplitDoBetter :: SplitIdHigher ( _) => {
1015- splits. sort_unstable_by ( |a, b| b. split_id . cmp ( & a. split_id ) )
1016- }
1017- CanSplitDoBetter :: SplitTimestampHigher ( _)
1018- | CanSplitDoBetter :: FindTraceIdsAggregation ( _) => {
1019- splits. sort_unstable_by_key ( |split| std:: cmp:: Reverse ( split. timestamp_end ( ) ) )
1020- }
1021- CanSplitDoBetter :: SplitTimestampLower ( _) => {
1022- splits. sort_unstable_by_key ( |split| split. timestamp_start ( ) )
1023- }
1024- CanSplitDoBetter :: Uninformative => ( ) ,
1003+ fn to_splits_and_request (
1004+ splits : Vec < SplitIdAndFooterOffsets > ,
1005+ request : Arc < SearchRequest > ,
1006+ ) -> Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > {
1007+ splits
1008+ . into_iter ( )
1009+ . map ( |split| ( split, ( * request) . clone ( ) ) )
1010+ . collect :: < Vec < _ > > ( )
1011+ }
1012+
1013+ /// Calculate the number of splits which are guaranteed to deliver enough documents.
1014+ fn get_min_required_splits (
1015+ splits : & [ SplitIdAndFooterOffsets ] ,
1016+ request : & SearchRequest ,
1017+ ) -> usize {
1018+ let num_requested_docs = request. start_offset + request. max_hits ;
1019+
1020+ splits
1021+ . into_iter ( )
1022+ . map ( |split| split. num_docs )
1023+ // computing the partial sum
1024+ . scan ( 0u64 , |partial_sum : & mut u64 , num_docs_in_split : u64 | {
1025+ * partial_sum += num_docs_in_split;
1026+ Some ( * partial_sum)
1027+ } )
1028+ . take_while ( |partial_sum| * partial_sum < num_requested_docs)
1029+ . count ( )
1030+ + 1
1031+ }
1032+
1033+ fn optimize_split_id_higher (
1034+ & self ,
1035+ request : Arc < SearchRequest > ,
1036+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1037+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1038+ splits. sort_unstable_by ( |a, b| b. split_id . cmp ( & a. split_id ) ) ;
1039+
1040+ if !is_simple_all_query ( & request) {
1041+ // no optimization opportunity here.
1042+ return Ok ( Self :: to_splits_and_request ( splits, request) ) ;
10251043 }
1044+
1045+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1046+ let mut split_with_req = Self :: to_splits_and_request ( splits, request) ;
1047+
1048+ // In this case there is no sort order, we order by split id.
1049+ // If the the first split has enough documents, we can convert the other queries to
1050+ // count only queries.
1051+ for ( _split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1052+ disable_search_request_hits ( request) ;
1053+ }
1054+
1055+ Ok ( split_with_req)
10261056 }
10271057
1028- /// This function tries to detect upfront which splits contain the top n hits and convert other
1029- /// split searches to count only searches. It also optimizes split order.
1030- ///
1031- /// Returns the search_requests with their split.
1032- fn optimize (
1058+ fn optimize_split_timestamp_higher (
10331059 & self ,
10341060 request : Arc < SearchRequest > ,
10351061 mut splits : Vec < SplitIdAndFooterOffsets > ,
10361062 ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1037- self . optimize_split_order ( & mut splits ) ;
1063+ splits . sort_unstable_by_key ( |split| std :: cmp :: Reverse ( split . timestamp_end ( ) ) ) ;
10381064
10391065 if !is_simple_all_query ( & request) {
10401066 // no optimization opportunity here.
1041- return Ok ( splits
1042- . into_iter ( )
1043- . map ( |split| ( split, ( * request) . clone ( ) ) )
1044- . collect :: < Vec < _ > > ( ) ) ;
1067+ return Ok ( Self :: to_splits_and_request ( splits, request) ) ;
10451068 }
10461069
1047- let num_requested_docs = request. start_offset + request. max_hits ;
1070+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1071+ let mut split_with_req = Self :: to_splits_and_request ( splits, request) ;
10481072
1049- // Calculate the number of splits which are guaranteed to deliver enough documents.
1050- let min_required_splits = splits
1073+ // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
1074+ //
1075+ // We have the number of splits we need to search to get enough docs, now we need to
1076+ // find the splits that don't overlap.
1077+ //
1078+ // Let's get the smallest timestamp_start of the first num_splits splits
1079+ let smallest_start_timestamp = split_with_req
10511080 . iter ( )
1052- . map ( |split| split. num_docs )
1053- // computing the partial sum
1054- . scan ( 0u64 , |partial_sum : & mut u64 , num_docs_in_split : u64 | {
1055- * partial_sum += num_docs_in_split;
1056- Some ( * partial_sum)
1057- } )
1058- . take_while ( |partial_sum| * partial_sum < num_requested_docs)
1059- . count ( )
1060- + 1 ;
1081+ . take ( min_required_splits)
1082+ . map ( |( split, _) | split. timestamp_start ( ) )
1083+ . min ( )
1084+ // if min_required_splits is 0, we choose a value that disables all splits
1085+ . unwrap_or ( i64:: MAX ) ;
1086+ for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1087+ if split. timestamp_end ( ) < smallest_start_timestamp {
1088+ disable_search_request_hits ( request) ;
1089+ }
1090+ }
10611091
1062- // TODO: we maybe want here some deduplication + Cow logic
1063- let mut split_with_req = splits
1064- . into_iter ( )
1065- . map ( |split| ( split, ( * request) . clone ( ) ) )
1066- . collect :: < Vec < _ > > ( ) ;
1092+ Ok ( split_with_req)
1093+ }
1094+
1095+ fn optimize_split_timestamp_lower (
1096+ & self ,
1097+ request : Arc < SearchRequest > ,
1098+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1099+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1100+ splits. sort_unstable_by_key ( |split| split. timestamp_start ( ) ) ;
1101+
1102+ if !is_simple_all_query ( & request) {
1103+ // no optimization opportunity here.
1104+ return Ok ( Self :: to_splits_and_request ( splits, request) ) ;
1105+ }
1106+
1107+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1108+ let mut split_with_req = Self :: to_splits_and_request ( splits, request) ;
1109+
1110+ // We order by timestamp asc. split_with_req is sorted by timestamp_start.
1111+ //
1112+ // If we know that some splits will deliver enough documents, we can convert the
1113+ // others to count only queries.
1114+ // Since we only have start and end ranges and don't know the distribution we make
1115+ // sure the splits dont' overlap, since the distribution of two
1116+ // splits could be like this (dot is a timestamp doc on a x axis), for top 2
1117+ // queries.
1118+ // ```
1119+ // [. .] Split1 has enough docs, but last doc is not in top 2
1120+ // [.. .] Split2 first doc is in top2
1121+ // ```
1122+ // Let's get the biggest timestamp_end of the first num_splits splits
1123+ let biggest_end_timestamp = split_with_req
1124+ . iter ( )
1125+ . take ( min_required_splits)
1126+ . map ( |( split, _) | split. timestamp_end ( ) )
1127+ . max ( )
1128+ // if min_required_splits is 0, we choose a value that disables all splits
1129+ . unwrap_or ( i64:: MIN ) ;
1130+ for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1131+ if split. timestamp_start ( ) > biggest_end_timestamp {
1132+ disable_search_request_hits ( request) ;
1133+ }
1134+ }
10671135
1068- // reuse the detected sort order in split_filter
1069- // we want to detect cases where we can convert some split queries to count only queries
1136+ Ok ( split_with_req)
1137+ }
1138+
1139+ fn optimize_find_trace_ids_aggregation (
1140+ & self ,
1141+ request : Arc < SearchRequest > ,
1142+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1143+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1144+ splits. sort_unstable_by_key ( |split| std:: cmp:: Reverse ( split. timestamp_end ( ) ) ) ;
1145+
1146+ if !is_simple_all_query ( & request) {
1147+ // no optimization opportunity here.
1148+ return Ok ( Self :: to_splits_and_request ( splits, request) ) ;
1149+ }
1150+
1151+ Ok ( Self :: to_splits_and_request ( splits, request) )
1152+ }
1153+
1154+ /// This function tries to detect upfront which splits contain the top n hits and convert other
1155+ /// split searches to count only searches. It also optimizes split order.
1156+ ///
1157+ /// Returns the search_requests with their split.
1158+ fn optimize (
1159+ & self ,
1160+ request : Arc < SearchRequest > ,
1161+ splits : Vec < SplitIdAndFooterOffsets > ,
1162+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
10701163 match self {
1071- CanSplitDoBetter :: SplitIdHigher ( _) => {
1072- // In this case there is no sort order, we order by split id.
1073- // If the the first split has enough documents, we can convert the other queries to
1074- // count only queries
1075- for ( _split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1076- disable_search_request_hits ( request) ;
1077- }
1164+ CanSplitDoBetter :: SplitIdHigher ( _) => self . optimize_split_id_higher ( request, splits) ,
1165+ CanSplitDoBetter :: SplitTimestampHigher ( _) => {
1166+ self . optimize_split_timestamp_higher ( request, splits)
10781167 }
1079- CanSplitDoBetter :: Uninformative => { }
10801168 CanSplitDoBetter :: SplitTimestampLower ( _) => {
1081- // We order by timestamp asc. split_with_req is sorted by timestamp_start.
1082- //
1083- // If we know that some splits will deliver enough documents, we can convert the
1084- // others to count only queries.
1085- // Since we only have start and end ranges and don't know the distribution we make
1086- // sure the splits dont' overlap, since the distribution of two
1087- // splits could be like this (dot is a timestamp doc on a x axis), for top 2
1088- // queries.
1089- // ```
1090- // [. .] Split1 has enough docs, but last doc is not in top 2
1091- // [.. .] Split2 first doc is in top2
1092- // ```
1093- // Let's get the biggest timestamp_end of the first num_splits splits
1094- let biggest_end_timestamp = split_with_req
1095- . iter ( )
1096- . take ( min_required_splits)
1097- . map ( |( split, _) | split. timestamp_end ( ) )
1098- . max ( )
1099- // if min_required_splits is 0, we choose a value that disables all splits
1100- . unwrap_or ( i64:: MIN ) ;
1101- for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1102- if split. timestamp_start ( ) > biggest_end_timestamp {
1103- disable_search_request_hits ( request) ;
1104- }
1105- }
1169+ self . optimize_split_timestamp_lower ( request, splits)
11061170 }
1107- CanSplitDoBetter :: SplitTimestampHigher ( _) => {
1108- // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
1109- //
1110- // We have the number of splits we need to search to get enough docs, now we need to
1111- // find the splits that don't overlap.
1112- //
1113- // Let's get the smallest timestamp_start of the first num_splits splits
1114- let smallest_start_timestamp = split_with_req
1115- . iter ( )
1116- . take ( min_required_splits)
1117- . map ( |( split, _) | split. timestamp_start ( ) )
1118- . min ( )
1119- // if min_required_splits is 0, we choose a value that disables all splits
1120- . unwrap_or ( i64:: MAX ) ;
1121- for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1122- if split. timestamp_end ( ) < smallest_start_timestamp {
1123- disable_search_request_hits ( request) ;
1124- }
1125- }
1171+ CanSplitDoBetter :: FindTraceIdsAggregation ( _) => {
1172+ self . optimize_find_trace_ids_aggregation ( request, splits)
11261173 }
1127- CanSplitDoBetter :: FindTraceIdsAggregation ( _ ) => { }
1174+ CanSplitDoBetter :: Uninformative => Ok ( Self :: to_splits_and_request ( splits , request ) ) ,
11281175 }
1129-
1130- Ok ( split_with_req)
11311176 }
11321177
11331178 /// Returns whether the given split can possibly give documents better than the one already
0 commit comments