Skip to content

Commit 3431efe

Browse files
committed
clippy
1 parent ca6268a commit 3431efe

File tree

5 files changed

+91
-74
lines changed

5 files changed

+91
-74
lines changed

examples/expiring-tags.rs

Lines changed: 70 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -4,50 +4,57 @@
44
//! then encode the expiry date in the tag name after the prefix, in a format
55
//! that sorts in the same order as the expiry date.
66
//!
7-
//! Then we can just use
8-
use std::time::{Duration, SystemTime};
7+
//! The example creates a number of blobs and protects them directly or indirectly
8+
//! with expiring tags. Watch as the expired tags are deleted and the blobs
9+
//! are removed from the store.
10+
use std::{
11+
ops::Deref,
12+
time::{Duration, SystemTime},
13+
};
914

1015
use chrono::Utc;
1116
use futures_lite::StreamExt;
12-
use iroh::endpoint;
1317
use iroh_blobs::{
14-
hashseq::HashSeq, rpc::client::blobs::MemClient as BlobsClient, store::GcConfig, BlobFormat,
15-
Hash, HashAndFormat, Tag,
18+
api::{blobs::AddBytesOptions, Store, Tag},
19+
hashseq::HashSeq,
20+
store::fs::options::{BatchOptions, GcConfig, InlineOptions, Options, PathOptions},
21+
BlobFormat, Hash,
1622
};
1723
use tokio::signal::ctrl_c;
1824

1925
/// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes.
2026
///
2127
/// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`).
2228
async fn create_expiring_tag(
23-
iroh: &BlobsClient,
29+
store: &Store,
2430
hashes: &[Hash],
2531
prefix: &str,
2632
expiry: SystemTime,
2733
) -> anyhow::Result<()> {
2834
let expiry = chrono::DateTime::<chrono::Utc>::from(expiry);
2935
let expiry = expiry.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
30-
let tagname = format!("{}-{}", prefix, expiry);
31-
let batch = iroh.batch().await?;
32-
let tt = if hashes.is_empty() {
36+
let tagname = format!("{prefix}-{expiry}");
37+
if hashes.is_empty() {
3338
return Ok(());
3439
} else if hashes.len() == 1 {
3540
let hash = hashes[0];
36-
batch.temp_tag(HashAndFormat::raw(hash)).await?
41+
store.tags().set(&tagname, hash).await?;
3742
} else {
3843
let hs = hashes.iter().copied().collect::<HashSeq>();
39-
batch
40-
.add_bytes_with_opts(hs.into_inner(), BlobFormat::HashSeq)
41-
.await?
44+
store
45+
.add_bytes_with_opts(AddBytesOptions {
46+
data: hs.into(),
47+
format: BlobFormat::HashSeq,
48+
})
49+
.with_named_tag(&tagname)
50+
.await?;
4251
};
43-
batch.persist_to(tt, tagname.as_str().into()).await?;
44-
println!("Created tag {}", tagname);
52+
println!("Created tag {tagname}");
4553
Ok(())
4654
}
4755

48-
async fn delete_expired_tags(blobs: &BlobsClient, prefix: &str, bulk: bool) -> anyhow::Result<()> {
49-
let mut tags = blobs.tags().list().await?;
50-
let prefix = format!("{}-", prefix);
56+
async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow::Result<()> {
57+
let prefix = format!("{prefix}-");
5158
let now = chrono::Utc::now();
5259
let end = format!(
5360
"{}-{}",
@@ -66,6 +73,7 @@ async fn delete_expired_tags(blobs: &BlobsClient, prefix: &str, bulk: bool) -> a
6673
// find tags to delete one by one and then delete them
6774
//
6875
// this allows us to print the tags before deleting them
76+
let mut tags = blobs.tags().list().await?;
6977
let mut to_delete = Vec::new();
7078
while let Some(tag) = tags.next().await {
7179
let tag = tag?.name;
@@ -85,99 +93,93 @@ async fn delete_expired_tags(blobs: &BlobsClient, prefix: &str, bulk: bool) -> a
8593
}
8694
}
8795
for tag in to_delete {
88-
println!("Deleting expired tag {}", tag);
96+
println!("Deleting expired tag {tag}\n");
8997
blobs.tags().delete(tag).await?;
9098
}
9199
}
92100
Ok(())
93101
}
94102

95-
async fn info_task(blobs: BlobsClient) -> anyhow::Result<()> {
103+
async fn print_store_info(store: &Store) -> anyhow::Result<()> {
104+
let now = chrono::Utc::now();
105+
let mut tags = store.tags().list().await?;
106+
println!(
107+
"Current time: {}",
108+
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
109+
);
110+
println!("Tags:");
111+
while let Some(tag) = tags.next().await {
112+
let tag = tag?;
113+
println!(" {tag:?}");
114+
}
115+
let mut blobs = store.list().stream().await?;
116+
println!("Blobs:");
117+
while let Some(item) = blobs.next().await {
118+
println!(" {}", item?);
119+
}
120+
println!();
121+
Ok(())
122+
}
123+
124+
async fn info_task(store: Store) -> anyhow::Result<()> {
96125
tokio::time::sleep(Duration::from_secs(1)).await;
97126
loop {
98-
let now = chrono::Utc::now();
99-
let mut tags = blobs.tags().list().await?;
100-
println!(
101-
"Current time: {}",
102-
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
103-
);
104-
println!("Tags:");
105-
while let Some(tag) = tags.next().await {
106-
let tag = tag?;
107-
println!(" {:?}", tag);
108-
}
109-
let mut blobs = blobs.list().await?;
110-
println!("Blobs:");
111-
while let Some(info) = blobs.next().await {
112-
let info = info?;
113-
println!(" {} {} bytes", info.hash, info.size);
114-
}
115-
println!();
127+
print_store_info(&store).await?;
116128
tokio::time::sleep(Duration::from_secs(5)).await;
117129
}
118130
}
119131

120-
async fn delete_expired_tags_task(blobs: BlobsClient, prefix: &str) -> anyhow::Result<()> {
132+
async fn delete_expired_tags_task(store: Store, prefix: &str) -> anyhow::Result<()> {
121133
loop {
122-
delete_expired_tags(&blobs, prefix, false).await?;
134+
delete_expired_tags(&store, prefix, false).await?;
123135
tokio::time::sleep(Duration::from_secs(5)).await;
124136
}
125137
}
126138

127139
#[tokio::main]
128140
async fn main() -> anyhow::Result<()> {
129141
tracing_subscriber::fmt::init();
130-
let endpoint = endpoint::Endpoint::builder().bind().await?;
131-
let store = iroh_blobs::store::fs::Store::load("blobs").await?;
132-
let blobs = iroh_blobs::net_protocol::Blobs::builder(store).build(&endpoint);
133-
// enable gc with a short period
134-
blobs.start_gc(GcConfig {
135-
period: Duration::from_secs(1),
136-
done_callback: None,
137-
})?;
138-
// create a router and add blobs as a service
139-
//
140-
// You can skip this if you don't want to serve the data over the network.
141-
let router = iroh::protocol::Router::builder(endpoint)
142-
.accept(iroh_blobs::ALPN, blobs.clone())
143-
.spawn()
144-
.await?;
142+
let path = std::env::current_dir()?.join("blobs");
143+
let options = Options {
144+
path: PathOptions::new(&path),
145+
gc: Some(GcConfig {
146+
interval: Duration::from_secs(10),
147+
}),
148+
inline: InlineOptions::default(),
149+
batch: BatchOptions::default(),
150+
};
151+
let store =
152+
iroh_blobs::store::fs::FsStore::load_with_opts(path.join("blobs.db"), options).await?;
145153

146154
// setup: add some data and tag it
147155
{
148156
// add several blobs and tag them with an expiry date 10 seconds in the future
149-
let batch = blobs.client().batch().await?;
157+
let batch = store.batch().await?;
150158
let a = batch.add_bytes("blob 1".as_bytes()).await?;
151159
let b = batch.add_bytes("blob 2".as_bytes()).await?;
160+
152161
let expires_at = SystemTime::now()
153162
.checked_add(Duration::from_secs(10))
154163
.unwrap();
155-
create_expiring_tag(
156-
blobs.client(),
157-
&[*a.hash(), *b.hash()],
158-
"expiring",
159-
expires_at,
160-
)
161-
.await?;
164+
create_expiring_tag(&store, &[*a.hash(), *b.hash()], "expiring", expires_at).await?;
162165

163166
// add a single blob and tag it with an expiry date 60 seconds in the future
164167
let c = batch.add_bytes("blob 3".as_bytes()).await?;
165168
let expires_at = SystemTime::now()
166169
.checked_add(Duration::from_secs(60))
167170
.unwrap();
168-
create_expiring_tag(blobs.client(), &[*c.hash()], "expiring", expires_at).await?;
171+
create_expiring_tag(&store, &[*c.hash()], "expiring", expires_at).await?;
169172
// batch goes out of scope, so data is only protected by the tags we created
170173
}
171-
let client = blobs.client().clone();
172174

173175
// delete expired tags every 5 seconds
174-
let delete_task = tokio::spawn(delete_expired_tags_task(client.clone(), "expiring"));
176+
let delete_task = tokio::spawn(delete_expired_tags_task(store.deref().clone(), "expiring"));
175177
// print all tags and blobs every 5 seconds
176-
let info_task = tokio::spawn(info_task(client.clone()));
178+
let info_task = tokio::spawn(info_task(store.deref().clone()));
177179

178180
ctrl_c().await?;
179181
delete_task.abort();
180182
info_task.abort();
181-
router.shutdown().await?;
183+
store.shutdown().await?;
182184
Ok(())
183185
}

src/api/tags.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,22 @@ impl Tags {
5858
Ok(stream.next().await.transpose()?)
5959
}
6060

61-
pub async fn set_with_opts(&self, options: SetOptions) -> super::RequestResult<()> {
61+
pub async fn set_with_opts(&self, options: SetOptions) -> super::RequestResult<TagInfo> {
6262
trace!("{:?}", options);
63+
let info = TagInfo {
64+
name: options.name.clone(),
65+
hash: options.value.hash,
66+
format: options.value.format,
67+
};
6368
self.client.rpc(options).await??;
64-
Ok(())
69+
Ok(info)
6570
}
6671

6772
pub async fn set(
6873
&self,
6974
name: impl AsRef<[u8]>,
7075
value: impl Into<HashAndFormat>,
71-
) -> super::RequestResult<()> {
76+
) -> super::RequestResult<TagInfo> {
7277
self.set_with_opts(SetOptions {
7378
name: Tag::from(name.as_ref()),
7479
value: value.into(),

src/store/fs.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,14 @@ impl Actor {
693693
fs::create_dir_all(db_path.parent().unwrap())?;
694694
let (db_send, db_recv) = tokio::sync::mpsc::channel(100);
695695
let (protect, ds) = delete_set::pair(Arc::new(options.path.clone()));
696-
let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone())?;
696+
let db_actor = meta::Actor::new(db_path, db_recv, ds, options.batch.clone());
697+
let db_actor = match db_actor {
698+
Ok(actor) => actor,
699+
Err(err) => {
700+
println!("failed to create meta actor: {err}");
701+
return Err(err);
702+
}
703+
};
697704
let slot_context = Arc::new(TaskContext {
698705
options: options.clone(),
699706
db: meta::Db::new(db_send),
@@ -1209,6 +1216,7 @@ impl FsStore {
12091216
let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(100);
12101217
let (fs_commands_tx, fs_commands_rx) = tokio::sync::mpsc::channel(100);
12111218
let gc_config = options.gc.clone();
1219+
println!("Creating actor");
12121220
let actor = handle
12131221
.spawn(Actor::new(
12141222
db_path,

src/store/fs/meta.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,8 @@ impl Actor {
409409
options: BatchOptions,
410410
) -> anyhow::Result<Self> {
411411
debug!("creating or opening meta database at {}", db_path.display());
412-
let db = match redb::Database::create(db_path) {
412+
let res = redb::Database::create(&db_path);
413+
let db = match res {
413414
Ok(db) => db,
414415
Err(DatabaseError::UpgradeRequired(1)) => {
415416
return Err(anyhow::anyhow!("migration from v1 no longer supported"));

src/store/fs/options.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::{
44
time::Duration,
55
};
66

7-
use super::{gc::GcConfig, meta::raw_outboard_size, temp_name};
7+
pub use super::gc::GcConfig;
8+
use super::{meta::raw_outboard_size, temp_name};
89
use crate::Hash;
910

1011
/// Options for directories used by the file store.

0 commit comments

Comments
 (0)