Skip to content

Commit 4f841e4

Browse files
committed
feat: AIO batchmode
Signed-off-by: usamoi <[email protected]>
1 parent 73db729 commit 4f841e4

File tree

7 files changed

+39
-37
lines changed

7 files changed

+39
-37
lines changed

crates/algo/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,17 +96,22 @@ pub trait RelationPrefetch: Relation {
9696
fn prefetch(&self, id: u32);
9797
}
9898

99-
#[derive(Debug, Default, Clone)]
99+
#[derive(Debug, Default, Clone, Copy)]
100+
#[non_exhaustive]
100101
pub struct Hints {
101102
pub full: bool,
103+
pub batch: bool,
102104
}
103105

104106
impl Hints {
105-
#[allow(clippy::needless_update)]
106107
#[inline]
107108
pub fn full(self, full: bool) -> Self {
108109
Self { full, ..self }
109110
}
111+
#[inline]
112+
pub fn batch(self, batch: bool) -> Self {
113+
Self { batch, ..self }
114+
}
110115
}
111116

112117
pub trait RelationReadStreamTypes: RelationReadTypes {

src/index/storage.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -659,18 +659,19 @@ impl<O: Opaque> RelationReadStream for PostgresRelation<O> {
659659
_phantom: PhantomData,
660660
}));
661661
let raw = unsafe {
662-
use pgrx::pg_sys::{
663-
ForkNumber, READ_STREAM_DEFAULT, READ_STREAM_FULL, read_stream_begin_relation,
664-
};
665-
let mut flags = READ_STREAM_DEFAULT;
662+
let mut flags = pgrx::pg_sys::READ_STREAM_DEFAULT;
666663
if hints.full {
667-
flags |= READ_STREAM_FULL;
664+
flags |= pgrx::pg_sys::READ_STREAM_FULL;
668665
}
669-
read_stream_begin_relation(
666+
#[cfg(feature = "pg18")]
667+
if hints.batch {
668+
flags |= pgrx::pg_sys::READ_STREAM_USE_BATCHING;
669+
}
670+
pgrx::pg_sys::read_stream_begin_relation(
670671
flags as i32,
671672
core::ptr::null_mut(),
672673
self.raw,
673-
ForkNumber::MAIN_FORKNUM,
674+
pgrx::pg_sys::ForkNumber::MAIN_FORKNUM,
674675
Some(callback::<I>),
675676
cache.as_ptr().cast(),
676677
0,

src/index/vchordg/algo.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ impl<'b, R> Clone for MakeStreamPrefetcher<'b, R> {
266266
fn clone(&self) -> Self {
267267
Self {
268268
index: self.index,
269-
hints: self.hints.clone(),
269+
hints: self.hints,
270270
}
271271
}
272272
}
@@ -283,7 +283,7 @@ impl<'b, R: RelationRead + RelationReadStream> PrefetcherSequenceFamily<'b, R>
283283
where
284284
S::Item: Fetch<'b>,
285285
{
286-
StreamPrefetcher::new(self.index, seq, self.hints.clone())
286+
StreamPrefetcher::new(self.index, seq, self.hints)
287287
}
288288

289289
fn is_not_plain(&self) -> bool {

src/index/vchordg/scanners/default.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ impl SearchBuilder for DefaultBuilder {
111111
let make_vertex_simple_prefetcher = MakeSimplePrefetcher { index };
112112
let make_vertex_stream_prefetcher = MakeStreamPrefetcher {
113113
index,
114-
hints: Hints::default().full(true),
114+
hints: Hints::default().full(true).batch(true),
115115
};
116116
let make_vector_plain_prefetcher = MakePlainPrefetcher { index };
117117
let make_vector_simple_prefetcher = MakeSimplePrefetcher { index };
118118
let make_vector_stream_prefetcher = MakeStreamPrefetcher {
119119
index,
120-
hints: Hints::default().full(true),
120+
hints: Hints::default().full(true).batch(true),
121121
};
122122
let iter: Box<dyn Iterator<Item = (Distance, NonZero<u64>)>> =
123123
match (opfamily.vector_kind(), opfamily.distance_kind()) {

src/index/vchordrq/algo.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ impl<'b, R> Clone for MakeH0StreamPrefetcher<'b, R> {
412412
fn clone(&self) -> Self {
413413
Self {
414414
index: self.index,
415-
hints: self.hints.clone(),
415+
hints: self.hints,
416416
}
417417
}
418418
}
@@ -429,7 +429,7 @@ impl<'b, R: RelationRead + RelationReadStream> PrefetcherSequenceFamily<'b, R>
429429
where
430430
S::Item: Fetch<'b>,
431431
{
432-
StreamPrefetcher::new(self.index, seq, self.hints.clone())
432+
StreamPrefetcher::new(self.index, seq, self.hints)
433433
}
434434

435435
fn is_not_plain(&self) -> bool {

src/index/vchordrq/scanners/default.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,14 @@ impl SearchBuilder for DefaultBuilder {
110110
let Some(vector) = vector else {
111111
return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = (f32, [u16; 3], bool)>>;
112112
};
113+
let search_hints = Hints::default().full(true).batch(false);
114+
let rerank_hints = Hints::default().full(false).batch(true);
113115
let make_h1_plain_prefetcher = MakeH1PlainPrefetcher { index };
114116
let make_h0_plain_prefetcher = MakeH0PlainPrefetcher { index };
115117
let make_h0_simple_prefetcher = MakeH0SimplePrefetcher { index };
116118
let make_h0_stream_prefetcher = MakeH0StreamPrefetcher {
117119
index,
118-
hints: Hints::default().full(true),
120+
hints: search_hints,
119121
};
120122
let f = move |(distance, payload)| (opfamily.output(distance), payload);
121123
let iter: Box<dyn Iterator<Item = (f32, NonZero<u64>)>> =
@@ -195,8 +197,7 @@ impl SearchBuilder for DefaultBuilder {
195197
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
196198
}
197199
(RerankMethod::Index, Io::Stream, false) => {
198-
let prefetcher =
199-
StreamPrefetcher::new(index, sequence, Hints::default());
200+
let prefetcher = StreamPrefetcher::new(index, sequence, rerank_hints);
200201
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
201202
}
202203
(RerankMethod::Index, Io::Stream, true) => {
@@ -209,8 +210,7 @@ impl SearchBuilder for DefaultBuilder {
209210
tuple.filter()
210211
});
211212
let sequence = filter(sequence, predicate);
212-
let prefetcher =
213-
StreamPrefetcher::new(index, sequence, Hints::default());
213+
let prefetcher = StreamPrefetcher::new(index, sequence, rerank_hints);
214214
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
215215
}
216216
(RerankMethod::Heap, _, false) => {
@@ -335,8 +335,7 @@ impl SearchBuilder for DefaultBuilder {
335335
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
336336
}
337337
(RerankMethod::Index, Io::Stream, false) => {
338-
let prefetcher =
339-
StreamPrefetcher::new(index, sequence, Hints::default());
338+
let prefetcher = StreamPrefetcher::new(index, sequence, rerank_hints);
340339
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
341340
}
342341
(RerankMethod::Index, Io::Stream, true) => {
@@ -349,8 +348,7 @@ impl SearchBuilder for DefaultBuilder {
349348
tuple.filter()
350349
});
351350
let sequence = filter(sequence, predicate);
352-
let prefetcher =
353-
StreamPrefetcher::new(index, sequence, Hints::default());
351+
let prefetcher = StreamPrefetcher::new(index, sequence, rerank_hints);
354352
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
355353
}
356354
(RerankMethod::Heap, _, false) => {
@@ -475,8 +473,7 @@ impl SearchBuilder for DefaultBuilder {
475473
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
476474
}
477475
(RerankMethod::Index, Io::Stream, false) => {
478-
let prefetcher =
479-
StreamPrefetcher::new(index, sequence, Hints::default());
476+
let prefetcher = StreamPrefetcher::new(index, sequence, rerank_hints);
480477
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
481478
}
482479
(RerankMethod::Index, Io::Stream, true) => {
@@ -489,8 +486,7 @@ impl SearchBuilder for DefaultBuilder {
489486
tuple.filter()
490487
});
491488
let sequence = filter(sequence, predicate);
492-
let prefetcher =
493-
StreamPrefetcher::new(index, sequence, Hints::default());
489+
let prefetcher = StreamPrefetcher::new(index, sequence, rerank_hints);
494490
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
495491
}
496492
(RerankMethod::Heap, _, false) => {
@@ -615,8 +611,7 @@ impl SearchBuilder for DefaultBuilder {
615611
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
616612
}
617613
(RerankMethod::Index, Io::Stream, false) => {
618-
let prefetcher =
619-
StreamPrefetcher::new(index, sequence, Hints::default());
614+
let prefetcher = StreamPrefetcher::new(index, sequence, rerank_hints);
620615
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
621616
}
622617
(RerankMethod::Index, Io::Stream, true) => {
@@ -629,8 +624,7 @@ impl SearchBuilder for DefaultBuilder {
629624
tuple.filter()
630625
});
631626
let sequence = filter(sequence, predicate);
632-
let prefetcher =
633-
StreamPrefetcher::new(index, sequence, Hints::default());
627+
let prefetcher = StreamPrefetcher::new(index, sequence, rerank_hints);
634628
Box::new(rerank_index::<Op, _, _, _>(unprojected, prefetcher).map(f))
635629
}
636630
(RerankMethod::Heap, _, false) => {

src/index/vchordrq/scanners/maxsim.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,14 @@ impl SearchBuilder for MaxsimBuilder {
101101
pgrx::error!("maxsim search with rerank_in_table is not supported");
102102
}
103103
assert!(matches!(opfamily.distance_kind(), DistanceKind::Dot));
104+
let search_hints = Hints::default().full(true).batch(false);
105+
let rerank_hints = Hints::default().full(false).batch(true);
104106
let make_h1_plain_prefetcher = MakeH1PlainPrefetcher { index };
105107
let make_h0_plain_prefetcher = MakeH0PlainPrefetcher { index };
106108
let make_h0_simple_prefetcher = MakeH0SimplePrefetcher { index };
107109
let make_h0_stream_prefetcher = MakeH0StreamPrefetcher {
108110
index,
109-
hints: Hints::default().full(true),
111+
hints: search_hints,
110112
};
111113
let n = vectors.len();
112114
let accu_map = |(Reverse(distance), AlwaysEqual(payload))| (distance, payload);
@@ -225,7 +227,7 @@ impl SearchBuilder for MaxsimBuilder {
225227
}
226228
(Io::Stream, false) => {
227229
let prefetcher =
228-
StreamPrefetcher::new(index, sequence, Hints::default());
230+
StreamPrefetcher::new(index, sequence, rerank_hints);
229231
let mut reranker =
230232
rerank_index::<Op, _, _, _>(unprojected[i].clone(), prefetcher);
231233
accu_set.extend(reranker.by_ref().take(maxsim_refine as _));
@@ -244,7 +246,7 @@ impl SearchBuilder for MaxsimBuilder {
244246
});
245247
let sequence = filter(sequence, predicate);
246248
let prefetcher =
247-
StreamPrefetcher::new(index, sequence, Hints::default());
249+
StreamPrefetcher::new(index, sequence, rerank_hints);
248250
let mut reranker =
249251
rerank_index::<Op, _, _, _>(unprojected[i].clone(), prefetcher);
250252
accu_set.extend(reranker.by_ref().take(maxsim_refine as _));
@@ -369,7 +371,7 @@ impl SearchBuilder for MaxsimBuilder {
369371
}
370372
(Io::Stream, false) => {
371373
let prefetcher =
372-
StreamPrefetcher::new(index, sequence, Hints::default());
374+
StreamPrefetcher::new(index, sequence, rerank_hints);
373375
let mut reranker =
374376
rerank_index::<Op, _, _, _>(unprojected[i].clone(), prefetcher);
375377
accu_set.extend(reranker.by_ref().take(maxsim_refine as _));
@@ -388,7 +390,7 @@ impl SearchBuilder for MaxsimBuilder {
388390
});
389391
let sequence = filter(sequence, predicate);
390392
let prefetcher =
391-
StreamPrefetcher::new(index, sequence, Hints::default());
393+
StreamPrefetcher::new(index, sequence, rerank_hints);
392394
let mut reranker =
393395
rerank_index::<Op, _, _, _>(unprojected[i].clone(), prefetcher);
394396
accu_set.extend(reranker.by_ref().take(maxsim_refine as _));

0 commit comments

Comments
 (0)