Skip to content

Commit ee3e710

Browse files
authored
refactor: Remove slot mutex and simplify per blob state (#104)
## Description Before, the state we kept per hash was a bit convoluted. The entry point was a Slot, which contained a tokio mutex to cover the async loading of an entry state from the metadata db. Then inside that, there was the actual entry state, wrapped in an option to cover the case that we don't have anything about the hash. This was working, but it was an arc in an arc in an arc, and also led to bugs in the case of trying to export a blob that doesn't exist. Now all these states are flattened into a single enum, and we can easily define what should happen when we e.g. do an export of an entry that does not exist - return an appropriate io error. ## Breaking Changes None ## Notes & open questions Note: you could remove the Initial and Loading state by using a tokio::sync::OnceCell. But that is a true sync primitive, and we want to use an AtomicRefCell, and also using a OnceLock would come with its own sync primitive (a semaphore) just for init, and then we have our own one for the non-load state changes. I don't think it is that bad... Note: a lot of changes are to add the wait_idle fn in the rpc protocol, which isn't really related to the changes in the PR but just a way to get rid of some flaky tests. <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented.
1 parent ef1d1df commit ee3e710

File tree

14 files changed

+691
-490
lines changed

14 files changed

+691
-490
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ jobs:
143143
- uses: taiki-e/install-action@cross
144144

145145
- name: test
146-
run: cross test --all --target ${{ matrix.target }} -- --test-threads=4
146+
run: cross test --all --target ${{ matrix.target }} -- --test-threads=1
147147
env:
148148
RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG' }}
149149

Cargo.lock

Lines changed: 6 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ chrono = "0.4.39"
3737
nested_enum_utils = "0.2.1"
3838
ref-cast = "1.0.24"
3939
arrayvec = "0.7.6"
40-
iroh = "0.91"
40+
iroh = "0.91.1"
4141
self_cell = "1.1.0"
4242
genawaiter = { version = "0.99.1", features = ["futures03"] }
43-
iroh-base = "0.91"
43+
iroh-base = "0.91.1"
4444
reflink-copy = "0.1.24"
4545
irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
4646
iroh-metrics = { version = "0.35" }
@@ -59,9 +59,13 @@ tracing-subscriber = { version = "0.3.19", features = ["fmt"] }
5959
tracing-test = "0.2.5"
6060
walkdir = "2.5.0"
6161
atomic_refcell = "0.1.13"
62-
iroh = { version = "0.91", features = ["discovery-local-network"]}
62+
iroh = { version = "0.91.1", features = ["discovery-local-network"]}
6363

6464
[features]
6565
hide-proto-docs = []
6666
metrics = []
6767
default = ["hide-proto-docs"]
68+
69+
[patch.crates-io]
70+
iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }
71+
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }

deny.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,8 @@ name = "ring"
3939
[[licenses.clarify.license-files]]
4040
hash = 3171872035
4141
path = "LICENSE"
42+
43+
[sources]
44+
allow-git = [
45+
"https://github.com/n0-computer/iroh",
46+
]

src/api.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ pub mod downloader;
3030
pub mod proto;
3131
pub mod remote;
3232
pub mod tags;
33+
use crate::api::proto::WaitIdleRequest;
3334
pub use crate::{store::util::Tag, util::temp_tag::TempTag};
3435

3536
pub(crate) type ApiClient = irpc::Client<proto::Request>;
@@ -298,6 +299,23 @@ impl Store {
298299
Ok(())
299300
}
300301

302+
/// Waits for the store to become completely idle.
303+
///
304+
/// This is mostly useful for tests, where you want to check that e.g. the
305+
/// store has written all data to disk.
306+
///
307+
/// Note that a store is not guaranteed to become idle, if it is being
308+
/// interacted with concurrently. So this might wait forever.
309+
///
310+
/// Also note that once you get the callback, the store is not guaranteed to
311+
/// still be idle. All this tells you that there was a point in time where
312+
/// the store was idle between the call and the response.
313+
pub async fn wait_idle(&self) -> irpc::Result<()> {
314+
let msg = WaitIdleRequest;
315+
self.client.rpc(msg).await?;
316+
Ok(())
317+
}
318+
301319
pub(crate) fn from_sender(client: ApiClient) -> Self {
302320
Self { client }
303321
}

src/api/proto.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,16 @@ pub enum Request {
130130
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
131131
SyncDb(SyncDbRequest),
132132
#[rpc(tx = oneshot::Sender<()>)]
133+
WaitIdle(WaitIdleRequest),
134+
#[rpc(tx = oneshot::Sender<()>)]
133135
Shutdown(ShutdownRequest),
134136
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
135137
ClearProtected(ClearProtectedRequest),
136138
}
137139

140+
#[derive(Debug, Serialize, Deserialize)]
141+
pub struct WaitIdleRequest;
142+
138143
#[derive(Debug, Serialize, Deserialize)]
139144
pub struct SyncDbRequest;
140145

src/api/remote.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1064,8 +1064,15 @@ mod tests {
10641064
use testresult::TestResult;
10651065

10661066
use crate::{
1067+
api::blobs::Blobs,
10671068
protocol::{ChunkRangesSeq, GetRequest},
1068-
store::fs::{tests::INTERESTING_SIZES, FsStore},
1069+
store::{
1070+
fs::{
1071+
tests::{create_n0_bao, test_data, INTERESTING_SIZES},
1072+
FsStore,
1073+
},
1074+
mem::MemStore,
1075+
},
10691076
tests::{add_test_hash_seq, add_test_hash_seq_incomplete},
10701077
util::ChunkRangesExt,
10711078
};
@@ -1117,6 +1124,38 @@ mod tests {
11171124
Ok(())
11181125
}
11191126

1127+
async fn test_observe_partial(blobs: &Blobs) -> TestResult<()> {
1128+
let sizes = INTERESTING_SIZES;
1129+
for size in sizes {
1130+
let data = test_data(size);
1131+
let ranges = ChunkRanges::chunk(0);
1132+
let (hash, bao) = create_n0_bao(&data, &ranges)?;
1133+
blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
1134+
let bitfield = blobs.observe(hash).await?;
1135+
if size > 1024 {
1136+
assert_eq!(bitfield.ranges, ranges);
1137+
} else {
1138+
assert_eq!(bitfield.ranges, ChunkRanges::all());
1139+
}
1140+
}
1141+
Ok(())
1142+
}
1143+
1144+
#[tokio::test]
1145+
async fn test_observe_partial_mem() -> TestResult<()> {
1146+
let store = MemStore::new();
1147+
test_observe_partial(store.blobs()).await?;
1148+
Ok(())
1149+
}
1150+
1151+
#[tokio::test]
1152+
async fn test_observe_partial_fs() -> TestResult<()> {
1153+
let td = tempfile::tempdir()?;
1154+
let store = FsStore::load(td.path()).await?;
1155+
test_observe_partial(store.blobs()).await?;
1156+
Ok(())
1157+
}
1158+
11201159
#[tokio::test]
11211160
async fn test_local_info_hash_seq() -> TestResult<()> {
11221161
let sizes = INTERESTING_SIZES;

0 commit comments

Comments
 (0)