Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bevy_reflect = ["dep:bevy_reflect"]
reflect_functions = ["bevy_reflect", "bevy_reflect/functions"]

## Use the configurable global error handler as the default error handler.
##
##
## This is typically used to turn panics from the ECS into loggable errors.
## This may be useful for production builds,
## but can result in a measurable performance impact, especially for commands.
Expand Down Expand Up @@ -85,6 +85,7 @@ std = [
"arrayvec?/std",
"log/std",
"bevy_platform_support/std",
"async-channel/std",
]

## `critical-section` provides the building blocks for synchronization primitives
Expand Down Expand Up @@ -132,12 +133,18 @@ variadics_please = { version = "1.1", default-features = false }
tracing = { version = "0.1", default-features = false, optional = true }
log = { version = "0.4", default-features = false }
bumpalo = "3"
async-channel = { version = "2.3", default-features = false }

[target.'cfg(not(all(target_has_atomic = "8", target_has_atomic = "16", target_has_atomic = "32", target_has_atomic = "64", target_has_atomic = "ptr")))'.dependencies]
concurrent-queue = { version = "2.5.0", default-features = false, features = [
"portable-atomic",
] }

# This is an unused dependency, but it adds the feature to a dependency of async-channel for no_std support
event-listener = { version = "5", default-features = false, features = [
"portable-atomic",
] }

[dev-dependencies]
rand = "0.8"
static_assertions = "1.1.0"
Expand Down
224 changes: 222 additions & 2 deletions crates/bevy_ecs/src/entity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ use crate::{
storage::{SparseSetIndex, TableId, TableRow},
};
use alloc::vec::Vec;
use bevy_platform_support::sync::atomic::Ordering;
use bevy_platform_support::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use core::{fmt, hash::Hash, mem, num::NonZero, panic::Location};
use log::warn;

Expand Down Expand Up @@ -515,6 +518,136 @@ impl<'a> core::iter::FusedIterator for ReserveEntitiesIterator<'a> {}
// SAFETY: Newly reserved entity values are unique.
unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {}

#[derive(Debug)]
struct RemoteEntitiesInner {
/// Number of remote reservation requests made since the last fulfillment pass.
/// Incremented by [`RemoteEntities::reserve_entity`]; swapped back to 0 in `fulfill`.
recent_requests: AtomicU32,
/// Count of pre-reserved entities currently buffered in the channel
/// ("kept hot"), waiting to satisfy future remote requests.
keep_hot: AtomicU32,
/// Receiving end of the channel; remote reservers pull pre-allocated entities from here.
reserved: async_channel::Receiver<Entity>,
/// Sending end of the channel; `fulfill` pushes freshly allocated entities in here.
reserver: async_channel::Sender<Entity>,
}

/// An error that occurs when an [`Entity`] cannot be reserved remotely.
///
/// See also [`RemoteEntities`].
#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
pub enum RemoteReservationError {
/// Returned when the backing [`Entities`] has been closed, dropped, or cleared
/// while a [`RemoteEntities`] was trying to reserve from it.
#[error("A remote entity reserver tried to reserve an entity from a closed `Entities`.")]
Closed,
}

impl RemoteEntitiesInner {
#[inline]
fn fulfill(
entities: &mut Entities,
mut reserve_allocated: impl FnMut(Entity, &mut EntityLocation),
keep_hot: u32,
) {
let to_fulfill = entities.remote.recent_requests.swap(0, Ordering::Relaxed);
let current_hot = entities.remote.keep_hot.load(Ordering::Relaxed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why entities.remote.keep_hot needs to be an Atomic. Could we just move it into Entities and then handle it with just regular u32s?

Similarly is keep_hot the correct name for this? Maybe we should call it like in_channel or something? (they may have already been claimed by recent_request but I think it's close enough?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why entities.remote.keep_hot needs to be an Atomic. Could we just move it into Entities and then handle it with just regular u32s?

For now, it doesn't. However, the extra 2 atomic ops are nothing compared to the like 10 atomic ops per entity to push onto the queue. (This is something I think a custom queue implementation can do much better. I did this for v4).

Given that it's not a performance concern, I like to keep the state about remote entities on the type itself rather than somewhere on Entities. We can change this later, but this could still come in handy if we want to do fulfillment with only &Entities in the future (maybe).

Similarly is keep_hot the correct name for this? Maybe we should call it like in_channel or something? (they may have already been claimed by recent_request but I think it's close enough?)

That's a more precise name, so I changed the internals to use that. But the field we expose to users I've kept as "hot" just because the channel detail might change later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, it doesn't. However, the extra 2 atomic ops are nothing compared to the like 10 atomic ops per entity to push onto the queue. (This is something I think a custom queue implementation can do much better. I did this for v4).

Given that it's not a performance concern, I like to keep the state about remote entities on the type itself rather than somewhere on Entities. We can change this later, but this could still come in handy if we want to do fulfillment with only &Entities in the future (maybe).

Another way to approach this is to have separate structs for each side, like Sender and Receiver on channels. That keeps the remote reservation logic together, but still keeps the provider and client data separate. That would also remove the need to close() the channel explicitly, since it automatically closes when the last Sender is dropped.

I think the issue with using an atomic here is that the ordinary load followed by a store looks like a race condition, while if you had &mut then it would be obvious that nothing could change it in between. Performance shouldn't be an issue; relaxed load and store calls like this are no more expensive than non-atomic ones.

let should_reserve = (to_fulfill + keep_hot).saturating_sub(current_hot); // should_reserve = to_fulfill + (keep_hot - current_hot)
let new_hot = (current_hot + should_reserve).saturating_sub(to_fulfill); // new_hot = current_hot + (should_reserve - to_fulfill).
entities.remote.keep_hot.store(new_hot, Ordering::Relaxed);

for _ in 0..should_reserve {
let entity = entities.alloc();
// SAFETY: we just allocated it
let loc = unsafe {
&mut entities
.meta
.get_unchecked_mut(entity.index() as usize)
.location
};
reserve_allocated(entity, loc);
let result = entities.remote.reserver.try_send(entity);
// It should not be closed and it can't get full.
debug_assert!(result.is_ok());
}
}

/// Checks for pending remote reservation requests and, only when there are any,
/// runs a fulfillment pass via [`RemoteEntitiesInner::fulfill`].
///
/// `keep_hot` is the number of entities to keep pre-reserved in the channel.
/// The common case (no pending requests) is a single relaxed atomic load.
#[inline]
fn try_fulfill(
entities: &mut Entities,
reserve_allocated: impl FnMut(Entity, &mut EntityLocation),
keep_hot: u32,
) {
// We do this to hint to the compiler that the if branch is unlikely to be taken.
#[cold]
fn do_fulfill(
entities: &mut Entities,
reserve_allocated: impl FnMut(Entity, &mut EntityLocation),
keep_hot: u32,
) {
RemoteEntitiesInner::fulfill(entities, reserve_allocated, keep_hot);
}

if entities.remote.recent_requests.load(Ordering::Relaxed) > 0 {
// TODO: add core::intrinsics::unlikely once stable
do_fulfill(entities, reserve_allocated, keep_hot);
}
}

/// Creates a fresh inner state: an empty unbounded channel and zeroed counters.
fn new() -> Self {
let (reserver, reserved) = async_channel::unbounded();
Self {
recent_requests: AtomicU32::new(0),
keep_hot: AtomicU32::new(0),
reserved,
reserver,
}
}

/// Closes the channel, after which outstanding and future remote reservations
/// fail with [`RemoteReservationError::Closed`] (see [`RemoteEntities::is_closed`]).
fn close(&self) {
self.reserved.close();
}
}

/// Manages access to [`Entities`] from any thread and async.
///
/// Cloning is cheap: all clones share the same underlying state.
#[derive(Clone)]
pub struct RemoteEntities {
// Shared with the owning `Entities` (its `remote` field).
inner: Arc<RemoteEntitiesInner>,
}

impl RemoteEntities {
/// Asynchronously reserves an [`Entity`] from the connected [`Entities`].
///
/// Fails with [`RemoteReservationError::Closed`] once the connection is severed.
///
/// # Example
///
/// ```
/// use bevy_ecs::prelude::*;
///
/// let mut world = World::new();
/// let remote = world.entities().get_remote();
///
/// // The reserve is async so we need it to be on a separate thread.
/// let thread = std::thread::spawn(move || {
/// let future = async {
/// for _ in 0..100 {
/// remote.reserve_entity().await.unwrap();
/// }
/// };
/// bevy_tasks::block_on(future);
/// });
///
/// // We need to flush the entities as needed or the remote entities will get stuck.
/// while !thread.is_finished() {
/// world.flush();
/// }
/// ```
pub async fn reserve_entity(&self) -> Result<Entity, RemoteReservationError> {
// Record the request so the next fulfillment pass knows to allocate for us.
self.inner.recent_requests.fetch_add(1, Ordering::Relaxed);
// Then wait for a pre-reserved entity to arrive on the channel.
match self.inner.reserved.recv().await {
Ok(entity) => Ok(entity),
Err(_) => Err(RemoteReservationError::Closed),
}
}

/// Returns true only if the [`Entities`] has discontinued this remote access.
pub fn is_closed(&self) -> bool {
let receiver = &self.inner.reserved;
receiver.is_closed()
}
}

/// A [`World`]'s internal metadata store on all of its entities.
///
/// Contains metadata on:
Expand All @@ -525,6 +658,10 @@ unsafe impl EntitySetIterator for ReserveEntitiesIterator<'_> {}
/// [`World`]: crate::world::World
#[derive(Debug)]
pub struct Entities {
/// This is the number of entities we keep ready for remote reservations via [`RemoteEntities::reserve_entity`].
/// A value too high can cause excess memory to be used, but a value too low can cause additional waiting.
pub entities_hot_for_remote: u32,
remote: Arc<RemoteEntitiesInner>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Could we just make this store a RemoteEntities? That way we don't have to worry about constructing one in get_remote and can just blindly clone. Also means if we change the details of RemoteEntities we don't have to worry about how we construct it each time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do this, and if v6 doesn't end up evolving much, I'm on board with this. However, many of the ways we can improve this is by caching some atomic results in a non-atomic that needs &mut. More specifically, we'd need RemoteEntities to have additional per-instance state in addition to the shared state in Arc<RemoteEntitiesInner>. But Entities isn't a remote instance, so it doesn't make sense to include that per-instance state in Entities.

I used this a lot in v4. If v6 is merged, I'll start slowly moving concepts from v4 into it to try to improve performance. If none of those changes are merged, then I can follow up by simplifying this per your suggestion. It's preemptive future proofing if that makes sense.

meta: Vec<EntityMeta>,

/// The `pending` and `free_cursor` fields describe three sets of Entity IDs
Expand Down Expand Up @@ -572,11 +709,23 @@ pub struct Entities {
}

impl Entities {
pub(crate) const fn new() -> Self {
/// The default value of [`entities_hot_for_remote`](Self::entities_hot_for_remote).
pub const DEFAULT_HOT_REMOTE_ENTITIES: u32 = 256;

/// Creates an empty `Entities` store with a fresh remote-reservation channel.
pub(crate) fn new() -> Self {
let remote = Arc::new(RemoteEntitiesInner::new());
Entities {
entities_hot_for_remote: Self::DEFAULT_HOT_REMOTE_ENTITIES,
remote,
meta: Vec::new(),
pending: Vec::new(),
free_cursor: AtomicIdCursor::new(0),
}
}

/// Constructs a new [`RemoteEntities`] for this instance.
pub fn get_remote(&self) -> RemoteEntities {
RemoteEntities {
inner: self.remote.clone(),
}
}

Expand Down Expand Up @@ -821,6 +970,8 @@ impl Entities {
self.meta.clear();
self.pending.clear();
*self.free_cursor.get_mut() = 0;
self.remote.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will disconnect any outstanding RemoteEntities values so that they start failing to reserve. Is that really what we want to do in this case? It might be less disruptive to simply drain the queue.

Although it probably doesn't matter, since nobody will call clear() on a real application.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer this behavior. What if a clear happens while an asset is loading? Now we have to make sure every single included entity is valid instead of just making sure one is. Right now, the minute it clears, all current remote reservers fail, which I think is a more transparent way of error handling.

(But like you said, it doesn't really matter).

self.remote = Arc::new(RemoteEntitiesInner::new());
}

/// Returns the location of an [`Entity`].
Expand Down Expand Up @@ -934,6 +1085,8 @@ impl Entities {
&mut meta.location,
);
}

RemoteEntitiesInner::try_fulfill(self, init, self.entities_hot_for_remote);
}

/// Flushes all reserved entities to an "invalid" state. Attempting to retrieve them will return `None`
Expand Down Expand Up @@ -1035,6 +1188,13 @@ impl Entities {
}
}

impl Drop for Entities {
fn drop(&mut self) {
// Make sure remote entities are informed: closing the channel lets any
// outstanding `RemoteEntities` fail with `Closed` instead of waiting forever.
self.remote.close();
}
}

/// An error that occurs when a specified [`Entity`] does not exist.
#[derive(thiserror::Error, Debug, Clone, Copy, PartialEq, Eq)]
#[error("The entity with ID {entity} {details}")]
Expand Down Expand Up @@ -1217,6 +1377,66 @@ mod tests {
assert!(next_entity.generation() > entity.generation() + GENERATIONS);
}

/// Shared test driver: spawns 3 threads that each remotely reserve 100 entities
/// while the main thread repeatedly flushes (fulfilling requests) until all
/// threads finish or a 60s timeout elapses.
#[cfg(feature = "std")]
fn test_remote_reservation(entities: &mut Entities) {
use bevy_tasks::block_on;
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::thread;

// Seeded rng so the test is reproducible.
let mut rng = StdRng::seed_from_u64(89274528);

let mut threads = (0..3)
.map(|_| {
let reserver = entities.get_remote();
thread::spawn(move || {
let future = async {
for _ in 0..100 {
reserver.reserve_entity().await.unwrap();
}
};
block_on(future);
})
})
.collect::<Vec<_>>();

let timeout = std::time::Instant::now();
loop {
threads.retain(|thread| !thread.is_finished());
entities.flush_as_invalid();
// Randomize the hot count in 0..=127 to exercise different buffering levels.
entities.entities_hot_for_remote = rng.r#gen::<u32>() & 127;
if threads.is_empty() {
break;
}
if timeout.elapsed().as_secs() > 60 {
panic!("remote entities timed out.");
}
}

// It might be a little over since we may have reserved extra entities for remote reservation.
assert!(entities.len() >= 300);
// The tracked "hot" count must agree with what is actually buffered in the channel.
assert_eq!(
entities.remote.keep_hot.load(Ordering::Relaxed),
entities.remote.reserved.len() as u32
);
}

#[test]
#[cfg(feature = "std")]
fn remote_reservation() {
let mut entities = Entities::new();
// Lower batch size so more waiting is tested.
entities.entities_hot_for_remote = 16;
test_remote_reservation(&mut entities);
// Higher batch size so more buffering is tested.
entities.entities_hot_for_remote = 1024;
test_remote_reservation(&mut entities);

// Try 0: nothing is kept hot, so every request must wait for a fulfillment pass.
let mut entities = Entities::new();
entities.entities_hot_for_remote = 0;
test_remote_reservation(&mut entities);
}

#[test]
#[expect(
clippy::nonminimal_bool,
Expand Down
Loading