diff --git a/Cargo.lock b/Cargo.lock index 7e9a34147c..ac1c2a820a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,6 +155,20 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "async-executor" +version = "1.13.2" +source = "git+https://github.com/hermit-os/async-executor.git?branch=no_std#d703026ada41be60c55eb79d0b29a1fbf18b7a0b" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "lock_api", + "pin-project-lite", + "slab", +] + [[package]] name = "async-lock" version = "3.4.1" @@ -166,6 +180,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + [[package]] name = "async-trait" version = "0.1.88" @@ -541,6 +561,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5b9e9908e50b47ebbc3d6fd66ed295b997c270e8d2312a035bcc62722a160ef" +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "fdt" version = "0.1.5" @@ -604,6 +630,31 @@ dependencies = [ "num_enum", ] +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-lite" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -703,6 +754,7 @@ dependencies = [ "anstyle", "anyhow", "arm-gic", + "async-executor", "async-lock", "async-trait", "bit_field", @@ -1075,6 +1127,12 @@ dependencies = [ "ureq", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "paste" version = "1.0.15" @@ -1468,6 +1526,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +[[package]] +name = "slab" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" + [[package]] name = "smallvec" version = "1.15.1" diff --git a/Cargo.toml b/Cargo.toml index 712f737b8b..842e3e3bc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ virtio = { package = "virtio-spec", version = "0.3", optional = true, features = ahash = { version = "0.8", default-features = false } align-address = "0.3" anstyle = { version = "1", default-features = false } +async-executor = { git = "https://github.com/hermit-os/async-executor.git", branch = "no_std", default-features = false, features = ["static"] } async-lock = { version = "3.4.1", default-features = false } async-trait = "0.1.86" bit_field = "0.10" diff --git a/src/arch/aarch64/kernel/core_local.rs b/src/arch/aarch64/kernel/core_local.rs index aa62a72001..05ab120fc9 100644 --- a/src/arch/aarch64/kernel/core_local.rs +++ b/src/arch/aarch64/kernel/core_local.rs @@ -1,16 +1,16 @@ use alloc::boxed::Box; -use alloc::collections::vec_deque::VecDeque; use core::arch::asm; -use core::cell::{Cell, RefCell, RefMut}; +use core::cell::Cell; use core::ptr; use core::sync::atomic::Ordering; +use async_executor::StaticExecutor; #[cfg(feature = "smp")] use hermit_sync::InterruptTicketMutex; +use hermit_sync::{RawRwSpinLock, RawSpinMutex}; use super::CPU_ONLINE; use super::interrupts::{IRQ_COUNTERS, IrqStatistics}; -use crate::executor::task::AsyncTask; #[cfg(feature = "smp")] use crate::scheduler::SchedulerInput; use crate::scheduler::{CoreId, PerCoreScheduler}; @@ -23,8 +23,8 @@ pub(crate) struct CoreLocal { scheduler: Cell<*mut PerCoreScheduler>, /// Interface to the interrupt counters irq_statistics: &'static IrqStatistics, - /// Queue of async tasks - async_tasks: RefCell>, + /// The core-local async executor. + ex: StaticExecutor, /// Queues to handle incoming requests from the other cores #[cfg(feature = "smp")] pub scheduler_input: InterruptTicketMutex, @@ -46,7 +46,7 @@ impl CoreLocal { core_id, scheduler: Cell::new(ptr::null_mut()), irq_statistics, - async_tasks: RefCell::new(VecDeque::new()), + ex: StaticExecutor::new(), #[cfg(feature = "smp")] scheduler_input: InterruptTicketMutex::new(SchedulerInput::new()), }; @@ -96,8 +96,8 @@ pub(crate) fn core_scheduler() -> &'static mut PerCoreScheduler { unsafe { CoreLocal::get().scheduler.get().as_mut().unwrap() } } -pub(crate) fn async_tasks() -> RefMut<'static, VecDeque> { - CoreLocal::get().async_tasks.borrow_mut() +pub(crate) fn ex() -> &'static StaticExecutor { + &CoreLocal::get().ex } pub(crate) fn set_core_scheduler(scheduler: *mut PerCoreScheduler) { diff --git a/src/arch/riscv64/kernel/core_local.rs b/src/arch/riscv64/kernel/core_local.rs index 3e234c6de5..4cbddbaf5d 100644 --- a/src/arch/riscv64/kernel/core_local.rs +++ b/src/arch/riscv64/kernel/core_local.rs @@ -1,15 +1,15 @@ use alloc::boxed::Box; -use alloc::collections::vec_deque::VecDeque; use core::arch::asm; -use core::cell::{Cell, RefCell, RefMut}; +use core::cell::Cell; use core::ptr; use core::sync::atomic::Ordering; +use async_executor::StaticExecutor; #[cfg(feature = "smp")] use hermit_sync::InterruptTicketMutex; +use hermit_sync::{RawRwSpinLock, RawSpinMutex}; use crate::arch::riscv64::kernel::CPU_ONLINE; -use crate::executor::task::AsyncTask; #[cfg(feature = "smp")] use crate::scheduler::SchedulerInput; use crate::scheduler::{CoreId, PerCoreScheduler}; @@ -21,8 +21,8 @@ pub struct CoreLocal { scheduler: Cell<*mut PerCoreScheduler>, /// start address of the kernel stack pub kernel_stack: Cell, - /// Queue of async tasks - async_tasks: RefCell>, + /// The core-local async executor. + ex: StaticExecutor, /// Queues to handle incoming requests from the other cores #[cfg(feature = "smp")] pub scheduler_input: InterruptTicketMutex, @@ -41,7 +41,7 @@ impl CoreLocal { core_id, scheduler: Cell::new(ptr::null_mut()), kernel_stack: Cell::new(0), - async_tasks: RefCell::new(VecDeque::new()), + ex: StaticExecutor::new(), #[cfg(feature = "smp")] scheduler_input: InterruptTicketMutex::new(SchedulerInput::new()), }; @@ -84,6 +84,6 @@ pub fn set_core_scheduler(scheduler: *mut PerCoreScheduler) { CoreLocal::get().scheduler.set(scheduler); } -pub(crate) fn async_tasks() -> RefMut<'static, VecDeque> { - CoreLocal::get().async_tasks.borrow_mut() +pub(crate) fn ex() -> &'static StaticExecutor { + &CoreLocal::get().ex } diff --git a/src/arch/x86_64/kernel/core_local.rs b/src/arch/x86_64/kernel/core_local.rs index 0fa5212917..1aa29965e8 100644 --- a/src/arch/x86_64/kernel/core_local.rs +++ b/src/arch/x86_64/kernel/core_local.rs @@ -1,21 +1,21 @@ use alloc::boxed::Box; -use alloc::collections::vec_deque::VecDeque; use core::arch::asm; -use core::cell::{Cell, RefCell, RefMut}; +use core::cell::Cell; #[cfg(feature = "smp")] use core::sync::atomic::AtomicBool; use core::sync::atomic::Ordering; use core::{mem, ptr}; +use async_executor::StaticExecutor; #[cfg(feature = "smp")] use hermit_sync::InterruptTicketMutex; +use hermit_sync::{RawRwSpinLock, RawSpinMutex}; use x86_64::VirtAddr; use x86_64::registers::model_specific::GsBase; use x86_64::structures::tss::TaskStateSegment; use super::CPU_ONLINE; use super::interrupts::{IRQ_COUNTERS, IrqStatistics}; -use crate::executor::task::AsyncTask; #[cfg(feature = "smp")] use crate::scheduler::SchedulerInput; use crate::scheduler::{CoreId, PerCoreScheduler}; @@ -32,8 +32,8 @@ pub(crate) struct CoreLocal { pub kernel_stack: Cell<*mut u8>, /// Interface to the interrupt counters irq_statistics: &'static IrqStatistics, - /// Queue of async tasks - async_tasks: RefCell>, + /// The core-local async executor. + ex: StaticExecutor, #[cfg(feature = "smp")] pub hlt: AtomicBool, /// Queues to handle incoming requests from the other cores @@ -61,7 +61,7 @@ impl CoreLocal { tss: Cell::new(ptr::null_mut()), kernel_stack: Cell::new(ptr::null_mut()), irq_statistics, - async_tasks: RefCell::new(VecDeque::new()), + ex: StaticExecutor::new(), #[cfg(feature = "smp")] hlt: AtomicBool::new(false), #[cfg(feature = "smp")] @@ -110,8 +110,8 @@ pub(crate) fn core_scheduler() -> &'static mut PerCoreScheduler { unsafe { CoreLocal::get().scheduler.get().as_mut().unwrap() } } -pub(crate) fn async_tasks() -> RefMut<'static, VecDeque> { - CoreLocal::get().async_tasks.borrow_mut() +pub(crate) fn ex() -> &'static StaticExecutor { + &CoreLocal::get().ex } pub(crate) fn set_core_scheduler(scheduler: *mut PerCoreScheduler) { diff --git a/src/executor/mod.rs b/src/executor/mod.rs index af65c86bc0..04d4a73b59 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -19,7 +19,7 @@ use hermit_sync::without_interrupts; #[cfg(any(feature = "tcp", feature = "udp"))] use smoltcp::time::Instant; -use crate::arch::core_local::*; +use crate::arch::core_local; use crate::errno::Errno; use crate::executor::task::AsyncTask; use crate::io; @@ -94,16 +94,13 @@ impl Wake for TaskNotify { } pub(crate) fn run() { - let mut cx = Context::from_waker(Waker::noop()); - without_interrupts(|| { - // FIXME(mkroening): Not all tasks register wakers and never sleep - for _ in 0..({ async_tasks().len() }) { - let mut task = { async_tasks().pop_front().unwrap() }; - trace!("Run async task {}", task.id()); - - if task.poll(&mut cx).is_pending() { - async_tasks().push_back(task); + // FIXME: We currently have no more than 3 tasks at a time, so this is fine. + // Ideally, we would set this value to 200, but the network task currently immediately wakes up again. + // This would lead to the network task being polled 200 times back to back, slowing things down considerably. + for _ in 0..3 { + if !core_local::ex().try_tick() { + break; } } }); @@ -118,7 +115,7 @@ pub(crate) fn spawn(future: F) where F: Future + Send + 'static, { - without_interrupts(|| async_tasks().push_back(AsyncTask::new(future))); + core_local::ex().spawn(AsyncTask::new(future)).detach(); } pub fn init() { @@ -177,7 +174,7 @@ where } else { None }; - core_scheduler().add_network_timer( + core_local::core_scheduler().add_network_timer( delay.map(|d| crate::arch::processor::get_timer_ticks() + d), ); } @@ -203,7 +200,7 @@ where } else { None }; - core_scheduler().add_network_timer( + core_local::core_scheduler().add_network_timer( delay.map(|d| crate::arch::processor::get_timer_ticks() + d), ); } @@ -230,7 +227,7 @@ where }; if delay.unwrap_or(10_000_000) > 10_000 { - core_scheduler().add_network_timer( + core_local::core_scheduler().add_network_timer( delay.map(|d| crate::arch::processor::get_timer_ticks() + d), ); let wakeup_time = diff --git a/src/executor/task.rs b/src/executor/task.rs index b863cfcdc9..3a537d1dcd 100644 --- a/src/executor/task.rs +++ b/src/executor/task.rs @@ -8,7 +8,7 @@ use core::sync::atomic::{AtomicU32, Ordering}; use core::task::{Context, Poll}; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -pub(crate) struct AsyncTaskId(u32); +struct AsyncTaskId(u32); impl fmt::Display for AsyncTaskId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -25,22 +25,23 @@ impl AsyncTaskId { pub(crate) struct AsyncTask { id: AsyncTaskId, - future: Pin>>, + future: Pin + Send>>, } impl AsyncTask { - pub fn new(future: impl Future + 'static) -> AsyncTask { + pub fn new(future: impl Future + Send + 'static) -> AsyncTask { AsyncTask { id: AsyncTaskId::new(), future: Box::pin(future), } } +} - pub fn id(&self) -> AsyncTaskId { - self.id - } +impl Future for AsyncTask { + type Output = (); - pub fn poll(&mut self, context: &mut Context<'_>) -> Poll<()> { - self.future.as_mut().poll(context) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + trace!("Run async task {}", self.id); + self.as_mut().future.as_mut().poll(cx) } }