Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ virtio = { package = "virtio-spec", version = "0.3", optional = true, features =
ahash = { version = "0.8", default-features = false }
align-address = "0.3"
anstyle = { version = "1", default-features = false }
async-executor = { git = "https://github.com/hermit-os/async-executor.git", branch = "no_std", default-features = false, features = ["static"] }
async-lock = { version = "3.4.1", default-features = false }
async-trait = "0.1.86"
bit_field = "0.10"
Expand Down
16 changes: 8 additions & 8 deletions src/arch/aarch64/kernel/core_local.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use alloc::boxed::Box;
use alloc::collections::vec_deque::VecDeque;
use core::arch::asm;
use core::cell::{Cell, RefCell, RefMut};
use core::cell::Cell;
use core::ptr;
use core::sync::atomic::Ordering;

use async_executor::StaticExecutor;
#[cfg(feature = "smp")]
use hermit_sync::InterruptTicketMutex;
use hermit_sync::{RawRwSpinLock, RawSpinMutex};

use super::CPU_ONLINE;
use super::interrupts::{IRQ_COUNTERS, IrqStatistics};
use crate::executor::task::AsyncTask;
#[cfg(feature = "smp")]
use crate::scheduler::SchedulerInput;
use crate::scheduler::{CoreId, PerCoreScheduler};
Expand All @@ -23,8 +23,8 @@ pub(crate) struct CoreLocal {
scheduler: Cell<*mut PerCoreScheduler>,
/// Interface to the interrupt counters
irq_statistics: &'static IrqStatistics,
/// Queue of async tasks
async_tasks: RefCell<VecDeque<AsyncTask>>,
/// The core-local async executor.
ex: StaticExecutor<RawSpinMutex, RawRwSpinLock>,
/// Queues to handle incoming requests from the other cores
#[cfg(feature = "smp")]
pub scheduler_input: InterruptTicketMutex<SchedulerInput>,
Expand All @@ -46,7 +46,7 @@ impl CoreLocal {
core_id,
scheduler: Cell::new(ptr::null_mut()),
irq_statistics,
async_tasks: RefCell::new(VecDeque::new()),
ex: StaticExecutor::new(),
#[cfg(feature = "smp")]
scheduler_input: InterruptTicketMutex::new(SchedulerInput::new()),
};
Expand Down Expand Up @@ -96,8 +96,8 @@ pub(crate) fn core_scheduler() -> &'static mut PerCoreScheduler {
unsafe { CoreLocal::get().scheduler.get().as_mut().unwrap() }
}

pub(crate) fn async_tasks() -> RefMut<'static, VecDeque<AsyncTask>> {
CoreLocal::get().async_tasks.borrow_mut()
pub(crate) fn ex() -> &'static StaticExecutor<RawSpinMutex, RawRwSpinLock> {
&CoreLocal::get().ex
}

pub(crate) fn set_core_scheduler(scheduler: *mut PerCoreScheduler) {
Expand Down
16 changes: 8 additions & 8 deletions src/arch/riscv64/kernel/core_local.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use alloc::boxed::Box;
use alloc::collections::vec_deque::VecDeque;
use core::arch::asm;
use core::cell::{Cell, RefCell, RefMut};
use core::cell::Cell;
use core::ptr;
use core::sync::atomic::Ordering;

use async_executor::StaticExecutor;
#[cfg(feature = "smp")]
use hermit_sync::InterruptTicketMutex;
use hermit_sync::{RawRwSpinLock, RawSpinMutex};

use crate::arch::riscv64::kernel::CPU_ONLINE;
use crate::executor::task::AsyncTask;
#[cfg(feature = "smp")]
use crate::scheduler::SchedulerInput;
use crate::scheduler::{CoreId, PerCoreScheduler};
Expand All @@ -21,8 +21,8 @@ pub struct CoreLocal {
scheduler: Cell<*mut PerCoreScheduler>,
/// start address of the kernel stack
pub kernel_stack: Cell<u64>,
/// Queue of async tasks
async_tasks: RefCell<VecDeque<AsyncTask>>,
/// The core-local async executor.
ex: StaticExecutor<RawSpinMutex, RawRwSpinLock>,
/// Queues to handle incoming requests from the other cores
#[cfg(feature = "smp")]
pub scheduler_input: InterruptTicketMutex<SchedulerInput>,
Expand All @@ -41,7 +41,7 @@ impl CoreLocal {
core_id,
scheduler: Cell::new(ptr::null_mut()),
kernel_stack: Cell::new(0),
async_tasks: RefCell::new(VecDeque::new()),
ex: StaticExecutor::new(),
#[cfg(feature = "smp")]
scheduler_input: InterruptTicketMutex::new(SchedulerInput::new()),
};
Expand Down Expand Up @@ -84,6 +84,6 @@ pub fn set_core_scheduler(scheduler: *mut PerCoreScheduler) {
CoreLocal::get().scheduler.set(scheduler);
}

pub(crate) fn async_tasks() -> RefMut<'static, VecDeque<AsyncTask>> {
CoreLocal::get().async_tasks.borrow_mut()
pub(crate) fn ex() -> &'static StaticExecutor<RawSpinMutex, RawRwSpinLock> {
&CoreLocal::get().ex
}
16 changes: 8 additions & 8 deletions src/arch/x86_64/kernel/core_local.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use alloc::boxed::Box;
use alloc::collections::vec_deque::VecDeque;
use core::arch::asm;
use core::cell::{Cell, RefCell, RefMut};
use core::cell::Cell;
#[cfg(feature = "smp")]
use core::sync::atomic::AtomicBool;
use core::sync::atomic::Ordering;
use core::{mem, ptr};

use async_executor::StaticExecutor;
#[cfg(feature = "smp")]
use hermit_sync::InterruptTicketMutex;
use hermit_sync::{RawRwSpinLock, RawSpinMutex};
use x86_64::VirtAddr;
use x86_64::registers::model_specific::GsBase;
use x86_64::structures::tss::TaskStateSegment;

use super::CPU_ONLINE;
use super::interrupts::{IRQ_COUNTERS, IrqStatistics};
use crate::executor::task::AsyncTask;
#[cfg(feature = "smp")]
use crate::scheduler::SchedulerInput;
use crate::scheduler::{CoreId, PerCoreScheduler};
Expand All @@ -32,8 +32,8 @@ pub(crate) struct CoreLocal {
pub kernel_stack: Cell<*mut u8>,
/// Interface to the interrupt counters
irq_statistics: &'static IrqStatistics,
/// Queue of async tasks
async_tasks: RefCell<VecDeque<AsyncTask>>,
/// The core-local async executor.
ex: StaticExecutor<RawSpinMutex, RawRwSpinLock>,
#[cfg(feature = "smp")]
pub hlt: AtomicBool,
/// Queues to handle incoming requests from the other cores
Expand Down Expand Up @@ -61,7 +61,7 @@ impl CoreLocal {
tss: Cell::new(ptr::null_mut()),
kernel_stack: Cell::new(ptr::null_mut()),
irq_statistics,
async_tasks: RefCell::new(VecDeque::new()),
ex: StaticExecutor::new(),
#[cfg(feature = "smp")]
hlt: AtomicBool::new(false),
#[cfg(feature = "smp")]
Expand Down Expand Up @@ -110,8 +110,8 @@ pub(crate) fn core_scheduler() -> &'static mut PerCoreScheduler {
unsafe { CoreLocal::get().scheduler.get().as_mut().unwrap() }
}

pub(crate) fn async_tasks() -> RefMut<'static, VecDeque<AsyncTask>> {
CoreLocal::get().async_tasks.borrow_mut()
pub(crate) fn ex() -> &'static StaticExecutor<RawSpinMutex, RawRwSpinLock> {
&CoreLocal::get().ex
}

pub(crate) fn set_core_scheduler(scheduler: *mut PerCoreScheduler) {
Expand Down
25 changes: 11 additions & 14 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use hermit_sync::without_interrupts;
#[cfg(any(feature = "tcp", feature = "udp"))]
use smoltcp::time::Instant;

use crate::arch::core_local::*;
use crate::arch::core_local;
use crate::errno::Errno;
use crate::executor::task::AsyncTask;
use crate::io;
Expand Down Expand Up @@ -94,16 +94,13 @@ impl Wake for TaskNotify {
}

pub(crate) fn run() {
let mut cx = Context::from_waker(Waker::noop());

without_interrupts(|| {
// FIXME(mkroening): Not all tasks register wakers and never sleep
for _ in 0..({ async_tasks().len() }) {
let mut task = { async_tasks().pop_front().unwrap() };
trace!("Run async task {}", task.id());

if task.poll(&mut cx).is_pending() {
async_tasks().push_back(task);
// FIXME: We currently have no more than 3 tasks at a time, so this is fine.
// Ideally, we would set this value to 200, but the network task currently immediately wakes up again.
// This would lead to the network task being polled 200 times back to back, slowing things down considerably.
for _ in 0..3 {
if !core_local::ex().try_tick() {
break;
}
}
});
Expand All @@ -118,7 +115,7 @@ pub(crate) fn spawn<F>(future: F)
where
F: Future<Output = ()> + Send + 'static,
{
without_interrupts(|| async_tasks().push_back(AsyncTask::new(future)));
core_local::ex().spawn(AsyncTask::new(future)).detach();
}

pub fn init() {
Expand Down Expand Up @@ -177,7 +174,7 @@ where
} else {
None
};
core_scheduler().add_network_timer(
core_local::core_scheduler().add_network_timer(
delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
);
}
Expand All @@ -203,7 +200,7 @@ where
} else {
None
};
core_scheduler().add_network_timer(
core_local::core_scheduler().add_network_timer(
delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
);
}
Expand All @@ -230,7 +227,7 @@ where
};

if delay.unwrap_or(10_000_000) > 10_000 {
core_scheduler().add_network_timer(
core_local::core_scheduler().add_network_timer(
delay.map(|d| crate::arch::processor::get_timer_ticks() + d),
);
let wakeup_time =
Expand Down
17 changes: 9 additions & 8 deletions src/executor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use core::sync::atomic::{AtomicU32, Ordering};
use core::task::{Context, Poll};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct AsyncTaskId(u32);
struct AsyncTaskId(u32);

impl fmt::Display for AsyncTaskId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -25,22 +25,23 @@ impl AsyncTaskId {

pub(crate) struct AsyncTask {
id: AsyncTaskId,
future: Pin<Box<dyn Future<Output = ()>>>,
future: Pin<Box<dyn Future<Output = ()> + Send>>,
}

impl AsyncTask {
pub fn new(future: impl Future<Output = ()> + 'static) -> AsyncTask {
pub fn new(future: impl Future<Output = ()> + Send + 'static) -> AsyncTask {
AsyncTask {
id: AsyncTaskId::new(),
future: Box::pin(future),
}
}
}

pub fn id(&self) -> AsyncTaskId {
self.id
}
impl Future for AsyncTask {
type Output = ();

pub fn poll(&mut self, context: &mut Context<'_>) -> Poll<()> {
self.future.as_mut().poll(context)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
trace!("Run async task {}", self.id);
self.as_mut().future.as_mut().poll(cx)
}
}