refactor: centralize runtime logic (v0.4)

Extract scheduler responsibilities into a dedicated Runtime component:
- src/runtime.rs: New centralized control flow (669 lines)
- src/scheduler.rs: Simplified to task queue & preemption management
- tests/runtime.rs: Comprehensive runtime test suite
- benches/multi_scheduler.rs: Multi-runtime scheduling benchmarks
- Improves modularity and enables per-runtime configuration
This commit is contained in:
Claude
2026-05-23 16:09:32 +00:00
parent 8cbef1dfc1
commit e9fdbb1160
10 changed files with 1694 additions and 919 deletions

View File

@@ -1,12 +1,8 @@
//! Unbounded MPSC channels.
//!
//! Single-threaded scheduler: the inner state is `Rc<RefCell<Inner<T>>>`,
//! not `Arc<Mutex>`. We hand-implement `Send` for `Sender<T>` and
//! `Receiver<T>` when `T: Send`, on the basis that the only way two actor
//! contexts touch the same channel is by being scheduled on the *same* OS
//! thread (v0.1 has exactly one). When we add a second scheduler thread,
//! this lie must be retired: replace `Rc<RefCell>` with `Arc<Mutex>` (or a
//! lock-free queue) and remove the unsafe Send impls.
//! Inner state is `Arc<Mutex<Inner<T>>>` so channels can be sent across OS
//! threads (required for the multi-scheduler runtime where a sender and
//! receiver may run on different scheduler threads simultaneously).
//!
//! Semantics:
//! - Senders are clonable; the last sender drop closes the channel.
@@ -19,12 +15,11 @@
//! parked, the receiver is unparked.
use crate::pid::Pid;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Rc::new(RefCell::new(Inner {
let inner = Arc::new(Mutex::new(Inner {
queue: VecDeque::new(),
parked_receiver: None,
senders: 1,
@@ -41,20 +36,13 @@ struct Inner<T> {
}
pub struct Sender<T> {
inner: Rc<RefCell<Inner<T>>>,
inner: Arc<Mutex<Inner<T>>>,
}
pub struct Receiver<T> {
inner: Rc<RefCell<Inner<T>>>,
inner: Arc<Mutex<Inner<T>>>,
}
// SAFETY (v0.1 only): the scheduler is single-threaded. Sender/Receiver can
// be captured into actor closures (which require Send), but they will only
// ever be touched from one OS thread. When multi-threading lands, swap the
// `Rc<RefCell>` for `Arc<Mutex>` and remove these.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Send for Receiver<T> {}
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);
@@ -71,7 +59,7 @@ impl std::error::Error for RecvError {}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
self.inner.borrow_mut().senders += 1;
self.inner.lock().unwrap().senders += 1;
Sender { inner: self.inner.clone() }
}
}
@@ -79,11 +67,9 @@ impl<T> Clone for Sender<T> {
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let unpark = {
let mut g = self.inner.borrow_mut();
let mut g = self.inner.lock().unwrap();
g.senders -= 1;
if g.senders == 0 && g.queue.is_empty() {
// Channel closed and drained. Wake the receiver so it can
// see RecvError.
g.parked_receiver.take()
} else {
None
@@ -97,19 +83,18 @@ impl<T> Drop for Sender<T> {
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.inner.borrow_mut().receiver_alive = false;
self.inner.lock().unwrap().receiver_alive = false;
}
}
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
let unpark = {
let mut g = self.inner.borrow_mut();
let mut g = self.inner.lock().unwrap();
if !g.receiver_alive {
return Err(SendError(value));
}
g.queue.push_back(value);
// If the receiver is parked, unpark it.
g.parked_receiver.take()
};
if let Some(pid) = unpark {
@@ -122,16 +107,14 @@ impl<T> Sender<T> {
impl<T> Receiver<T> {
pub fn recv(&self) -> Result<T, RecvError> {
loop {
// Try to take a message.
{
let mut g = self.inner.borrow_mut();
let mut g = self.inner.lock().unwrap();
if let Some(v) = g.queue.pop_front() {
return Ok(v);
}
if g.senders == 0 {
return Err(RecvError);
}
// Empty + open: register and park.
let me = crate::actor::current_pid()
.expect("recv() called outside an actor");
debug_assert!(
@@ -140,18 +123,15 @@ impl<T> Receiver<T> {
);
g.parked_receiver = Some(me);
}
// Release the borrow before parking — the unparker will need it.
// Release the lock before parking — the unparker will need it.
crate::scheduler::park_current();
// Loop: the message that woke us might already have been taken
// (it can't, with one receiver, but the senders=0 path can fire
// here too).
}
}
/// Non-blocking. `Ok(Some(v))` if a message was available, `Ok(None)` if
/// the channel is empty but open, `Err(RecvError)` if closed and drained.
pub fn try_recv(&self) -> Result<Option<T>, RecvError> {
let mut g = self.inner.borrow_mut();
let mut g = self.inner.lock().unwrap();
if let Some(v) = g.queue.pop_front() {
return Ok(Some(v));
}

View File

@@ -2,14 +2,12 @@
//!
//! Erlang-style green-thread actor concurrency for Rust.
//!
//! Single-threaded for now: one scheduler, one OS thread. The scheduler
//! cooperatively interleaves green-thread actors with hand-rolled context
//! switches. Actors communicate by sending `Send` messages over channels;
//! every actor has a supervisor, which is itself just an actor with a
//! `Receiver<Signal>`. Synchronisation primitives — `Mutex<T>` with
//! mandatory lock timeouts, channel `recv`, `sleep`, and epoll-backed
//! `wait_readable`/`wait_writable` — all park the green thread, never
//! the OS thread.
//! Multi-threaded: N scheduler OS threads (default: one per CPU) share a
//! single global run queue behind a `Mutex`. Actors communicate by sending
//! `Send` messages over channels; every actor has a supervisor. Synchronisation
//! primitives — `Mutex<T>` with mandatory lock timeouts, channel `recv`,
//! `sleep`, and epoll-backed `wait_readable`/`wait_writable` — all park the
//! green thread, never the OS thread.
//!
//! See `LOOM.md` for the design intent and the deferred-for-later list.
@@ -24,13 +22,10 @@ pub mod supervisor;
pub mod timer;
pub mod io;
pub mod mutex;
pub mod runtime;
// ---------------------------------------------------------------------------
// Global allocator
//
// The preempting allocator wraps `System`. While `PREEMPTION_ENABLED` is
// false (the default outside an actor) it adds one branch per allocation
// and no syscalls. The scheduler flips it on per-resume.
// ---------------------------------------------------------------------------
#[global_allocator]
@@ -43,31 +38,19 @@ static ALLOCATOR: preempt::PreemptingAllocator = preempt::PreemptingAllocator;
pub use channel::{channel, Receiver, RecvError, Sender};
pub use mutex::{LockTimeout, Mutex, MutexGuard};
pub use pid::Pid;
pub use runtime::{init, Config, Runtime};
pub use scheduler::{
block_on_io, run, self_pid, sleep, spawn, spawn_under, wait_readable, wait_writable,
yield_now, JoinError, JoinHandle,
};
// `read` and `write` would shadow heavily-used names if re-exported at the
// crate root; users reach for them as `smarm::scheduler::read` /
// `smarm::scheduler::write` instead. May reshuffle into a `smarm::io`
// surface in a future pass.
pub use supervisor::Signal;
// ---------------------------------------------------------------------------
// check!() — explicit preemption point for tight no-alloc loops.
// check!()
// ---------------------------------------------------------------------------
/// Voluntarily check whether this actor's timeslice has expired, yielding
/// if so. Drop this into hot compute loops that don't allocate (heap or
/// large stack frames) — without it, such loops monopolise the scheduler
/// until they return.
///
/// Decrements the same per-actor event counter as the heap allocator's
/// preemption hook, so the check rate is identical regardless of whether
/// the actor is alloc-heavy, check-heavy, or mixed.
///
/// No-op outside an actor (the runtime's `PREEMPTION_ENABLED` flag is
/// false there).
/// if so.
#[macro_export]
macro_rules! check {
() => {

View File

@@ -1,63 +1,20 @@
//! Actor-aware mutex with mandatory timeout.
//!
//! `loom::Mutex<T>` looks like `std::sync::Mutex<T>` but its `lock()` parks
//! the calling *green* thread on contention rather than blocking the OS
//! thread — and every lock attempt is bounded by a timeout. If the lock is
//! not acquired within the timeout, `lock()` returns `Err(LockTimeout)`.
//! This is a hard runtime guarantee (the spec calls it out): no actor can
//! be parked on a mutex forever.
//! `Mutex<T>` parks the calling *green* thread on contention rather than
//! blocking the OS thread. Every lock attempt is bounded by a timeout.
//!
//! ```ignore
//! let m = loom::Mutex::new(42);
//! let guard = m.lock()?; // default timeout
//! let guard = m.lock_timeout(Duration::from_millis(50))?;
//! ```
//! Internals use `Arc<std::sync::Mutex<...>>` so the type is genuinely
//! `Send + Sync` and can be shared across scheduler threads.
//!
//! Fairness
//! ========
//! Waiters are granted the lock in FIFO order. The spec prizes fairness:
//! starvation under contention is precisely the kind of failure mode
//! supervision can't recover from cleanly. LIFO would be faster on cache
//! locality and is not offered.
//!
//! Poisoning
//! =========
//! Unlike `std::sync::Mutex`, `loom::Mutex` does not poison on panic. If a
//! holder panics while holding the lock, the next waiter receives the
//! (now-untouched) value. Rationale: supervision handles the panic at the
//! actor level; a separate poisoning channel is redundant and adds an
//! error case to every `lock()`. Users who care about "the value may be in
//! an inconsistent state after a panic" should encode that in `T` itself
//! (e.g. `Mutex<Option<State>>` and `take()` the value at the start of
//! each critical section).
//!
//! Reentrance
//! ==========
//! Not reentrant. An actor that already holds the lock and calls `lock()`
//! again on the same mutex will wait on its own grant — and time out. This
//! is a bug in the caller, not a feature.
//!
//! Multi-threading note
//! ====================
//! The current implementation uses `Rc<RefCell<…>>` internals because the
//! v0.2 scheduler is single-threaded. The public API is identical to what
//! the eventual multi-threaded version will expose; the migration replaces
//! the `Rc<RefCell>` with `Arc<sync::Mutex>` around bookkeeping (waiters
//! queue, holder pid) — the *value* itself never goes through a blocking
//! OS-level lock, because contention always parks the green thread first.
//! No `unsafe impl Send` games today: `loom::Mutex<T>` is `!Send` on v0.2,
//! which is correct given there is only one OS thread.
//! Fairness: FIFO. Poisoning: none. Reentrance: deadlock (caller bug).
use crate::pid::Pid;
use crate::scheduler;
use crate::timer::{self, TimerTarget};
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
/// 30 seconds. Override per-call with `lock_timeout`, or per-mutex (TODO)
/// once the supervisor-level policy hook lands.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -74,63 +31,55 @@ impl std::error::Error for LockTimeout {}
// Internals
// ---------------------------------------------------------------------------
/// A pending lock attempt. Sits in `MutexCore::state.waiters` from the
/// moment an actor parks until it is either granted the lock (popped by
/// `MutexGuard::drop`) or times out (popped by `on_timeout`).
struct Wait {
pid: Pid,
/// Per-mutex monotonic sequence. Lets `on_timeout` recognise "this
/// specific wait" vs. "a later wait by the same pid on the same
/// mutex" — important because a single actor can re-acquire and then
/// re-wait, and we don't want a stale timer firing to disturb the new
/// wait.
seq: u64,
}
/// The non-generic part of the mutex. Lives inside `Rc<>` so it can also
/// be stashed (as `Rc<dyn TimerTarget>`) inside a timer entry.
struct MutexCore {
state: RefCell<MutexState>,
default_timeout: Cell<Duration>,
}
struct MutexState {
holder: Option<Pid>,
waiters: VecDeque<Wait>,
next_seq: u64,
default_timeout: Duration,
}
struct MutexCore {
state: StdMutex<MutexState>,
}
impl MutexCore {
fn new(default_timeout: Duration) -> Self {
Self {
state: RefCell::new(MutexState {
state: StdMutex::new(MutexState {
holder: None,
waiters: VecDeque::new(),
next_seq: 0,
default_timeout,
}),
default_timeout: Cell::new(default_timeout),
}
}
}
impl TimerTarget for MutexCore {
fn on_timeout(&self, pid: Pid, wait_seq: u64) {
// Remove the waiter with this seq, if it's still queued. If it's
// gone the lock was already granted to this actor before the timer
// popped — the actor will return normally; do nothing.
let removed = {
let mut st = self.state.borrow_mut();
if let Some(pos) = st.waiters.iter().position(|w| w.seq == wait_seq) {
st.waiters.remove(pos);
let unpark = {
let mut st = self.state.lock().unwrap();
// Remove from waiters only if still there with matching seq.
// If the lock was already granted (holder == Some(pid)), the
// timer fired after the grant — treat as no-op; the actor
// will see `is_holder == true` and return Ok.
if st.holder == Some(pid) {
return;
}
let pos = st.waiters.iter().position(|w| w.pid == pid && w.seq == wait_seq);
if pos.is_some() {
st.waiters.remove(pos.unwrap());
true
} else {
false
}
};
if removed {
// The actor is parked, waiting on us. Wake it up; `lock_timeout`
// will resume, observe `holder != Some(self)`, and return
// LockTimeout.
if unpark {
scheduler::unpark(pid);
}
}
@@ -141,145 +90,105 @@ impl TimerTarget for MutexCore {
// ---------------------------------------------------------------------------
pub struct Mutex<T> {
core: Rc<MutexCore>,
/// `None` while the lock is held; `Some(T)` while free or while a
/// grantee is in the gap between unpark and resumption.
value: Rc<RefCell<Option<T>>>,
core: Arc<MutexCore>,
/// Protected value. `None` while a guard is live; `Some` while free.
value: Arc<StdMutex<Option<T>>>,
}
impl<T> Mutex<T> {
pub fn new(value: T) -> Self {
Self {
core: Rc::new(MutexCore::new(DEFAULT_TIMEOUT)),
value: Rc::new(RefCell::new(Some(value))),
core: Arc::new(MutexCore::new(DEFAULT_TIMEOUT)),
value: Arc::new(StdMutex::new(Some(value))),
}
}
/// Set the default timeout used by `lock()`. Does not affect in-flight
/// `lock_timeout` calls.
pub fn set_default_timeout(&self, timeout: Duration) {
self.core.default_timeout.set(timeout);
self.core.state.lock().unwrap().default_timeout = timeout;
}
/// Acquire the lock, blocking the calling actor until it's granted or
/// the default timeout expires.
pub fn lock(&self) -> Result<MutexGuard<'_, T>, LockTimeout> {
self.lock_timeout(self.core.default_timeout.get())
let timeout = self.core.state.lock().unwrap().default_timeout;
self.lock_timeout(timeout)
}
/// Acquire the lock with an explicit timeout.
pub fn lock_timeout(&self, timeout: Duration) -> Result<MutexGuard<'_, T>, LockTimeout> {
let me = scheduler::self_pid();
// Fast path: nobody holds it. Mark ourselves as holder, take the
// value out, return a guard.
// Fast path: nobody holds it.
{
let mut st = self.core.state.borrow_mut();
let mut st = self.core.state.lock().unwrap();
if st.holder.is_none() {
st.holder = Some(me);
drop(st);
let value = self
.value
.borrow_mut()
.take()
let value = self.value.lock().unwrap().take()
.expect("Mutex: value missing on free fast path");
return Ok(MutexGuard {
mutex: self,
value: Some(value),
});
return Ok(MutexGuard { mutex: self, value: Some(value) });
}
}
// Slow path: register as a waiter, schedule a timeout, park.
// No preemption during prep-to-park — see scheduler::NoPreempt.
// Slow path: register as a waiter, set timeout, park.
let _np = scheduler::NoPreempt::enter();
let seq = {
let mut st = self.core.state.borrow_mut();
let mut st = self.core.state.lock().unwrap();
let seq = st.next_seq;
st.next_seq = st.next_seq.wrapping_add(1);
st.waiters.push_back(Wait { pid: me, seq });
seq
};
let target: Rc<dyn TimerTarget> = self.core.clone();
let target: Arc<dyn TimerTarget> = self.core.clone();
let deadline = timer::deadline_from_now(timeout);
scheduler::insert_wait_timer(deadline, me, target, seq);
scheduler::park_current();
// Resumed. Two possibilities:
// (a) MutexGuard::drop on the previous holder popped us off the
// waiters queue, set core.holder = me, and unparked us.
// => self.value is Some, we take it and return Ok.
// (b) on_timeout fired: it removed us from waiters and unparked
// us, but did NOT set holder. core.holder is whatever it was
// (Some(other) or None). => return Err.
let is_holder = self.core.state.borrow().holder == Some(me);
// Resumed. Are we the holder?
let is_holder = self.core.state.lock().unwrap().holder == Some(me);
if is_holder {
let value = self
.value
.borrow_mut()
.take()
let value = self.value.lock().unwrap().take()
.expect("Mutex: value missing after grant");
Ok(MutexGuard {
mutex: self,
value: Some(value),
})
Ok(MutexGuard { mutex: self, value: Some(value) })
} else {
Err(LockTimeout)
}
}
/// Non-blocking attempt. Returns `Some` if the lock was free, `None`
/// otherwise. Useful as a fast path before a long-running computation,
/// or for tests.
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
let mut st = self.core.state.borrow_mut();
let me = scheduler::self_pid();
let mut st = self.core.state.lock().unwrap();
if st.holder.is_some() {
return None;
}
let me = scheduler::self_pid();
st.holder = Some(me);
drop(st);
let value = self
.value
.borrow_mut()
.take()
let value = self.value.lock().unwrap().take()
.expect("Mutex: value missing on try_lock free path");
Some(MutexGuard {
mutex: self,
value: Some(value),
})
Some(MutexGuard { mutex: self, value: Some(value) })
}
}
impl<T> Clone for Mutex<T> {
/// Cloning a `Mutex<T>` clones the handle, not the protected value —
/// both clones refer to the same lock state and the same `T`.
fn clone(&self) -> Self {
Self {
core: self.core.clone(),
value: self.value.clone(),
}
Self { core: self.core.clone(), value: self.value.clone() }
}
}
// Genuinely Send + Sync now that internals are Arc<std::sync::Mutex<...>>.
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
// ---------------------------------------------------------------------------
// Guard
// ---------------------------------------------------------------------------
pub struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
/// The protected value, taken out of `mutex.value` while the guard is
/// alive. `Option` only so `Drop` can put it back; in normal use this
/// is always `Some` while the guard is observable.
value: Option<T>,
}
impl<T> std::ops::Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
self.value.as_ref().expect("MutexGuard: value missing")
}
fn deref(&self) -> &T { self.value.as_ref().expect("MutexGuard: value missing") }
}
impl<T> std::ops::DerefMut for MutexGuard<'_, T> {
@@ -288,19 +197,22 @@ impl<T> std::ops::DerefMut for MutexGuard<'_, T> {
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for MutexGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("MutexGuard")
.field(self.value.as_ref().expect("MutexGuard: value missing"))
.finish()
}
}
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
// Put the value back into the mutex.
let v = self.value.take().expect("MutexGuard: double drop");
*self.mutex.value.borrow_mut() = Some(v);
*self.mutex.value.lock().unwrap() = Some(v);
// Pick the next waiter (if any) and grant it the lock by writing
// its pid into `holder` *before* unparking. The grantee, on
// resumption, will see `holder == self_pid` and take the value.
let next_pid = {
let mut st = self.mutex.core.state.borrow_mut();
let next = st.waiters.pop_front();
match next {
let mut st = self.mutex.core.state.lock().unwrap();
match st.waiters.pop_front() {
Some(w) => {
st.holder = Some(w.pid);
Some(w.pid)

669
src/runtime.rs Normal file
View File

@@ -0,0 +1,669 @@
//! Multi-scheduler runtime: configuration, initialisation, and the shared
//! state that all scheduler OS threads operate against.
//!
//! # Architecture
//!
//! ```text
//! init(Config) → Runtime (Arc<RuntimeInner>)
//!
//! RuntimeInner {
//! shared: Mutex<SharedState> ← slot table, run queue, timers, IO
//! stats: Vec<SchedulerStats> ← one per thread, lockless atomics (RFC 000)
//! io_parked: AtomicU32 ← actors parked on IO
//! sleeping: AtomicU32 ← actors parked on timer
//! }
//! ```
//!
//! `Runtime::run(f)` spawns N OS threads (one per `Config::resolved_thread_count()`),
//! each running `schedule_loop`. It blocks until all scheduler threads exit,
//! i.e. until the run queue is empty and nothing is pending.
//!
//! Each scheduler thread holds an `Arc<RuntimeInner>` clone. Per-thread
//! identity is a small integer index, stored in a thread-local, used to index
//! into `stats`.
//!
//! # Timer / IO drain (try-lock, one-winner)
//!
//! On each loop iteration every scheduler thread tries `try_lock()` on a
//! separate `drain_lock: Mutex<()>`. The winner drains due timers and IO
//! completions; losers skip and move straight to popping an actor from the
//! run queue. This is the simplest correct approach; revisit if the drain
//! becomes a measured bottleneck.
use crate::actor::{
clear_current_pid, current_pid, is_actor_done, reset_actor_done,
set_current_actor_box, set_current_pid, take_last_outcome, Actor, Outcome,
};
use crate::channel::Sender;
use crate::context::{get_actor_sp, set_actor_sp, switch_to_actor};
use crate::io::IoThread;
use crate::pid::Pid;
use crate::preempt::PREEMPTION_ENABLED;
use crate::supervisor::Signal;
use crate::timer::Timers;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
// ---------------------------------------------------------------------------
// Config
// ---------------------------------------------------------------------------
/// Runtime configuration.
///
/// ```
/// use smarm::runtime::Config;
///
/// // Use all available CPUs (default):
/// let c = Config::default();
///
/// // Exactly 4 scheduler threads:
/// let c = Config::exact(4);
///
/// // Between 2 and 8, clamped to available parallelism:
/// let c = Config::new(2, 8, None);
/// ```
#[derive(Clone, Debug)]
pub struct Config {
min: usize,
max: usize,
exact: Option<usize>,
}
impl Config {
/// Exact thread count; takes precedence over min/max.
pub fn exact(n: usize) -> Self {
assert!(n >= 1, "scheduler thread count must be ≥ 1");
Self { min: n, max: n, exact: Some(n) }
}
/// Bounded range. Thread count = clamp(available_parallelism, min, max).
pub fn new(min: usize, max: usize, exact: Option<usize>) -> Self {
assert!(min >= 1, "min must be ≥ 1");
assert!(max >= min, "max must be ≥ min");
if let Some(e) = exact {
assert!(e >= 1, "exact must be ≥ 1");
}
Self { min, max, exact }
}
/// The number of scheduler threads this config resolves to.
pub fn resolved_thread_count(&self) -> usize {
if let Some(e) = self.exact {
return e;
}
let avail = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
avail.clamp(self.min, self.max)
}
}
impl Default for Config {
fn default() -> Self {
let avail = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
Self { min: 1, max: avail, exact: None }
}
}
// ---------------------------------------------------------------------------
// Per-thread stats (RFC 000 Layer 1 primitives)
// ---------------------------------------------------------------------------
/// Lockless per-scheduler-thread counters. Written only by the owning thread;
/// readable from any thread (introspection actor, tests).
pub struct SchedulerStats {
/// PID index of the actor currently on-CPU, or `u32::MAX` when idle.
pub current_pid_index: AtomicU32,
/// Snapshot of run queue length maintained on every push/pop.
pub run_queue_len: AtomicU64,
}
impl SchedulerStats {
fn new() -> Self {
Self {
current_pid_index: AtomicU32::new(u32::MAX),
run_queue_len: AtomicU64::new(0),
}
}
}
// ---------------------------------------------------------------------------
// Runtime stats snapshot (for tests / introspection)
// ---------------------------------------------------------------------------
pub struct RuntimeStats {
pub(crate) inner: Arc<RuntimeInner>,
}
impl RuntimeStats {
/// Sum of run queue lengths across all scheduler threads.
pub fn total_run_queue_len(&self) -> u64 {
self.inner.stats.iter()
.map(|s| s.run_queue_len.load(Ordering::Relaxed))
.sum()
}
/// Number of scheduler threads.
pub fn scheduler_count(&self) -> usize {
self.inner.stats.len()
}
/// Actors currently parked on IO.
pub fn io_parked_count(&self) -> u32 {
self.inner.io_parked.load(Ordering::Relaxed)
}
/// Actors currently sleeping on a timer.
pub fn sleeping_count(&self) -> u32 {
self.inner.sleeping.load(Ordering::Relaxed)
}
}
// ---------------------------------------------------------------------------
// Shared state (behind Mutex<>)
// ---------------------------------------------------------------------------
pub(crate) const ACTOR_STACK_SIZE: usize = 64 * 1024;
pub(crate) enum State { Runnable, Parked, Done }
struct Slot {
generation: u32,
actor: Option<Actor>,
state: State,
waiters: Vec<Pid>,
outcome: Option<Outcome>,
supervisor_channel: Option<Sender<Signal>>,
outstanding_handles: u32,
pending_io_result: Option<crate::io::IoResult>,
}
impl Slot {
fn vacant() -> Self {
Self {
generation: 0,
actor: None,
state: State::Done,
waiters: Vec::new(),
outcome: None,
supervisor_channel: None,
outstanding_handles: 0,
pending_io_result: None,
}
}
}
pub(crate) type Closure = Box<dyn FnOnce() + Send>;
pub(crate) struct SharedState {
pub(crate) slots: Vec<Slot>,
pub(crate) free_list: Vec<u32>,
pub(crate) run_queue: VecDeque<Pid>,
pub(crate) root_pid: Option<Pid>,
pub(crate) timers: Timers,
pub(crate) io: Option<IoThread>,
/// Closures awaiting their first resume, keyed by Pid.
pub(crate) pending_closures: Vec<(Pid, Closure)>,
}
impl SharedState {
fn new() -> Self {
Self {
slots: Vec::new(),
free_list: Vec::new(),
run_queue: VecDeque::new(),
root_pid: None,
timers: Timers::new(),
io: None,
pending_closures: Vec::new(),
}
}
fn allocate_slot(&mut self) -> (u32, u32) {
if let Some(idx) = self.free_list.pop() {
let gen = self.slots[idx as usize].generation;
(idx, gen)
} else {
let idx = self.slots.len() as u32;
self.slots.push(Slot::vacant());
(idx, 0)
}
}
fn slot(&self, pid: Pid) -> Option<&Slot> {
let s = self.slots.get(pid.index() as usize)?;
if s.generation == pid.generation() { Some(s) } else { None }
}
fn slot_mut(&mut self, pid: Pid) -> Option<&mut Slot> {
let s = self.slots.get_mut(pid.index() as usize)?;
if s.generation == pid.generation() { Some(s) } else { None }
}
fn pop_pending_closure(&mut self, pid: Pid) -> Option<Closure> {
let pos = self.pending_closures.iter().position(|(p, _)| *p == pid)?;
Some(self.pending_closures.swap_remove(pos).1)
}
}
// ---------------------------------------------------------------------------
// RuntimeInner — the shared core behind an Arc
// ---------------------------------------------------------------------------
pub(crate) struct RuntimeInner {
pub(crate) shared: Mutex<SharedState>,
/// Try-lock: exactly one scheduler thread drains timers/IO per iteration.
drain_lock: Mutex<()>,
/// Per-thread stats, indexed by scheduler thread slot (0..N).
pub(crate) stats: Vec<SchedulerStats>,
/// Global counters for RFC 000 primitives.
pub(crate) io_parked: AtomicU32,
pub(crate) sleeping: AtomicU32,
}
impl RuntimeInner {
fn new(thread_count: usize) -> Arc<Self> {
let stats = (0..thread_count).map(|_| SchedulerStats::new()).collect();
Arc::new(Self {
shared: Mutex::new(SharedState::new()),
drain_lock: Mutex::new(()),
stats,
io_parked: AtomicU32::new(0),
sleeping: AtomicU32::new(0),
})
}
fn with_shared<R>(&self, f: impl FnOnce(&mut SharedState) -> R) -> R {
f(&mut self.shared.lock().unwrap())
}
/// Returns `None` when the mutex is poisoned.
/// Used in `unpark` / channel Drop which can fire after teardown.
fn try_with_shared<R>(&self, f: impl FnOnce(&mut SharedState) -> R) -> Option<R> {
match self.shared.lock() {
Ok(mut g) => Some(f(&mut g)),
Err(p) => Some(f(&mut p.into_inner())),
}
}
}
// ---------------------------------------------------------------------------
// Runtime — the public handle
// ---------------------------------------------------------------------------
pub struct Runtime {
inner: Arc<RuntimeInner>,
thread_count: usize,
}
/// Initialise the runtime with the given config. Returns a reusable handle.
pub fn init(config: Config) -> Runtime {
let n = config.resolved_thread_count();
Runtime {
inner: RuntimeInner::new(n),
thread_count: n,
}
}
impl Runtime {
/// Run `f` as the initial actor, block until all actors finish.
/// Can be called multiple times sequentially on the same `Runtime`.
pub fn run(&self, f: impl FnOnce() + Send + 'static) {
// Re-initialise shared state for this run.
{
let mut s = self.inner.shared.lock().unwrap();
assert!(s.run_queue.is_empty(), "run() called while previous run still active");
s.root_pid = Some(ROOT_PID);
s.io = Some(IoThread::start().expect("failed to start IO thread"));
}
// Spawn the initial actor through the public spawn path (which
// requires a running runtime in the thread-local).
RUNTIME.with(|r| *r.borrow_mut() = Some(self.inner.clone()));
let initial_handle = crate::scheduler::spawn(f);
// Launch N-1 extra scheduler threads. The calling thread is thread 0.
let mut os_threads = Vec::new();
for slot in 1..self.thread_count {
let inner = self.inner.clone();
let t = thread::spawn(move || {
RUNTIME.with(|r| *r.borrow_mut() = Some(inner.clone()));
SCHED_SLOT.with(|s| s.set(slot));
schedule_loop(&inner, slot);
RUNTIME.with(|r| *r.borrow_mut() = None);
});
os_threads.push(t);
}
// Thread 0 runs the loop on the calling thread.
SCHED_SLOT.with(|s| s.set(0));
schedule_loop(&self.inner, 0);
// Wait for all other scheduler threads.
for t in os_threads {
let _ = t.join();
}
// Drop initial handle (decrements outstanding_handles count).
drop(initial_handle);
// Tear down IO and clean up shared state for the next run() call.
let mut s = self.inner.shared.lock().unwrap();
drop(s.io.take()); // joins IO threads
s.pending_closures.clear();
// Reset per-thread stats.
for stat in &self.inner.stats {
stat.current_pid_index.store(u32::MAX, Ordering::Relaxed);
stat.run_queue_len.store(0, Ordering::Relaxed);
}
self.inner.io_parked.store(0, Ordering::Relaxed);
self.inner.sleeping.store(0, Ordering::Relaxed);
RUNTIME.with(|r| *r.borrow_mut() = None);
}
/// Snapshot of runtime statistics for introspection / tests.
pub fn stats(&self) -> RuntimeStats {
RuntimeStats { inner: self.inner.clone() }
}
}
// ---------------------------------------------------------------------------
// Thread-locals
// ---------------------------------------------------------------------------
use std::cell::{Cell, RefCell};
thread_local! {
/// The RuntimeInner for the current run(). Set by run() on the calling
/// thread and by each spawned scheduler thread.
pub(crate) static RUNTIME: RefCell<Option<Arc<RuntimeInner>>> =
const { RefCell::new(None) };
/// This scheduler thread's index into RuntimeInner::stats.
static SCHED_SLOT: Cell<usize> = const { Cell::new(0) };
/// What the actor wants when it yields back to the scheduler.
static YIELD_INTENT: Cell<YieldIntent> = const { Cell::new(YieldIntent::Yield) };
}
#[derive(Copy, Clone)]
pub(crate) enum YieldIntent { Yield, Park }
pub(crate) fn set_yield_intent(i: YieldIntent) {
YIELD_INTENT.with(|c| c.set(i));
}
// ---------------------------------------------------------------------------
// Sentinel root PID
// ---------------------------------------------------------------------------
pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX);
// ---------------------------------------------------------------------------
// Slot reclamation
// ---------------------------------------------------------------------------
pub(crate) fn reclaim_slot(s: &mut SharedState, pid: Pid) {
let idx = pid.index();
let slot = &mut s.slots[idx as usize];
slot.generation = slot.generation.wrapping_add(1);
slot.actor = None;
slot.outcome = None;
slot.waiters.clear();
slot.supervisor_channel = None;
slot.state = State::Done;
slot.outstanding_handles = 0;
slot.pending_io_result = None;
s.free_list.push(idx);
}
// ---------------------------------------------------------------------------
// finalize_actor
// ---------------------------------------------------------------------------
fn finalize_actor(inner: &Arc<RuntimeInner>, pid: Pid, outcome: Outcome) {
let (joiner_outcome, sup_signal) = match outcome {
Outcome::Exit => (Outcome::Exit, Signal::Exit(pid)),
Outcome::Panic(payload) => (
Outcome::Panic(payload),
Signal::Panic(pid, Box::new(()) as Box<dyn std::any::Any + Send>),
),
};
let (waiters, supervisor_pid) = inner.with_shared(|s| {
let slot = s.slot_mut(pid).expect("finalize_actor: slot vanished");
let sup = slot.actor.as_ref().map(|a| a.supervisor);
slot.outcome = Some(joiner_outcome);
slot.state = State::Done;
slot.actor = None;
(std::mem::take(&mut slot.waiters), sup)
});
// Deliver to supervisor.
if let Some(sup) = supervisor_pid {
let sender = inner.with_shared(|s| {
s.slot(sup).and_then(|slot| slot.supervisor_channel.clone())
});
if let Some(sender) = sender {
let _ = sender.send(sup_signal);
}
}
// Unpark joiners.
for joiner in waiters {
crate::scheduler::unpark(joiner);
}
// Reclaim if no outstanding handles.
inner.with_shared(|s| {
let reclaim = s.slot(pid).map(|slot| slot.outstanding_handles == 0).unwrap_or(false);
if reclaim { reclaim_slot(s, pid); }
});
}
// ---------------------------------------------------------------------------
// schedule_loop — runs on each scheduler OS thread
// ---------------------------------------------------------------------------
fn schedule_loop(inner: &Arc<RuntimeInner>, slot: usize) {
let stats = &inner.stats[slot];
loop {
// ----------------------------------------------------------------
// 1. Try to win the drain lock (timers + IO). One winner per round;
// losers skip immediately and proceed to step 2.
// ----------------------------------------------------------------
if let Ok(_drain_guard) = inner.drain_lock.try_lock() {
let now = std::time::Instant::now();
// Drain due timers.
let due = inner.with_shared(|s| s.timers.pop_due(now));
for entry in due {
match entry.reason {
crate::timer::Reason::Sleep => {
inner.with_shared(|s| {
if let Some(slot) = s.slot_mut(entry.pid) {
if matches!(slot.state, State::Parked) {
slot.state = State::Runnable;
s.run_queue.push_back(entry.pid);
}
}
});
}
crate::timer::Reason::WaitTimeout { target, wait_seq } => {
// Runs outside with_shared — the callback may call unpark.
target.on_timeout(entry.pid, wait_seq);
}
}
}
// Drain IO completions.
let completions = inner.with_shared(|s| {
s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default()
});
for completion in completions {
match completion {
crate::io::Completion::Blocking { pid, result } => {
inner.with_shared(|s| {
if let Some(io) = s.io.as_mut() {
io.outstanding = io.outstanding.saturating_sub(1);
}
if let Some(slot) = s.slot_mut(pid) {
slot.pending_io_result = Some(result);
if matches!(slot.state, State::Parked) {
slot.state = State::Runnable;
s.run_queue.push_back(pid);
}
}
});
}
crate::io::Completion::FdReady { fd, events: _ } => {
inner.with_shared(|s| {
let parked_pid = s.io.as_mut().and_then(|io| {
let pid = io.waiters.remove(&fd);
io.epoll_deregister(fd);
pid
});
if let Some(pid) = parked_pid {
if let Some(slot) = s.slot_mut(pid) {
if matches!(slot.state, State::Parked) {
slot.state = State::Runnable;
s.run_queue.push_back(pid);
}
}
}
});
}
}
}
} // drain_guard drops here
// ----------------------------------------------------------------
// 2. Pop a runnable actor from the shared queue.
// ----------------------------------------------------------------
let pid = match inner.with_shared(|s| {
let len = s.run_queue.len() as u64;
stats.run_queue_len.store(len, Ordering::Relaxed);
s.run_queue.pop_front()
}) {
Some(p) => p,
None => {
// Nothing runnable. Check whether we should wait or exit.
let (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) =
inner.with_shared(|s| {
let next = s.timers.peek_deadline();
let (out, fd) = match s.io.as_ref() {
Some(io) => (
io.outstanding + io.waiters.len() as u32,
Some(io.wake_fd()),
),
None => (0, None),
};
// Count actors that are not Done (Runnable or Parked).
let live = s.slots.iter().filter(|slot| {
slot.actor.is_some()
}).count();
(next, out, fd, s.run_queue.is_empty(), live)
});
match (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) {
// Queue is now non-empty (another thread added work): retry.
(_, _, _, false, _) => continue,
// Truly idle — no timers, no IO, no live actors.
(None, 0, _, true, 0) => return,
// Live actors but queue empty: they must be parked on IO or
// timers. Wait on the appropriate source.
(Some(deadline), _, fd_opt, true, _) => {
let now = std::time::Instant::now();
if deadline > now {
let timeout = deadline - now;
match fd_opt {
Some(fd) => {
crate::io::poll_wake(fd, Some(timeout));
crate::io::drain_wake_pipe(fd);
}
None => thread::sleep(timeout),
}
}
continue;
}
(None, _, Some(fd), true, _) => {
crate::io::poll_wake(fd, None);
crate::io::drain_wake_pipe(fd);
continue;
}
// Live actors, queue empty, no IO/timers: they're parked
// waiting for each other (potential deadlock in user code),
// or another thread is about to add work. Sleep briefly to
// avoid hammering the shared mutex.
_ => {
thread::sleep(std::time::Duration::from_micros(100));
continue;
}
}
}
};
// ----------------------------------------------------------------
// 3. Resume the actor.
// ----------------------------------------------------------------
let sp = match inner.with_shared(|s| {
s.slot(pid).and_then(|slot| slot.actor.as_ref().map(|a| a.sp))
}) {
Some(sp) => sp,
None => continue, // stale pid
};
// First resume: move the closure into the trampoline's thread-local.
if let Some(b) = inner.with_shared(|s| s.pop_pending_closure(pid)) {
set_current_actor_box(b);
}
// Update per-thread stats: record who's on-CPU.
stats.current_pid_index.store(pid.index(), Ordering::Relaxed);
set_actor_sp(sp);
set_current_pid(pid);
reset_actor_done();
YIELD_INTENT.with(|c| c.set(YieldIntent::Yield));
crate::preempt::reset_timeslice();
PREEMPTION_ENABLED.with(|c| c.set(true));
unsafe { switch_to_actor() };
PREEMPTION_ENABLED.with(|c| c.set(false));
stats.current_pid_index.store(u32::MAX, Ordering::Relaxed);
clear_current_pid();
let intent = YIELD_INTENT.with(|c| c.get());
let new_sp = get_actor_sp();
if is_actor_done() {
let outcome = take_last_outcome().unwrap_or(Outcome::Exit);
finalize_actor(inner, pid, outcome);
} else {
inner.with_shared(|s| {
if let Some(slot) = s.slot_mut(pid) {
if let Some(actor) = slot.actor.as_mut() {
actor.sp = new_sp;
}
match intent {
YieldIntent::Yield => {
slot.state = State::Runnable;
s.run_queue.push_back(pid);
}
YieldIntent::Park => {
slot.state = State::Parked;
}
}
}
});
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -28,7 +28,7 @@
use crate::pid::Pid;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// What to do when a timer entry's deadline arrives.
@@ -45,7 +45,7 @@ pub enum Reason {
/// target tell apart "this wait" from "a later wait by the same actor
/// on the same target".
WaitTimeout {
target: Rc<dyn TimerTarget>,
target: Arc<dyn TimerTarget>,
wait_seq: u64,
},
}