Skip to content

Commit c10412c

Browse files
authored
Pin an Executor's State to minimize atomic operations. (#146)
Use `Pin<&'a State>` to point to the state instead of cloning `Arc`s, this should reduce the amount of atomic operations required. This should be sound given that the state are never moved until the Executor is dropped. This still uses `Arc` instead of `Box` as miri will treat a former Box as a uniquely owned pointer and cannot be shared (backed by `Unique` vs `Shared`).
1 parent 1e8d245 commit c10412c

File tree

3 files changed

+63
-57
lines changed

3 files changed

+63
-57
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ name = "async-executor"
66
version = "1.13.3"
77
authors = ["Stjepan Glavina <[email protected]>", "John Nunley <[email protected]>"]
88
edition = "2021"
9-
rust-version = "1.63"
9+
rust-version = "1.65"
1010
description = "Async executor"
1111
license = "Apache-2.0 OR MIT"
1212
repository = "https://github.com/smol-rs/async-executor"

src/lib.rs

Lines changed: 41 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub use static_executors::*;
9090
/// ```
9191
pub struct Executor<'a> {
9292
/// The executor state.
93-
state: AtomicPtr<State>,
93+
pub(crate) state: AtomicPtr<State>,
9494

9595
/// Makes the `'a` lifetime invariant.
9696
_marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
@@ -163,10 +163,11 @@ impl<'a> Executor<'a> {
163163
/// });
164164
/// ```
165165
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
166-
let mut active = self.state().active();
166+
let state = self.state();
167+
let mut active = state.active();
167168

168169
// SAFETY: `T` and the future are `Send`.
169-
unsafe { self.spawn_inner(future, &mut active) }
170+
unsafe { Self::spawn_inner(state, future, &mut active) }
170171
}
171172

172173
/// Spawns many tasks onto the executor.
@@ -214,12 +215,13 @@ impl<'a> Executor<'a> {
214215
futures: impl IntoIterator<Item = F>,
215216
handles: &mut impl Extend<Task<F::Output>>,
216217
) {
217-
let mut active = Some(self.state().active());
218+
let state = self.state();
219+
let mut active = Some(state.as_ref().active());
218220

219221
// Convert the futures into tasks.
220222
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
221223
// SAFETY: `T` and the future are `Send`.
222-
let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
224+
let task = unsafe { Self::spawn_inner(state, future, active.as_mut().unwrap()) };
223225

224226
// Yield the lock every once in a while to ease contention.
225227
if i.wrapping_sub(1) % 500 == 0 {
@@ -240,14 +242,13 @@ impl<'a> Executor<'a> {
240242
///
241243
/// If this is an `Executor`, `F` and `T` must be `Send`.
242244
unsafe fn spawn_inner<T: 'a>(
243-
&self,
245+
state: Pin<&'a State>,
244246
future: impl Future<Output = T> + 'a,
245247
active: &mut Slab<Waker>,
246248
) -> Task<T> {
247249
// Remove the task from the set of active tasks when the future finishes.
248250
let entry = active.vacant_entry();
249251
let index = entry.key();
250-
let state = self.state_as_arc();
251252
let future = AsyncCallOnDrop::new(future, move || drop(state.active().try_remove(index)));
252253

253254
// Create the task and register it in the set of active tasks.
@@ -269,12 +270,16 @@ impl<'a> Executor<'a> {
269270
// the `Executor` is drained of all of its runnables. This ensures that
270271
// runnables are dropped and this precondition is satisfied.
271272
//
272-
// `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
273-
// Therefore we do not need to worry about what is done with the
274-
// `Waker`.
273+
// `Self::schedule` is `Send` and `Sync`, as checked below.
274+
// Therefore we do not need to worry about which thread the `Waker` is used
275+
// and dropped on.
276+
//
277+
// `Self::schedule` may not be `'static`, but we make sure that the `Waker` does
278+
// not outlive `'a`. When the executor is dropped, the `active` field is
279+
// drained and all of the `Waker`s are woken.
275280
let (runnable, task) = Builder::new()
276281
.propagate_panic(true)
277-
.spawn_unchecked(|()| future, self.schedule());
282+
.spawn_unchecked(|()| future, Self::schedule(state));
278283
entry.insert(runnable.waker());
279284

280285
runnable.schedule();
@@ -345,9 +350,7 @@ impl<'a> Executor<'a> {
345350
}
346351

347352
/// Returns a function that schedules a runnable task when it gets woken up.
348-
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
349-
let state = self.state_as_arc();
350-
353+
fn schedule(state: Pin<&'a State>) -> impl Fn(Runnable) + Send + Sync + 'a {
351354
// TODO: If possible, push into the current local queue and notify the ticker.
352355
move |runnable| {
353356
let result = state.queue.push(runnable);
@@ -358,12 +361,11 @@ impl<'a> Executor<'a> {
358361

359362
/// Returns a pointer to the inner state.
360363
#[inline]
361-
fn state_ptr(&self) -> *const State {
364+
fn state(&self) -> Pin<&'a State> {
362365
#[cold]
363366
fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
364367
let state = Arc::new(State::new());
365-
// TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
366-
let ptr = Arc::into_raw(state) as *mut State;
368+
let ptr = Arc::into_raw(state).cast_mut();
367369
if let Err(actual) = atomic_ptr.compare_exchange(
368370
std::ptr::null_mut(),
369371
ptr,
@@ -382,26 +384,10 @@ impl<'a> Executor<'a> {
382384
if ptr.is_null() {
383385
ptr = alloc_state(&self.state);
384386
}
385-
ptr
386-
}
387387

388-
/// Returns a reference to the inner state.
389-
#[inline]
390-
fn state(&self) -> &State {
391388
// SAFETY: So long as an Executor lives, it's state pointer will always be valid
392-
// when accessed through state_ptr.
393-
unsafe { &*self.state_ptr() }
394-
}
395-
396-
// Clones the inner state Arc
397-
#[inline]
398-
fn state_as_arc(&self) -> Arc<State> {
399-
// SAFETY: So long as an Executor lives, it's state pointer will always be a valid
400-
// Arc when accessed through state_ptr.
401-
let arc = unsafe { Arc::from_raw(self.state_ptr()) };
402-
let clone = arc.clone();
403-
std::mem::forget(arc);
404-
clone
389+
// and will never be moved until it's dropped.
390+
Pin::new(unsafe { &*ptr })
405391
}
406392
}
407393

@@ -416,7 +402,7 @@ impl Drop for Executor<'_> {
416402
// via Arc::into_raw in state_ptr.
417403
let state = unsafe { Arc::from_raw(ptr) };
418404

419-
let mut active = state.active();
405+
let mut active = state.pin().active();
420406
for w in active.drain() {
421407
w.wake();
422408
}
@@ -518,11 +504,12 @@ impl<'a> LocalExecutor<'a> {
518504
/// });
519505
/// ```
520506
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
521-
let mut active = self.inner().state().active();
507+
let state = self.inner().state();
508+
let mut active = state.active();
522509

523510
// SAFETY: This executor is not thread safe, so the future and its result
524511
// cannot be sent to another thread.
525-
unsafe { self.inner().spawn_inner(future, &mut active) }
512+
unsafe { Executor::spawn_inner(state, future, &mut active) }
526513
}
527514

528515
/// Spawns many tasks onto the executor.
@@ -570,13 +557,14 @@ impl<'a> LocalExecutor<'a> {
570557
futures: impl IntoIterator<Item = F>,
571558
handles: &mut impl Extend<Task<F::Output>>,
572559
) {
573-
let mut active = self.inner().state().active();
560+
let state = self.inner().state();
561+
let mut active = state.active();
574562

575563
// Convert all of the futures to tasks.
576564
let tasks = futures.into_iter().map(|future| {
577565
// SAFETY: This executor is not thread safe, so the future and its result
578566
// cannot be sent to another thread.
579-
unsafe { self.inner().spawn_inner(future, &mut active) }
567+
unsafe { Executor::spawn_inner(state, future, &mut active) }
580568

581569
// As only one thread can spawn or poll tasks at a time, there is no need
582570
// to release lock contention here.
@@ -695,9 +683,16 @@ impl State {
695683
}
696684
}
697685

686+
fn pin(&self) -> Pin<&Self> {
687+
Pin::new(self)
688+
}
689+
698690
/// Returns a reference to currently active tasks.
699-
fn active(&self) -> MutexGuard<'_, Slab<Waker>> {
700-
self.active.lock().unwrap_or_else(PoisonError::into_inner)
691+
fn active(self: Pin<&Self>) -> MutexGuard<'_, Slab<Waker>> {
692+
self.get_ref()
693+
.active
694+
.lock()
695+
.unwrap_or_else(PoisonError::into_inner)
701696
}
702697

703698
/// Notifies a sleeping ticker.
@@ -1209,13 +1204,14 @@ fn _ensure_send_and_sync() {
12091204
is_sync::<Executor<'_>>(Executor::new());
12101205

12111206
let ex = Executor::new();
1207+
let state = ex.state();
12121208
is_send(ex.run(pending::<()>()));
12131209
is_sync(ex.run(pending::<()>()));
12141210
is_send(ex.tick());
12151211
is_sync(ex.tick());
1216-
is_send(ex.schedule());
1217-
is_sync(ex.schedule());
1218-
is_static(ex.schedule());
1212+
is_send(Executor::schedule(state));
1213+
is_sync(Executor::schedule(state));
1214+
is_static(Executor::schedule(state));
12191215

12201216
/// ```compile_fail
12211217
/// use async_executor::LocalExecutor;

src/static_executors.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
future::Future,
88
marker::PhantomData,
99
panic::{RefUnwindSafe, UnwindSafe},
10-
sync::PoisonError,
10+
sync::{atomic::Ordering, PoisonError},
1111
};
1212

1313
impl Executor<'static> {
@@ -35,11 +35,16 @@ impl Executor<'static> {
3535
/// future::block_on(ex.run(task));
3636
/// ```
3737
pub fn leak(self) -> &'static StaticExecutor {
38-
let ptr = self.state_ptr();
39-
// SAFETY: So long as an Executor lives, it's state pointer will always be valid
40-
// when accessed through state_ptr. This executor will live for the full 'static
41-
// lifetime so this isn't an arbitrary lifetime extension.
42-
let state: &'static State = unsafe { &*ptr };
38+
let ptr = self.state.load(Ordering::Relaxed);
39+
40+
let state: &'static State = if ptr.is_null() {
41+
Box::leak(Box::new(State::new()))
42+
} else {
43+
// SAFETY: So long as an Executor lives, it's state pointer will always be valid
44+
// when accessed through state_ptr. This executor will live for the full 'static
45+
// lifetime so this isn't an arbitrary lifetime extension.
46+
unsafe { &*ptr }
47+
};
4348

4449
std::mem::forget(self);
4550

@@ -85,11 +90,16 @@ impl LocalExecutor<'static> {
8590
/// future::block_on(ex.run(task));
8691
/// ```
8792
pub fn leak(self) -> &'static StaticLocalExecutor {
88-
let ptr = self.inner.state_ptr();
89-
// SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid
90-
// when accessed through state_ptr. This executor will live for the full 'static
91-
// lifetime so this isn't an arbitrary lifetime extension.
92-
let state: &'static State = unsafe { &*ptr };
93+
let ptr = self.inner.state.load(Ordering::Relaxed);
94+
95+
let state: &'static State = if ptr.is_null() {
96+
Box::leak(Box::new(State::new()))
97+
} else {
98+
// SAFETY: So long as an Executor lives, it's state pointer will always be valid
99+
// when accessed through state_ptr. This executor will live for the full 'static
100+
// lifetime so this isn't an arbitrary lifetime extension.
101+
unsafe { &*ptr }
102+
};
93103

94104
std::mem::forget(self);
95105

0 commit comments

Comments
 (0)