Skip to content

Commit 3b72fca

Browse files
committed
Add wait_idle api call.
1 parent c6fc5ad commit 3b72fca

File tree

6 files changed

+78
-3
lines changed

6 files changed

+78
-3
lines changed

src/api.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod downloader;
3030
pub mod proto;
3131
pub mod remote;
3232
pub mod tags;
33+
use crate::api::proto::WaitIdleRequest;
3334
pub use crate::{store::util::Tag, util::temp_tag::TempTag};
3435

3536
pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
@@ -314,6 +315,7 @@ impl Store {
314315
Request::ClearProtected(msg) => local.send((msg, tx)),
315316
Request::SyncDb(msg) => local.send((msg, tx)),
316317
Request::Shutdown(msg) => local.send((msg, tx)),
318+
Request::WaitIdle(msg) => local.send((msg, tx)),
317319
}
318320
})
319321
});
@@ -332,6 +334,19 @@ impl Store {
332334
Ok(())
333335
}
334336

337+
/// Waits for the store to become completely idle.
338+
///
339+
/// This is mostly useful for tests, where you want to check that e.g. the
340+
/// store has written all data to disk.
341+
///
342+
/// Note that a store is not guaranteed to become idle, if it is being
343+
/// interacted with concurrently. So this might wait forever.
344+
pub async fn wait_idle(&self) -> irpc::Result<()> {
345+
let msg = WaitIdleRequest;
346+
self.client.rpc(msg).await?;
347+
Ok(())
348+
}
349+
335350
pub(crate) fn from_sender(client: ApiClient) -> Self {
336351
Self { client }
337352
}

src/api/proto.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,16 @@ pub enum Request {
134134
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
135135
SyncDb(SyncDbRequest),
136136
#[rpc(tx = oneshot::Sender<()>)]
137+
WaitIdle(WaitIdleRequest),
138+
#[rpc(tx = oneshot::Sender<()>)]
137139
Shutdown(ShutdownRequest),
138140
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
139141
ClearProtected(ClearProtectedRequest),
140142
}
141143

144+
#[derive(Debug, Serialize, Deserialize)]
145+
pub struct WaitIdleRequest;
146+
142147
#[derive(Debug, Serialize, Deserialize)]
143148
pub struct SyncDbRequest;
144149

src/store/fs.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ struct Actor {
243243
handles: EntityManagerState<EmParams>,
244244
// temp tags
245245
temp_tags: TempTags,
246+
// waiters for idle state.
247+
idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
246248
// our private tokio runtime. It has to live somewhere.
247249
_rt: RtWrapper,
248250
}
@@ -456,6 +458,16 @@ impl Actor {
456458
trace!("{cmd:?}");
457459
self.db().send(cmd.into()).await.ok();
458460
}
461+
Command::WaitIdle(cmd) => {
462+
trace!("{cmd:?}");
463+
if self.tasks.is_empty() {
464+
// we are currently idle
465+
cmd.tx.send(()).await.ok();
466+
} else {
467+
// wait for idle state
468+
self.idle_waiters.push(cmd.tx);
469+
}
470+
}
459471
Command::Shutdown(cmd) => {
460472
trace!("{cmd:?}");
461473
self.db().send(cmd.into()).await.ok();
@@ -599,6 +611,11 @@ impl Actor {
599611
}
600612
Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
601613
Self::log_task_result(res);
614+
if self.tasks.is_empty() {
615+
for tx in self.idle_waiters.drain(..) {
616+
tx.send(()).await.ok();
617+
}
618+
}
602619
}
603620
}
604621
}
@@ -648,6 +665,7 @@ impl Actor {
648665
tasks: JoinSet::new(),
649666
handles: EntityManagerState::new(slot_context, 1024, 32, 32, 2),
650667
temp_tags: Default::default(),
668+
idle_waiters: Vec::new(),
651669
_rt: rt,
652670
})
653671
}
@@ -818,7 +836,6 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: A
818836
if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
819837
error!("batch failed: {cause}");
820838
}
821-
println!("batch done, clearing scope {}", id);
822839
ctx.clear_scope(id).await;
823840
}
824841

@@ -1969,6 +1986,7 @@ pub mod tests {
19691986
println!("dropping batch");
19701987
drop(batch);
19711988
store.sync_db().await?;
1989+
store.wait_idle().await?;
19721990
println!("reading temp tags after batch drop");
19731991
let tts = store
19741992
.tags()

src/store/mem.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use crate::{
5151
ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
5252
ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
5353
ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54-
SetTagRequest, ShutdownMsg, SyncDbMsg,
54+
SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
5555
},
5656
tags::TagInfo,
5757
ApiClient,
@@ -122,6 +122,7 @@ impl MemStore {
122122
options: Arc::new(Options::default()),
123123
temp_tags: Default::default(),
124124
protected: Default::default(),
125+
idle_waiters: Default::default(),
125126
}
126127
.run(),
127128
);
@@ -137,6 +138,8 @@ struct Actor {
137138
options: Arc<Options>,
138139
// temp tags
139140
temp_tags: TempTags,
141+
// idle waiters
142+
idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
140143
protected: HashSet<Hash>,
141144
}
142145

@@ -162,6 +165,16 @@ impl Actor {
162165
let entry = self.get_or_create_entry(hash);
163166
self.spawn(import_bao(entry, size, data, tx));
164167
}
168+
Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
169+
trace!("wait idle");
170+
if self.tasks.is_empty() {
171+
// we are currently idle
172+
tx.send(()).await.ok();
173+
} else {
174+
// wait for idle state
175+
self.idle_waiters.push(tx);
176+
}
177+
}
165178
Command::Observe(ObserveMsg {
166179
inner: ObserveRequest { hash },
167180
tx,
@@ -485,6 +498,12 @@ impl Actor {
485498
}
486499
TaskResult::Unit(_) => {}
487500
}
501+
if self.tasks.is_empty() {
502+
// we are idle now
503+
for tx in self.idle_waiters.drain(..) {
504+
tx.send(()).await.ok();
505+
}
506+
}
488507
}
489508
}
490509
};

src/store/readonly_mem.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::{
3737
self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
3838
ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
3939
ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
40-
ObserveRequest,
40+
ObserveRequest, WaitIdleMsg,
4141
},
4242
ApiClient, TempTag,
4343
},
@@ -62,6 +62,7 @@ impl Deref for ReadonlyMemStore {
6262
struct Actor {
6363
commands: tokio::sync::mpsc::Receiver<proto::Command>,
6464
tasks: JoinSet<()>,
65+
idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
6566
data: HashMap<Hash, CompleteStorage>,
6667
}
6768

@@ -74,6 +75,7 @@ impl Actor {
7475
data,
7576
commands,
7677
tasks: JoinSet::new(),
78+
idle_waiters: Vec::new(),
7779
}
7880
}
7981

@@ -86,6 +88,15 @@ impl Actor {
8688
.await
8789
.ok();
8890
}
91+
Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
92+
if self.tasks.is_empty() {
93+
// we are currently idle
94+
tx.send(()).await.ok();
95+
} else {
96+
// wait for idle state
97+
self.idle_waiters.push(tx);
98+
}
99+
}
89100
Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
90101
tx.send(io::Error::other("import not supported").into())
91102
.await
@@ -226,6 +237,12 @@ impl Actor {
226237
},
227238
Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
228239
self.log_unit_task(res);
240+
if self.tasks.is_empty() {
241+
// we are idle now
242+
for tx in self.idle_waiters.drain(..) {
243+
tx.send(()).await.ok();
244+
}
245+
}
229246
},
230247
else => break,
231248
}

src/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,7 @@ pub mod sink {
472472
}
473473
}
474474

475+
#[allow(dead_code)]
475476
pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>);
476477

477478
impl<T> Sink<T> for IrpcSenderSink<T>

0 commit comments

Comments
 (0)