Skip to content

Commit 3083bcd

Browse files
committed
require user to define state struct
1 parent 71bf60b commit 3083bcd

File tree

3 files changed

+166
-166
lines changed

3 files changed

+166
-166
lines changed

src/store/fs.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ use crate::{
117117
BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader,
118118
OutboardReader,
119119
},
120-
util::entity_manager::{self, ActiveEntityState},
120+
util::entity_manager,
121121
},
122122
util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
123123
Hash, IROH_BLOCK_SIZE,
@@ -211,21 +211,39 @@ impl TaskContext {
211211
}
212212
}
213213

214-
#[derive(Debug)]
215-
struct EmParams;
216-
217-
impl entity_manager::Params for EmParams {
214+
impl entity_manager::Params for HashContext {
218215
type EntityId = Hash;
219216

220217
type GlobalState = Arc<TaskContext>;
221218

222-
type EntityState = BaoFileHandle;
219+
fn id(&self) -> &Self::EntityId {
220+
&self.id
221+
}
222+
223+
fn global(&self) -> &Self::GlobalState {
224+
&self.global
225+
}
223226

224-
async fn on_shutdown(
225-
state: entity_manager::ActiveEntityState<Self>,
226-
_cause: entity_manager::ShutdownCause,
227-
) {
228-
state.persist().await;
227+
fn ref_count(&self) -> usize {
228+
self.state.receiver_count() + self.state.receiver_count()
229+
}
230+
231+
fn new(id: &Self::EntityId, global: &Self::GlobalState) -> Self {
232+
Self {
233+
id: *id,
234+
global: global.clone(),
235+
state: BaoFileHandle::default(),
236+
}
237+
}
238+
239+
fn reset(&mut self, id: &Self::EntityId, global: &Self::GlobalState) {
240+
self.id = *id;
241+
self.global = global.clone();
242+
self.state.send_replace(BaoFileStorage::Initial);
243+
}
244+
245+
async fn on_shutdown(&self, _cause: entity_manager::ShutdownCause) {
246+
self.persist().await;
229247
}
230248
}
231249

@@ -240,7 +258,7 @@ struct Actor {
240258
// Tasks for import and export operations.
241259
tasks: JoinSet<()>,
242260
// Entity manager that handles concurrency for entities.
243-
handles: EntityManagerState<EmParams>,
261+
handles: EntityManagerState<HashContext>,
244262
// temp tags
245263
temp_tags: TempTags,
246264
// waiters for idle state.
@@ -249,7 +267,12 @@ struct Actor {
249267
_rt: RtWrapper,
250268
}
251269

252-
type HashContext = ActiveEntityState<EmParams>;
270+
#[derive(Debug, Clone)]
271+
struct HashContext {
272+
id: Hash,
273+
global: Arc<TaskContext>,
274+
state: BaoFileHandle,
275+
}
253276

254277
impl SyncEntityApi for HashContext {
255278
/// Load the state from the database.
@@ -677,11 +700,11 @@ trait HashSpecificCommand: HashSpecific + Send + 'static {
677700

678701
/// Opportunity to send an error if spawning fails due to the task being busy (inbox full)
679702
/// or dead (e.g. panic in one of the running tasks).
680-
fn on_error(self, arg: SpawnArg<EmParams>) -> impl Future<Output = ()> + Send + 'static;
703+
fn on_error(self, arg: SpawnArg<HashContext>) -> impl Future<Output = ()> + Send + 'static;
681704

682705
async fn spawn(
683706
self,
684-
manager: &mut entity_manager::EntityManagerState<EmParams>,
707+
manager: &mut entity_manager::EntityManagerState<HashContext>,
685708
tasks: &mut JoinSet<()>,
686709
) where
687710
Self: Sized,
@@ -715,13 +738,13 @@ impl HashSpecificCommand for ObserveMsg {
715738
async fn handle(self, ctx: HashContext) {
716739
ctx.observe(self).await
717740
}
718-
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
741+
async fn on_error(self, _arg: SpawnArg<HashContext>) {}
719742
}
720743
impl HashSpecificCommand for ExportPathMsg {
721744
async fn handle(self, ctx: HashContext) {
722745
ctx.export_path(self).await
723746
}
724-
async fn on_error(self, arg: SpawnArg<EmParams>) {
747+
async fn on_error(self, arg: SpawnArg<HashContext>) {
725748
let err = match arg {
726749
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
727750
SpawnArg::Dead => io::Error::other("entity is dead"),
@@ -737,7 +760,7 @@ impl HashSpecificCommand for ExportBaoMsg {
737760
async fn handle(self, ctx: HashContext) {
738761
ctx.export_bao(self).await
739762
}
740-
async fn on_error(self, arg: SpawnArg<EmParams>) {
763+
async fn on_error(self, arg: SpawnArg<HashContext>) {
741764
let err = match arg {
742765
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
743766
SpawnArg::Dead => io::Error::other("entity is dead"),
@@ -753,7 +776,7 @@ impl HashSpecificCommand for ExportRangesMsg {
753776
async fn handle(self, ctx: HashContext) {
754777
ctx.export_ranges(self).await
755778
}
756-
async fn on_error(self, arg: SpawnArg<EmParams>) {
779+
async fn on_error(self, arg: SpawnArg<HashContext>) {
757780
let err = match arg {
758781
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
759782
SpawnArg::Dead => io::Error::other("entity is dead"),
@@ -769,7 +792,7 @@ impl HashSpecificCommand for ImportBaoMsg {
769792
async fn handle(self, ctx: HashContext) {
770793
ctx.import_bao(self).await
771794
}
772-
async fn on_error(self, arg: SpawnArg<EmParams>) {
795+
async fn on_error(self, arg: SpawnArg<HashContext>) {
773796
let err = match arg {
774797
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
775798
SpawnArg::Dead => io::Error::other("entity is dead"),
@@ -788,7 +811,7 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
788811
let (tt, cmd) = self;
789812
ctx.finish_import(cmd, tt).await
790813
}
791-
async fn on_error(self, arg: SpawnArg<EmParams>) {
814+
async fn on_error(self, arg: SpawnArg<HashContext>) {
792815
let err = match arg {
793816
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
794817
SpawnArg::Dead => io::Error::other("entity is dead"),

src/store/fs/bao_file.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use super::{
3030
use crate::{
3131
api::blobs::Bitfield,
3232
store::{
33-
fs::{meta::raw_outboard_size, util::entity_manager, HashContext},
33+
fs::{meta::raw_outboard_size, HashContext},
3434
util::{
3535
read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile,
3636
PartialMemStorage, DD,
@@ -523,16 +523,6 @@ impl BaoFileStorage {
523523
#[derive(Debug, Clone, Default, derive_more::Deref)]
524524
pub(crate) struct BaoFileHandle(pub(super) watch::Sender<BaoFileStorage>);
525525

526-
impl entity_manager::Reset for BaoFileHandle {
527-
fn reset(&mut self) {
528-
self.send_replace(BaoFileStorage::Initial);
529-
}
530-
531-
fn ref_count(&self) -> usize {
532-
self.0.receiver_count() + self.0.sender_count()
533-
}
534-
}
535-
536526
/// A reader for a bao file, reading just the data.
537527
#[derive(Debug)]
538528
pub struct DataReader(pub(super) BaoFileHandle);

0 commit comments

Comments
 (0)