From 5cadec006ef37f659876bd62002c81030816dce0 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Mon, 24 Mar 2025 20:45:21 -0400 Subject: [PATCH 01/18] completed v6 --- crates/bevy_ecs/src/entity/mod.rs | 237 +++++++++++++++++++++++++++++- 1 file changed, 234 insertions(+), 3 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 2756648e94218..40d88b12b709c 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -45,6 +45,7 @@ use bevy_reflect::Reflect; use bevy_reflect::{ReflectDeserialize, ReflectSerialize}; pub use clone_entities::*; +use concurrent_queue::ConcurrentQueue; pub use entity_set::*; pub use map_entities::*; @@ -73,8 +74,18 @@ use crate::{ storage::{SparseSetIndex, TableId, TableRow}, }; use alloc::vec::Vec; -use bevy_platform_support::sync::atomic::Ordering; -use core::{fmt, hash::Hash, mem, num::NonZero, panic::Location}; +use bevy_platform_support::sync::{ + atomic::{AtomicU32, Ordering}, + Arc, +}; +use core::{ + fmt, + hash::Hash, + mem, + num::NonZero, + panic::Location, + task::{Poll, Waker}, +}; use log::warn; #[cfg(feature = "serialize")] @@ -515,6 +526,159 @@ impl<'a> core::iter::FusedIterator for ReserveEntitiesIterator<'a> {} // SAFETY: Newly reserved entity values are unique. unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {} +#[derive(Debug)] +struct RemoteEntityRequest(Waker); + +#[derive(Debug)] +struct RemoteEntitiesInner { + /// This is purely an optimization. + /// When this is greater than 0, we are waiting on request fulfillment. + is_waiting: AtomicU32, + /// The number of times requests have been fulfilled. + generation: AtomicU32, + /// Since anyone can make requests at any time, this must be unbounded. + requests: ConcurrentQueue, + reserved: ConcurrentQueue, +} + +/// An error that occurs when an [`Entity`] can not be reserved remotely. +/// See also [`RemoteEntities`]. +#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)] +pub enum RemoteReservationError { + /// This happens when [`Entities`] are closed, dropped, etc while a [`RemoteEntitiesReserver`] is trying to reserve from it. + #[error("A remote entity reserver tried to reserve an entity from a closed `Entities`.")] + Closed, +} + +struct RemoteReservedEntities { + source: Arc, + requested_on_generation: Option, +} + +impl Future for RemoteReservedEntities { + type Output = Result; + + fn poll( + mut self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> Poll { + match self.source.reserved.pop() { + Ok(found) => Poll::Ready(Ok(found)), + Err(concurrent_queue::PopError::Closed) => { + Poll::Ready(Err(RemoteReservationError::Closed)) + } + Err(concurrent_queue::PopError::Empty) => { + let current_generation = self.source.generation.load(Ordering::Acquire); + if self + .requested_on_generation + .is_none_or(|generation| generation != current_generation) + { + self.requested_on_generation = self + .source + .requests + .push(RemoteEntityRequest(cx.waker().clone())) + .is_ok() + .then_some(current_generation); + } + + // We can use relaxed ordering here. Even though it might result in extra waiting, who cares? + self.source.is_waiting.fetch_add(1, Ordering::Relaxed); + // Now we are waiting for a flush + Poll::Pending + } + } + } +} + +impl RemoteEntitiesInner { + fn fulfill(&self, entities: &Entities, batch_size: NonZero) { + self.is_waiting.store(0, Ordering::Relaxed); + for reserved in entities.reserve_entities(batch_size.get()) { + self.reserved + .push(reserved) + .expect("This never closes, and it can't run out of room."); + } + + self.generation.fetch_add(1, Ordering::AcqRel); + for request in self.requests.try_iter() { + request.0.wake(); + } + } + + #[inline] + fn try_fulfill(&self, entities: &Entities, batch_size: NonZero) { + // we do this to hint to the compiler that the if branch is unlinkely to be taken. + #[cold] + fn do_fulfill(this: &RemoteEntitiesInner, entities: &Entities, batch_size: NonZero) { + this.fulfill(entities, batch_size); + } + + if self.is_waiting.load(Ordering::Relaxed) > 0 { + // TODO: add core::intrinsics::unlikely once stable + do_fulfill(self, entities, batch_size); + } + } + + fn new() -> Self { + Self { + is_waiting: AtomicU32::new(0), + generation: AtomicU32::new(0), + requests: ConcurrentQueue::unbounded(), + reserved: ConcurrentQueue::unbounded(), + } + } + + fn close(&self) { + self.reserved.close(); + self.requests.close(); + } +} + +/// Manages access to [`Entities`] from any thread and async. +pub struct RemoteEntities { + inner: Arc, +} + +impl RemoteEntities { + /// Constructs a [`RemoteEntitiesReserver`] that can be shared between threads. + /// + /// # Example + /// + /// ``` + /// use bevy_ecs::prelude::*; + /// + /// let mut world = World::new(); + /// let remote = world.entities().get_remote(); + /// + /// // The reserve is async so we need it to be on a separate thread. + /// let thread = std::thread::spawn(move || { + /// let future = async { + /// for _ in 0..100 { + /// reserver.reserve_entity().await.unwrap(); + /// } + /// }; + /// bevy_tasks::block_on(future); + /// }); + /// + /// // We need to flush the entities as needed or the remote entities will get stuck. + /// while !thread.is_finished() { + /// world.flush(); + /// } + /// ``` + pub async fn reserve_entity(&self) -> Result { + RemoteReservedEntities { + source: self.inner.clone(), + requested_on_generation: None, + } + .await + } + + /// Returns true only if the [`Entities`] has discontinued this remote access. + pub fn is_closed(&self) -> bool { + self.inner.reserved.is_closed() + } +} + /// A [`World`]'s internal metadata store on all of its entities. /// /// Contains metadata on: @@ -525,6 +689,10 @@ unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {} /// [`World`]: crate::world::World #[derive(Debug)] pub struct Entities { + /// This is the number of entities we reserve in bulk for [`RemoteEntities`]. + /// A value too high can cause excess memory to be used, but a value too low can cause additional waiting. + pub remote_batch_size: NonZero, + remote: Arc, meta: Vec, /// The `pending` and `free_cursor` fields describe three sets of Entity IDs @@ -572,11 +740,23 @@ pub struct Entities { } impl Entities { - pub(crate) const fn new() -> Self { + /// The default value of [`remote_batch_size`](Self::remote_batch_size). + pub const DEFAULT_REMOTE_BATCH_SIZE: NonZero = NonZero::new(1024).unwrap(); + + pub(crate) fn new() -> Self { Entities { meta: Vec::new(), pending: Vec::new(), free_cursor: AtomicIdCursor::new(0), + remote: Arc::new(RemoteEntitiesInner::new()), + remote_batch_size: Self::DEFAULT_REMOTE_BATCH_SIZE, + } + } + + /// Constructs a new [`RemoteEntities`] for this instance. + pub fn get_remote(&self) -> RemoteEntities { + RemoteEntities { + inner: self.remote.clone(), } } @@ -821,6 +1001,8 @@ impl Entities { self.meta.clear(); self.pending.clear(); *self.free_cursor.get_mut() = 0; + self.remote.close(); + self.remote = Arc::new(RemoteEntitiesInner::new()); } /// Returns the location of an [`Entity`]. @@ -907,6 +1089,9 @@ impl Entities { /// Note: freshly-allocated entities (ones which don't come from the pending list) are guaranteed /// to be initialized with the invalid archetype. pub unsafe fn flush(&mut self, mut init: impl FnMut(Entity, &mut EntityLocation)) { + // this may do extra reservations, so we do it before flushing. + self.remote.try_fulfill(self, self.remote_batch_size); + let free_cursor = self.free_cursor.get_mut(); let current_free_cursor = *free_cursor; @@ -1035,6 +1220,13 @@ impl Entities { } } +impl Drop for Entities { + fn drop(&mut self) { + // Make sure remote entities are informed. + self.remote.close(); + } +} + /// An error that occurs when a specified [`Entity`] does not exist. #[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)] #[error("The entity with ID {entity} {details}")] @@ -1217,6 +1409,45 @@ mod tests { assert!(next_entity.generation() > entity.generation() + GENERATIONS); } + #[test] + #[cfg(feature = "std")] + fn remote_reservation() { + use std::thread; + + use bevy_tasks::block_on; + + let mut entities = Entities::new(); + + let mut threads = (0..3) + .map(|_| { + let reserver = entities.get_remote(); + thread::spawn(move || { + let future = async { + for _ in 0..100 { + reserver.reserve_entity().await.unwrap(); + } + }; + block_on(future); + }) + }) + .collect::>(); + + let timeout = std::time::Instant::now(); + loop { + threads.retain(|thread| !thread.is_finished()); + entities.flush_as_invalid(); + if threads.is_empty() { + break; + } + if timeout.elapsed().as_secs() > 60 { + panic!("remote entities timmed out.") + } + } + + // It might be a little over since we may have reserved extra entities for remote reservation. + assert!(entities.len() >= 300); + } + #[test] #[expect( clippy::nonminimal_bool, From e2717862cfc2cf8c9e5e5305300ee8b251d5a5a6 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Mon, 24 Mar 2025 20:50:59 -0400 Subject: [PATCH 02/18] minor improvements --- crates/bevy_ecs/src/entity/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 40d88b12b709c..8b7bf5671ed4b 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -635,6 +635,7 @@ impl RemoteEntitiesInner { } /// Manages access to [`Entities`] from any thread and async. +#[derive(Clone)] pub struct RemoteEntities { inner: Arc, } @@ -654,7 +655,7 @@ impl RemoteEntities { /// let thread = std::thread::spawn(move || { /// let future = async { /// for _ in 0..100 { - /// reserver.reserve_entity().await.unwrap(); + /// remote.reserve_entity().await.unwrap(); /// } /// }; /// bevy_tasks::block_on(future); From a7124d0802ef5683b17665fe8741c5fe257201b2 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Mon, 24 Mar 2025 21:14:51 -0400 Subject: [PATCH 03/18] improved test and fixed deadlock --- crates/bevy_ecs/src/entity/mod.rs | 51 +++++++++++++++---------------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 8b7bf5671ed4b..69e18463ae7f4 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -534,8 +534,6 @@ struct RemoteEntitiesInner { /// This is purely an optimization. /// When this is greater than 0, we are waiting on request fulfillment. is_waiting: AtomicU32, - /// The number of times requests have been fulfilled. - generation: AtomicU32, /// Since anyone can make requests at any time, this must be unbounded. requests: ConcurrentQueue, reserved: ConcurrentQueue, @@ -552,14 +550,13 @@ pub enum RemoteReservationError { struct RemoteReservedEntities { source: Arc, - requested_on_generation: Option, } impl Future for RemoteReservedEntities { type Output = Result; fn poll( - mut self: core::pin::Pin<&mut Self>, + self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, ) -> Poll { match self.source.reserved.pop() { @@ -568,23 +565,24 @@ impl Future for RemoteReservedEntities { Poll::Ready(Err(RemoteReservationError::Closed)) } Err(concurrent_queue::PopError::Empty) => { - let current_generation = self.source.generation.load(Ordering::Acquire); - if self - .requested_on_generation - .is_none_or(|generation| generation != current_generation) + match self + .source + .requests + .push(RemoteEntityRequest(cx.waker().clone())) { - self.requested_on_generation = self - .source - .requests - .push(RemoteEntityRequest(cx.waker().clone())) - .is_ok() - .then_some(current_generation); + Ok(_) => { + // We can use relaxed ordering here. Even though it might result in extra waiting, who cares? + self.source.is_waiting.fetch_add(1, Ordering::Relaxed); + // Now we are waiting for a flush + Poll::Pending + } + Err(concurrent_queue::PushError::Closed(_)) => { + Poll::Ready(Err(RemoteReservationError::Closed)) + } + Err(concurrent_queue::PushError::Full(_)) => { + unreachable!("Requests can't be full") + } } - - // We can use relaxed ordering here. Even though it might result in extra waiting, who cares? - self.source.is_waiting.fetch_add(1, Ordering::Relaxed); - // Now we are waiting for a flush - Poll::Pending } } } @@ -593,13 +591,14 @@ impl Future for RemoteReservedEntities { impl RemoteEntitiesInner { fn fulfill(&self, entities: &Entities, batch_size: NonZero) { self.is_waiting.store(0, Ordering::Relaxed); - for reserved in entities.reserve_entities(batch_size.get()) { - self.reserved - .push(reserved) - .expect("This never closes, and it can't run out of room."); + if self.reserved.is_empty() { + for reserved in entities.reserve_entities(batch_size.get()) { + self.reserved + .push(reserved) + .expect("This never closes, and it can't run out of room."); + } } - self.generation.fetch_add(1, Ordering::AcqRel); for request in self.requests.try_iter() { request.0.wake(); } @@ -622,7 +621,6 @@ impl RemoteEntitiesInner { fn new() -> Self { Self { is_waiting: AtomicU32::new(0), - generation: AtomicU32::new(0), requests: ConcurrentQueue::unbounded(), reserved: ConcurrentQueue::unbounded(), } @@ -669,7 +667,6 @@ impl RemoteEntities { pub async fn reserve_entity(&self) -> Result { RemoteReservedEntities { source: self.inner.clone(), - requested_on_generation: None, } .await } @@ -1418,6 +1415,8 @@ mod tests { use bevy_tasks::block_on; let mut entities = Entities::new(); + // Lower batch size so more waiting is tested. + entities.remote_batch_size = NonZero::new(16).unwrap(); let mut threads = (0..3) .map(|_| { From cdd00ced8856138930d8d761ae18e83267c19342 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Mon, 24 Mar 2025 21:36:52 -0400 Subject: [PATCH 04/18] fix docs --- crates/bevy_ecs/src/entity/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 69e18463ae7f4..01a8ade4fc6a2 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -543,7 +543,7 @@ struct RemoteEntitiesInner { /// See also [`RemoteEntities`]. #[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)] pub enum RemoteReservationError { - /// This happens when [`Entities`] are closed, dropped, etc while a [`RemoteEntitiesReserver`] is trying to reserve from it. + /// This happens when [`Entities`] are closed, dropped, etc while a [`RemoteEntities`] is trying to reserve from it. #[error("A remote entity reserver tried to reserve an entity from a closed `Entities`.")] Closed, } @@ -639,7 +639,7 @@ pub struct RemoteEntities { } impl RemoteEntities { - /// Constructs a [`RemoteEntitiesReserver`] that can be shared between threads. + /// Reserves an [`Entity`] from async. /// /// # Example /// From db53dda6220335482d14569354aba4eb541130a8 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Wed, 26 Mar 2025 12:23:05 -0400 Subject: [PATCH 05/18] switch to async channel --- crates/bevy_ecs/Cargo.toml | 4 +- crates/bevy_ecs/src/entity/mod.rs | 156 ++++++++++++------------------ 2 files changed, 64 insertions(+), 96 deletions(-) diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index c5350dd91ad17..05291421cb0a4 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -34,7 +34,7 @@ bevy_reflect = ["dep:bevy_reflect"] reflect_functions = ["bevy_reflect", "bevy_reflect/functions"] ## Use the configurable global error handler as the default error handler. -## +## ## This is typically used to turn panics from the ECS into loggable errors. ## This may be useful for production builds, ## but can result in a measurable performance impact, especially for commands. @@ -85,6 +85,7 @@ std = [ "arrayvec?/std", "log/std", "bevy_platform_support/std", + "async-channel/std", ] ## `critical-section` provides the building blocks for synchronization primitives @@ -132,6 +133,7 @@ variadics_please = { version = "1.1", default-features = false } tracing = { version = "0.1", default-features = false, optional = true } log = { version = "0.4", default-features = false } bumpalo = "3" +async-channel = { version = "2.3", default-features = false } [target.'cfg(not(all(target_has_atomic = "8", target_has_atomic = "16", target_has_atomic = "32", target_has_atomic = "64", target_has_atomic = "ptr")))'.dependencies] concurrent-queue = { version = "2.5.0", default-features = false, features = [ diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 01a8ade4fc6a2..79db7a7b52b84 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -45,7 +45,6 @@ use bevy_reflect::Reflect; use bevy_reflect::{ReflectDeserialize, ReflectSerialize}; pub use clone_entities::*; -use concurrent_queue::ConcurrentQueue; pub use entity_set::*; pub use map_entities::*; @@ -78,14 +77,7 @@ use bevy_platform_support::sync::{ atomic::{AtomicU32, Ordering}, Arc, }; -use core::{ - fmt, - hash::Hash, - mem, - num::NonZero, - panic::Location, - task::{Poll, Waker}, -}; +use core::{fmt, hash::Hash, mem, num::NonZero, panic::Location}; use log::warn; #[cfg(feature = "serialize")] @@ -526,17 +518,12 @@ impl<'a> core::iter::FusedIterator for ReserveEntitiesIterator<'a> {} // SAFETY: Newly reserved entity values are unique. unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {} -#[derive(Debug)] -struct RemoteEntityRequest(Waker); - #[derive(Debug)] struct RemoteEntitiesInner { - /// This is purely an optimization. - /// When this is greater than 0, we are waiting on request fulfillment. - is_waiting: AtomicU32, - /// Since anyone can make requests at any time, this must be unbounded. - requests: ConcurrentQueue, - reserved: ConcurrentQueue, + recent_requests: AtomicU32, + keep_hot: AtomicU32, + reserved: async_channel::Receiver, + reserver: async_channel::Sender, } /// An error that occurs when an [`Entity`] can not be reserved remotely. @@ -548,87 +535,65 @@ pub enum RemoteReservationError { Closed, } -struct RemoteReservedEntities { - source: Arc, -} - -impl Future for RemoteReservedEntities { - type Output = Result; - - fn poll( - self: core::pin::Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> Poll { - match self.source.reserved.pop() { - Ok(found) => Poll::Ready(Ok(found)), - Err(concurrent_queue::PopError::Closed) => { - Poll::Ready(Err(RemoteReservationError::Closed)) - } - Err(concurrent_queue::PopError::Empty) => { - match self - .source - .requests - .push(RemoteEntityRequest(cx.waker().clone())) - { - Ok(_) => { - // We can use relaxed ordering here. Even though it might result in extra waiting, who cares? - self.source.is_waiting.fetch_add(1, Ordering::Relaxed); - // Now we are waiting for a flush - Poll::Pending - } - Err(concurrent_queue::PushError::Closed(_)) => { - Poll::Ready(Err(RemoteReservationError::Closed)) - } - Err(concurrent_queue::PushError::Full(_)) => { - unreachable!("Requests can't be full") - } - } - } - } - } -} - impl RemoteEntitiesInner { - fn fulfill(&self, entities: &Entities, batch_size: NonZero) { - self.is_waiting.store(0, Ordering::Relaxed); - if self.reserved.is_empty() { - for reserved in entities.reserve_entities(batch_size.get()) { - self.reserved - .push(reserved) - .expect("This never closes, and it can't run out of room."); - } - } - - for request in self.requests.try_iter() { - request.0.wake(); + #[inline] + fn fulfill( + entities: &mut Entities, + mut reserve_allocated: impl FnMut(Entity, &mut EntityLocation), + keep_hot: u32, + ) { + let to_fulfill = entities.remote.recent_requests.swap(0, Ordering::Relaxed); + + for _ in 0..to_fulfill { + let entity = entities.alloc(); + // SAFETY: we just allocated it + let loc = unsafe { + &mut entities + .meta + .get_unchecked_mut(entity.index() as usize) + .location + }; + reserve_allocated(entity, loc); + let result = entities.remote.reserver.try_send(entity); + // It should not be closed and it can't get full. + debug_assert!(result.is_ok()); } } #[inline] - fn try_fulfill(&self, entities: &Entities, batch_size: NonZero) { + fn try_fulfill( + entities: &mut Entities, + reserve_allocated: impl FnMut(Entity, &mut EntityLocation), + keep_hot: u32, + ) { // we do this to hint to the compiler that the if branch is unlinkely to be taken. #[cold] - fn do_fulfill(this: &RemoteEntitiesInner, entities: &Entities, batch_size: NonZero) { - this.fulfill(entities, batch_size); + fn do_fulfill( + entities: &mut Entities, + reserve_allocated: impl FnMut(Entity, &mut EntityLocation), + keep_hot: u32, + ) { + RemoteEntitiesInner::fulfill(entities, reserve_allocated, keep_hot); } - if self.is_waiting.load(Ordering::Relaxed) > 0 { + if entities.remote.recent_requests.load(Ordering::Relaxed) > 0 { // TODO: add core::intrinsics::unlikely once stable - do_fulfill(self, entities, batch_size); + do_fulfill(entities, reserve_allocated, keep_hot); } } - fn new() -> Self { + fn new(keep_hot: u32) -> Self { + let (sender, receiver) = async_channel::unbounded(); Self { - is_waiting: AtomicU32::new(0), - requests: ConcurrentQueue::unbounded(), - reserved: ConcurrentQueue::unbounded(), + recent_requests: AtomicU32::new(0), + reserver: sender, + reserved: receiver, + keep_hot: AtomicU32::new(keep_hot), } } fn close(&self) { self.reserved.close(); - self.requests.close(); } } @@ -665,10 +630,12 @@ impl RemoteEntities { /// } /// ``` pub async fn reserve_entity(&self) -> Result { - RemoteReservedEntities { - source: self.inner.clone(), - } - .await + self.inner.recent_requests.fetch_add(1, Ordering::Relaxed); + self.inner + .reserved + .recv() + .await + .map_err(|_| RemoteReservationError::Closed) } /// Returns true only if the [`Entities`] has discontinued this remote access. @@ -687,9 +654,9 @@ impl RemoteEntities { /// [`World`]: crate::world::World #[derive(Debug)] pub struct Entities { - /// This is the number of entities we reserve in bulk for [`RemoteEntities`]. + /// This is the number of entities we keep ready for remote reservations via [`RemoteEntities::reserve_entity`]. /// A value too high can cause excess memory to be used, but a value too low can cause additional waiting. - pub remote_batch_size: NonZero, + pub entities_hot_for_remote: u32, remote: Arc, meta: Vec, @@ -738,16 +705,16 @@ pub struct Entities { } impl Entities { - /// The default value of [`remote_batch_size`](Self::remote_batch_size). - pub const DEFAULT_REMOTE_BATCH_SIZE: NonZero = NonZero::new(1024).unwrap(); + /// The default value of [`entities_hot_for_remote`](Self::entities_hot_for_remote). + pub const DEFAULT_HOT_REMOTE_ENTITIES: u32 = 256; pub(crate) fn new() -> Self { Entities { meta: Vec::new(), pending: Vec::new(), free_cursor: AtomicIdCursor::new(0), - remote: Arc::new(RemoteEntitiesInner::new()), - remote_batch_size: Self::DEFAULT_REMOTE_BATCH_SIZE, + remote: Arc::new(RemoteEntitiesInner::new(Self::DEFAULT_HOT_REMOTE_ENTITIES)), + entities_hot_for_remote: Self::DEFAULT_HOT_REMOTE_ENTITIES, } } @@ -1000,7 +967,7 @@ impl Entities { self.pending.clear(); *self.free_cursor.get_mut() = 0; self.remote.close(); - self.remote = Arc::new(RemoteEntitiesInner::new()); + self.remote = Arc::new(RemoteEntitiesInner::new(self.entities_hot_for_remote)); } /// Returns the location of an [`Entity`]. @@ -1087,9 +1054,6 @@ impl Entities { /// Note: freshly-allocated entities (ones which don't come from the pending list) are guaranteed /// to be initialized with the invalid archetype. pub unsafe fn flush(&mut self, mut init: impl FnMut(Entity, &mut EntityLocation)) { - // this may do extra reservations, so we do it before flushing. - self.remote.try_fulfill(self, self.remote_batch_size); - let free_cursor = self.free_cursor.get_mut(); let current_free_cursor = *free_cursor; @@ -1117,6 +1081,8 @@ impl Entities { &mut meta.location, ); } + + RemoteEntitiesInner::try_fulfill(self, init, self.entities_hot_for_remote); } /// Flushes all reserved entities to an "invalid" state. Attempting to retrieve them will return `None` @@ -1416,7 +1382,7 @@ mod tests { let mut entities = Entities::new(); // Lower batch size so more waiting is tested. - entities.remote_batch_size = NonZero::new(16).unwrap(); + entities.entities_hot_for_remote = 16; let mut threads = (0..3) .map(|_| { From 8860106a6459a79a95014da25da483d015114a11 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Wed, 26 Mar 2025 12:25:53 -0400 Subject: [PATCH 06/18] improved tests --- crates/bevy_ecs/src/entity/mod.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 79db7a7b52b84..592ac3f744cce 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -1373,14 +1373,11 @@ mod tests { assert!(next_entity.generation() > entity.generation() + GENERATIONS); } - #[test] #[cfg(feature = "std")] - fn remote_reservation() { - use std::thread; - + fn test_remote_reservation(entities: &mut Entities) { use bevy_tasks::block_on; + use std::thread; - let mut entities = Entities::new(); // Lower batch size so more waiting is tested. entities.entities_hot_for_remote = 16; @@ -1414,6 +1411,23 @@ mod tests { assert!(entities.len() >= 300); } + #[test] + #[cfg(feature = "std")] + fn remote_reservation() { + let mut entities = Entities::new(); + // Lower batch size so more waiting is tested. + entities.entities_hot_for_remote = 16; + test_remote_reservation(&mut entities); + // Lower batch size so more waiting is tested. + entities.entities_hot_for_remote = 1024; + test_remote_reservation(&mut entities); + + // Try 0 + let mut entities = Entities::new(); + entities.entities_hot_for_remote = 0; + test_remote_reservation(&mut entities); + } + #[test] #[expect( clippy::nonminimal_bool, From b7fd02d65a80a4d7435d4bbc1c196949843fd80c Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Wed, 26 Mar 2025 12:40:37 -0400 Subject: [PATCH 07/18] keep some hot --- crates/bevy_ecs/src/entity/mod.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 592ac3f744cce..90ea6be4d34b0 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -543,8 +543,12 @@ impl RemoteEntitiesInner { keep_hot: u32, ) { let to_fulfill = entities.remote.recent_requests.swap(0, Ordering::Relaxed); + let current_hot = entities.remote.keep_hot.load(Ordering::Relaxed); + let should_reserve = (to_fulfill + keep_hot).saturating_sub(current_hot); // should_reserve = to_fulfill + (keep_hot - cuurent_hot) + let new_hot = (current_hot + should_reserve).saturating_sub(to_fulfill); // new_hot = current_hot + (should_reserve - to_fulfill). + entities.remote.keep_hot.store(new_hot, Ordering::Relaxed); - for _ in 0..to_fulfill { + for _ in 0..should_reserve { let entity = entities.alloc(); // SAFETY: we just allocated it let loc = unsafe { @@ -582,13 +586,13 @@ impl RemoteEntitiesInner { } } - fn new(keep_hot: u32) -> Self { + fn new() -> Self { let (sender, receiver) = async_channel::unbounded(); Self { recent_requests: AtomicU32::new(0), reserver: sender, reserved: receiver, - keep_hot: AtomicU32::new(keep_hot), + keep_hot: AtomicU32::new(0), } } @@ -713,7 +717,7 @@ impl Entities { meta: Vec::new(), pending: Vec::new(), free_cursor: AtomicIdCursor::new(0), - remote: Arc::new(RemoteEntitiesInner::new(Self::DEFAULT_HOT_REMOTE_ENTITIES)), + remote: Arc::new(RemoteEntitiesInner::new()), entities_hot_for_remote: Self::DEFAULT_HOT_REMOTE_ENTITIES, } } @@ -967,7 +971,7 @@ impl Entities { self.pending.clear(); *self.free_cursor.get_mut() = 0; self.remote.close(); - self.remote = Arc::new(RemoteEntitiesInner::new(self.entities_hot_for_remote)); + self.remote = Arc::new(RemoteEntitiesInner::new()); } /// Returns the location of an [`Entity`]. From db84c5a73856da7bfd3079e14047591f8deea80e Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Wed, 26 Mar 2025 12:55:32 -0400 Subject: [PATCH 08/18] improve test --- crates/bevy_ecs/src/entity/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 90ea6be4d34b0..99672563c63eb 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -1380,10 +1380,10 @@ mod tests { #[cfg(feature = "std")] fn test_remote_reservation(entities: &mut Entities) { use bevy_tasks::block_on; + use rand::{rngs::StdRng, Rng, SeedableRng}; use std::thread; - // Lower batch size so more waiting is tested. - entities.entities_hot_for_remote = 16; + let mut rng = StdRng::seed_from_u64(89274528); let mut threads = (0..3) .map(|_| { @@ -1403,6 +1403,7 @@ mod tests { loop { threads.retain(|thread| !thread.is_finished()); entities.flush_as_invalid(); + entities.entities_hot_for_remote = rng.r#gen::() & 127; if threads.is_empty() { break; } @@ -1413,6 +1414,10 @@ mod tests { // It might be a little over since we may have reserved extra entities for remote reservation. assert!(entities.len() >= 300); + assert_eq!( + entities.remote.keep_hot.load(Ordering::Relaxed), + entities.remote.reserved.len() as u32 + ); } #[test] From 74de9c07912ba096bb5cd0bbd4eeece1f727a749 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Wed, 26 Mar 2025 14:19:50 -0400 Subject: [PATCH 09/18] fix no_std compile maybe --- crates/bevy_ecs/Cargo.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index 05291421cb0a4..f638619883f62 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -140,6 +140,11 @@ concurrent-queue = { version = "2.5.0", default-features = false, features = [ "portable-atomic", ] } +# This is an unused dependency, but it adds the feature to a dependency of async-channel for no_std support +event-listener = { version = "5", default-features = false, features = [ + "portable-atomic", +] } + [dev-dependencies] rand = "0.8" static_assertions = "1.1.0" From 8f9f544f3a3fbe4ee783dcd113f93b17f05d8ef7 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Thu, 27 Mar 2025 09:48:12 -0400 Subject: [PATCH 10/18] improved tests --- crates/bevy_ecs/src/entity/mod.rs | 40 +++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 99672563c63eb..62dbf20d7b585 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -1378,7 +1378,7 @@ mod tests { } #[cfg(feature = "std")] - fn test_remote_reservation(entities: &mut Entities) { + fn test_remote_reservation(entities: &mut Entities) { use bevy_tasks::block_on; use rand::{rngs::StdRng, Rng, SeedableRng}; use std::thread; @@ -1403,7 +1403,9 @@ mod tests { loop { threads.retain(|thread| !thread.is_finished()); entities.flush_as_invalid(); - entities.entities_hot_for_remote = rng.r#gen::() & 127; + if RAND { + entities.entities_hot_for_remote = rng.r#gen::() & 127; + } if threads.is_empty() { break; } @@ -1412,8 +1414,6 @@ mod tests { } } - // It might be a little over since we may have reserved extra entities for remote reservation. - assert!(entities.len() >= 300); assert_eq!( entities.remote.keep_hot.load(Ordering::Relaxed), entities.remote.reserved.len() as u32 @@ -1422,19 +1422,39 @@ mod tests { #[test] #[cfg(feature = "std")] - fn remote_reservation() { + fn remote_reservation_empty() { + let mut entities = Entities::new(); + entities.entities_hot_for_remote = 0; + test_remote_reservation::(&mut entities); + } + + #[test] + #[cfg(feature = "std")] + fn remote_reservation_standard() { let mut entities = Entities::new(); // Lower batch size so more waiting is tested. entities.entities_hot_for_remote = 16; - test_remote_reservation(&mut entities); + test_remote_reservation::(&mut entities); + } + + #[test] + #[cfg(feature = "std")] + fn remote_reservation_volitol() { + let mut entities = Entities::new(); + + entities.entities_hot_for_remote = 16; + test_remote_reservation::(&mut entities); + test_remote_reservation::(&mut entities); + // Lower batch size so more waiting is tested. entities.entities_hot_for_remote = 1024; - test_remote_reservation(&mut entities); + test_remote_reservation::(&mut entities); + test_remote_reservation::(&mut entities); - // Try 0 - let mut entities = Entities::new(); + entities.entities_hot_for_remote = 1024; + test_remote_reservation::(&mut entities); entities.entities_hot_for_remote = 0; - test_remote_reservation(&mut entities); + test_remote_reservation::(&mut entities); } #[test] From 75844c0748b634c3ec792c70bfcf73a88f16992b Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Thu, 27 Mar 2025 09:52:28 -0400 Subject: [PATCH 11/18] improved naming --- crates/bevy_ecs/src/entity/mod.rs | 37 +++++++++++++++++-------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 62dbf20d7b585..395a5a9f7cd20 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -521,7 +521,7 @@ unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {} #[derive(Debug)] struct RemoteEntitiesInner { recent_requests: AtomicU32, - keep_hot: AtomicU32, + in_channel: AtomicU32, reserved: async_channel::Receiver, reserver: async_channel::Sender, } @@ -539,14 +539,17 @@ impl RemoteEntitiesInner { #[inline] fn fulfill( entities: &mut Entities, - mut reserve_allocated: impl FnMut(Entity, &mut EntityLocation), - keep_hot: u32, + mut init_allocated: impl FnMut(Entity, &mut EntityLocation), + in_channel: u32, ) { let to_fulfill = entities.remote.recent_requests.swap(0, Ordering::Relaxed); - let current_hot = entities.remote.keep_hot.load(Ordering::Relaxed); - let should_reserve = (to_fulfill + keep_hot).saturating_sub(current_hot); // should_reserve = to_fulfill + (keep_hot - cuurent_hot) - let new_hot = (current_hot + should_reserve).saturating_sub(to_fulfill); // new_hot = current_hot + (should_reserve - to_fulfill). - entities.remote.keep_hot.store(new_hot, Ordering::Relaxed); + let current_in_channel = entities.remote.in_channel.load(Ordering::Relaxed); + let should_reserve = (to_fulfill + in_channel).saturating_sub(current_in_channel); // should_reserve = to_fulfill + (in_channel - current_in_channel) + let new_in_channel = (current_in_channel + should_reserve).saturating_sub(to_fulfill); // new_in_channel = current_in_channel + (should_reserve - to_fulfill). + entities + .remote + .in_channel + .store(new_in_channel, Ordering::Relaxed); for _ in 0..should_reserve { let entity = entities.alloc(); @@ -557,7 +560,7 @@ impl RemoteEntitiesInner { .get_unchecked_mut(entity.index() as usize) .location }; - reserve_allocated(entity, loc); + init_allocated(entity, loc); let result = entities.remote.reserver.try_send(entity); // It should not be closed and it can't get full. debug_assert!(result.is_ok()); @@ -567,22 +570,22 @@ impl RemoteEntitiesInner { #[inline] fn try_fulfill( entities: &mut Entities, - reserve_allocated: impl FnMut(Entity, &mut EntityLocation), - keep_hot: u32, + init_allocated: impl FnMut(Entity, &mut EntityLocation), + in_channel: u32, ) { // we do this to hint to the compiler that the if branch is unlinkely to be taken. #[cold] fn do_fulfill( entities: &mut Entities, - reserve_allocated: impl FnMut(Entity, &mut EntityLocation), - keep_hot: u32, + init_allocated: impl FnMut(Entity, &mut EntityLocation), + in_channel: u32, ) { - RemoteEntitiesInner::fulfill(entities, reserve_allocated, keep_hot); + RemoteEntitiesInner::fulfill(entities, init_allocated, in_channel); } if entities.remote.recent_requests.load(Ordering::Relaxed) > 0 { // TODO: add core::intrinsics::unlikely once stable - do_fulfill(entities, reserve_allocated, keep_hot); + do_fulfill(entities, init_allocated, in_channel); } } @@ -592,7 +595,7 @@ impl RemoteEntitiesInner { recent_requests: AtomicU32::new(0), reserver: sender, reserved: receiver, - keep_hot: AtomicU32::new(0), + in_channel: AtomicU32::new(0), } } @@ -1410,12 +1413,12 @@ mod tests { break; } if timeout.elapsed().as_secs() > 60 { - panic!("remote entities timmed out.") + panic!("remote entities timed out.") } } assert_eq!( - entities.remote.keep_hot.load(Ordering::Relaxed), + entities.remote.in_channel.load(Ordering::Relaxed), entities.remote.reserved.len() as u32 ); } From 51e115b620c9b08291771648eaec6a231e4379c4 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Thu, 27 Mar 2025 09:55:01 -0400 Subject: [PATCH 12/18] comments on `RemoteEntitiesInner` --- crates/bevy_ecs/src/entity/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 395a5a9f7cd20..eb43e42bd4995 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -518,10 +518,15 @@ impl<'a> core::iter::FusedIterator for ReserveEntitiesIterator<'a> {} // SAFETY: Newly reserved entity values are unique. unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {} +/// This is the shared data behind [`RemoteEntities`]. #[derive(Debug)] struct RemoteEntitiesInner { + /// The number of requests that have been made since the last [`fulfill`](Self::fulfill). recent_requests: AtomicU32, + /// The number of entities we're trying to keep in the channel for low latency access. in_channel: AtomicU32, + + // Channels for sending and receiving reserved entities. reserved: async_channel::Receiver, reserver: async_channel::Sender, } From 5ec8f4d7570dcb22f6f94a10d79559d3b4145832 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Thu, 27 Mar 2025 10:00:14 -0400 Subject: [PATCH 13/18] better test names --- crates/bevy_ecs/src/entity/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index eb43e42bd4995..0a5cdf97a3c4b 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -1430,7 +1430,7 @@ mod tests { #[test] #[cfg(feature = "std")] - fn remote_reservation_empty() { + fn remote_reservation_empty_hot() { let mut entities = Entities::new(); entities.entities_hot_for_remote = 0; test_remote_reservation::(&mut entities); @@ -1438,7 +1438,7 @@ mod tests { #[test] #[cfg(feature = "std")] - fn remote_reservation_standard() { + fn remote_reservation_standard_hot() { let mut entities = Entities::new(); // Lower batch size so more waiting is tested. entities.entities_hot_for_remote = 16; @@ -1447,7 +1447,7 @@ mod tests { #[test] #[cfg(feature = "std")] - fn remote_reservation_volitol() { + fn remote_reservation_volitol_hot() { let mut entities = Entities::new(); entities.entities_hot_for_remote = 16; From af387566e2fe6e182ee2e3f5f91feb529b1de71b Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Thu, 27 Mar 2025 12:57:19 -0400 Subject: [PATCH 14/18] moved fulfillment --- crates/bevy_ecs/src/entity/mod.rs | 87 ++++++++++++------------------- 1 file changed, 33 insertions(+), 54 deletions(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 0a5cdf97a3c4b..4230cf1aea781 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -541,59 +541,6 @@ pub enum RemoteReservationError { } impl RemoteEntitiesInner { - #[inline] - fn fulfill( - entities: &mut Entities, - mut init_allocated: impl FnMut(Entity, &mut EntityLocation), - in_channel: u32, - ) { - let to_fulfill = entities.remote.recent_requests.swap(0, Ordering::Relaxed); - let current_in_channel = entities.remote.in_channel.load(Ordering::Relaxed); - let should_reserve = (to_fulfill + in_channel).saturating_sub(current_in_channel); // should_reserve = to_fulfill + (in_channel - current_in_channel) - let new_in_channel = (current_in_channel + should_reserve).saturating_sub(to_fulfill); // new_in_channel = current_in_channel + (should_reserve - to_fulfill). - entities - .remote - .in_channel - .store(new_in_channel, Ordering::Relaxed); - - for _ in 0..should_reserve { - let entity = entities.alloc(); - // SAFETY: we just allocated it - let loc = unsafe { - &mut entities - .meta - .get_unchecked_mut(entity.index() as usize) - .location - }; - init_allocated(entity, loc); - let result = entities.remote.reserver.try_send(entity); - // It should not be closed and it can't get full. - debug_assert!(result.is_ok()); - } - } - - #[inline] - fn try_fulfill( - entities: &mut Entities, - init_allocated: impl FnMut(Entity, &mut EntityLocation), - in_channel: u32, - ) { - // we do this to hint to the compiler that the if branch is unlinkely to be taken. - #[cold] - fn do_fulfill( - entities: &mut Entities, - init_allocated: impl FnMut(Entity, &mut EntityLocation), - in_channel: u32, - ) { - RemoteEntitiesInner::fulfill(entities, init_allocated, in_channel); - } - - if entities.remote.recent_requests.load(Ordering::Relaxed) > 0 { - // TODO: add core::intrinsics::unlikely once stable - do_fulfill(entities, init_allocated, in_channel); - } - } - fn new() -> Self { let (sender, receiver) = async_channel::unbounded(); Self { @@ -1094,7 +1041,39 @@ impl Entities { ); } - RemoteEntitiesInner::try_fulfill(self, init, self.entities_hot_for_remote); + if self.remote.recent_requests.load(Ordering::Relaxed) > 0 { + // TODO: add core::intrinsics::unlikely once stable + self.force_remote_fulfill(init); + } + } + + // we do this to hint to the compiler that the if branch is unlinkely to be taken. + #[cold] + fn force_remote_fulfill(&mut self, init_allocated: impl FnMut(Entity, &mut EntityLocation)) { + let in_channel = self.entities_hot_for_remote; + let mut init_allocated = init_allocated; + let to_fulfill = self.remote.recent_requests.swap(0, Ordering::Relaxed); + let current_in_channel = self.remote.in_channel.load(Ordering::Relaxed); + let should_reserve = (to_fulfill + in_channel).saturating_sub(current_in_channel); // should_reserve = to_fulfill + (in_channel - current_in_channel) + let new_in_channel = (current_in_channel + should_reserve).saturating_sub(to_fulfill); // new_in_channel = current_in_channel + (should_reserve - to_fulfill). + self.remote + .in_channel + .store(new_in_channel, Ordering::Relaxed); + + for _ in 0..should_reserve { + let entity = self.alloc(); + // SAFETY: we just allocated it + let loc = unsafe { + &mut self + .meta + .get_unchecked_mut(entity.index() as usize) + .location + }; + init_allocated(entity, loc); + let result = self.remote.reserver.try_send(entity); + // It should not be closed and it can't get full. + debug_assert!(result.is_ok()); + } } /// Flushes all reserved entities to an "invalid" state. Attempting to retrieve them will return `None` From 2082cbf5c5890785401a66a95d1308b2e8494b94 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Thu, 27 Mar 2025 13:13:23 -0400 Subject: [PATCH 15/18] fix doc --- crates/bevy_ecs/src/entity/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 4230cf1aea781..a7af41516c24e 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -521,7 +521,7 @@ unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {} /// This is the shared data behind [`RemoteEntities`]. #[derive(Debug)] struct RemoteEntitiesInner { - /// The number of requests that have been made since the last [`fulfill`](Self::fulfill). + /// The number of requests that have been made since the last [`force_remote_fulfill`](Entities::force_remote_fulfill). recent_requests: AtomicU32, /// The number of entities we're trying to keep in the channel for low latency access. in_channel: AtomicU32, From cf65840a5f3b771b2bd5311969ab7a1227fa7a4e Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Thu, 27 Mar 2025 13:14:32 -0400 Subject: [PATCH 16/18] removed unneeded saturating sub --- crates/bevy_ecs/src/entity/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index a7af41516c24e..55156bc20cf3d 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -1055,7 +1055,7 @@ impl Entities { let to_fulfill = self.remote.recent_requests.swap(0, Ordering::Relaxed); let current_in_channel = self.remote.in_channel.load(Ordering::Relaxed); let should_reserve = (to_fulfill + in_channel).saturating_sub(current_in_channel); // should_reserve = to_fulfill + (in_channel - current_in_channel) - let new_in_channel = (current_in_channel + should_reserve).saturating_sub(to_fulfill); // new_in_channel = current_in_channel + (should_reserve - to_fulfill). + let new_in_channel = current_in_channel + should_reserve - to_fulfill; // new_in_channel = current_in_channel + (should_reserve - to_fulfill). self.remote .in_channel .store(new_in_channel, Ordering::Relaxed); From eb4d48bbdd4c16a40e97d56fe2de20900224f745 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Fri, 28 Mar 2025 09:27:54 -0400 Subject: [PATCH 17/18] small rename --- crates/bevy_ecs/src/entity/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index 55156bc20cf3d..f8e6bd68180f5 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -1426,7 +1426,7 @@ mod tests { #[test] #[cfg(feature = "std")] - fn remote_reservation_volitol_hot() { + fn remote_reservation_frequently_changed_hot() { let mut entities = Entities::new(); entities.entities_hot_for_remote = 16; From 3f85f938a6a0cc3c97c125da79bdef2f8d480729 Mon Sep 17 00:00:00 2001 From: Elliott Pierce Date: Fri, 28 Mar 2025 09:28:25 -0400 Subject: [PATCH 18/18] add reservation --- crates/bevy_ecs/src/entity/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/bevy_ecs/src/entity/mod.rs b/crates/bevy_ecs/src/entity/mod.rs index f8e6bd68180f5..c59e5783dfef2 100644 --- a/crates/bevy_ecs/src/entity/mod.rs +++ b/crates/bevy_ecs/src/entity/mod.rs @@ -1060,6 +1060,7 @@ impl Entities { .in_channel .store(new_in_channel, Ordering::Relaxed); + self.reserve(should_reserve); for _ in 0..should_reserve { let entity = self.alloc(); // SAFETY: we just allocated it