Skip to content

Commit 7021b17

Browse files
committed
Add wait_idle api call.
1 parent c6fc5ad commit 7021b17

File tree

5 files changed

+41
-3
lines changed

5 files changed

+41
-3
lines changed

src/api.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod downloader;
3030
pub mod proto;
3131
pub mod remote;
3232
pub mod tags;
33+
use crate::api::proto::WaitIdleRequest;
3334
pub use crate::{store::util::Tag, util::temp_tag::TempTag};
3435

3536
pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
@@ -314,6 +315,7 @@ impl Store {
314315
Request::ClearProtected(msg) => local.send((msg, tx)),
315316
Request::SyncDb(msg) => local.send((msg, tx)),
316317
Request::Shutdown(msg) => local.send((msg, tx)),
318+
Request::WaitIdle(msg) => local.send((msg, tx)),
317319
}
318320
})
319321
});
@@ -332,6 +334,12 @@ impl Store {
332334
Ok(())
333335
}
334336

337+
pub async fn wait_idle(&self) -> irpc::Result<()> {
338+
let msg = WaitIdleRequest;
339+
self.client.rpc(msg).await?;
340+
Ok(())
341+
}
342+
335343
pub(crate) fn from_sender(client: ApiClient) -> Self {
336344
Self { client }
337345
}

src/api/proto.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,16 @@ pub enum Request {
134134
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
135135
SyncDb(SyncDbRequest),
136136
#[rpc(tx = oneshot::Sender<()>)]
137+
WaitIdle(WaitIdleRequest),
138+
#[rpc(tx = oneshot::Sender<()>)]
137139
Shutdown(ShutdownRequest),
138140
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
139141
ClearProtected(ClearProtectedRequest),
140142
}
141143

144+
#[derive(Debug, Serialize, Deserialize)]
145+
pub struct WaitIdleRequest;
146+
142147
#[derive(Debug, Serialize, Deserialize)]
143148
pub struct SyncDbRequest;
144149

src/store/fs.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ struct Actor {
243243
handles: EntityManagerState<EmParams>,
244244
// temp tags
245245
temp_tags: TempTags,
246+
// waiters for idle state.
247+
idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
246248
// our private tokio runtime. It has to live somewhere.
247249
_rt: RtWrapper,
248250
}
@@ -456,6 +458,16 @@ impl Actor {
456458
trace!("{cmd:?}");
457459
self.db().send(cmd.into()).await.ok();
458460
}
461+
Command::WaitIdle(cmd) => {
462+
trace!("{cmd:?}");
463+
if self.tasks.is_empty() {
464+
// we are currently idle
465+
cmd.tx.send(()).await.ok();
466+
} else {
467+
// wait for idle state
468+
self.idle_waiters.push(cmd.tx);
469+
}
470+
}
459471
Command::Shutdown(cmd) => {
460472
trace!("{cmd:?}");
461473
self.db().send(cmd.into()).await.ok();
@@ -599,6 +611,11 @@ impl Actor {
599611
}
600612
Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
601613
Self::log_task_result(res);
614+
if self.tasks.is_empty() {
615+
for tx in self.idle_waiters.drain(..) {
616+
let _ = tx.send(());
617+
}
618+
}
602619
}
603620
}
604621
}
@@ -648,6 +665,7 @@ impl Actor {
648665
tasks: JoinSet::new(),
649666
handles: EntityManagerState::new(slot_context, 1024, 32, 32, 2),
650667
temp_tags: Default::default(),
668+
idle_waiters: Vec::new(),
651669
_rt: rt,
652670
})
653671
}
@@ -818,7 +836,6 @@ async fn handle_batch(cmd: BatchMsg, id: Scope, scope: Arc<TempTagScope>, ctx: A
818836
if let Err(cause) = handle_batch_impl(cmd, id, &scope).await {
819837
error!("batch failed: {cause}");
820838
}
821-
println!("batch done, clearing scope {}", id);
822839
ctx.clear_scope(id).await;
823840
}
824841

@@ -1969,6 +1986,7 @@ pub mod tests {
19691986
println!("dropping batch");
19701987
drop(batch);
19711988
store.sync_db().await?;
1989+
store.wait_idle().await?;
19721990
println!("reading temp tags after batch drop");
19731991
let tts = store
19741992
.tags()

src/store/mem.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use crate::{
5151
ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest,
5252
ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest,
5353
ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg,
54-
SetTagRequest, ShutdownMsg, SyncDbMsg,
54+
SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg,
5555
},
5656
tags::TagInfo,
5757
ApiClient,
@@ -162,6 +162,10 @@ impl Actor {
162162
let entry = self.get_or_create_entry(hash);
163163
self.spawn(import_bao(entry, size, data, tx));
164164
}
165+
Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
166+
trace!("wait idle");
167+
tx.send(()).await.ok();
168+
}
165169
Command::Observe(ObserveMsg {
166170
inner: ObserveRequest { hash },
167171
tx,

src/store/readonly_mem.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::{
3737
self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
3838
ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
3939
ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
40-
ObserveRequest,
40+
ObserveRequest, WaitIdleMsg,
4141
},
4242
ApiClient, TempTag,
4343
},
@@ -86,6 +86,9 @@ impl Actor {
8686
.await
8787
.ok();
8888
}
89+
Command::WaitIdle(WaitIdleMsg { tx, .. }) => {
90+
tx.send(()).await.ok();
91+
}
8992
Command::ImportBytes(ImportBytesMsg { tx, .. }) => {
9093
tx.send(io::Error::other("import not supported").into())
9194
.await

0 commit comments

Comments
 (0)