//! Multi-scheduler runtime: configuration, initialisation, and the shared //! state that all scheduler OS threads operate against. //! //! # Architecture //! //! ```text //! init(Config) → Runtime (Arc) //! //! RuntimeInner { //! shared: Mutex ← slot table, run queue, timers, IO //! stats: Vec ← one per thread, lockless atomics (RFC 000) //! io_parked: AtomicU32 ← actors parked on IO //! sleeping: AtomicU32 ← actors parked on timer //! } //! ``` //! //! `Runtime::run(f)` spawns N OS threads (one per `Config::resolved_thread_count()`), //! each running `schedule_loop`. It blocks until all scheduler threads exit, //! i.e. until the run queue is empty and nothing is pending. //! //! Each scheduler thread holds an `Arc` clone. Per-thread //! identity is a small integer index, stored in a thread-local, used to index //! into `stats`. //! //! # Timer / IO drain (try-lock, one-winner) //! //! On each loop iteration every scheduler thread tries `try_lock()` on a //! separate `drain_lock: Mutex<()>`. The winner drains due timers and IO //! completions; losers skip and move straight to popping an actor from the //! run queue. This is the simplest correct approach; revisit if the drain //! becomes a measured bottleneck. use crate::actor::{ clear_current_pid, is_actor_done, reset_actor_done, set_current_actor_box, set_current_pid, take_last_outcome, Actor, Outcome, }; use crate::channel::Sender; use crate::context::{get_actor_sp, set_actor_sp, switch_to_actor}; use crate::io::IoThread; use crate::pid::Pid; use crate::preempt::PREEMPTION_ENABLED; use crate::supervisor::Signal; use crate::timer::Timers; use std::collections::VecDeque; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; // --------------------------------------------------------------------------- // Config // --------------------------------------------------------------------------- /// Runtime configuration. /// /// ``` /// use smarm::runtime::Config; /// /// // Use all available CPUs (default): /// let c = Config::default(); /// /// // Exactly 4 scheduler threads: /// let c = Config::exact(4); /// /// // Between 2 and 8, clamped to available parallelism: /// let c = Config::new(2, 8, None); /// ``` #[derive(Clone, Debug)] pub struct Config { min: usize, max: usize, exact: Option, alloc_interval: u32, timeslice_cycles: u64, } impl Config { /// Exact thread count; takes precedence over min/max. pub fn exact(n: usize) -> Self { assert!(n >= 1, "scheduler thread count must be ≥ 1"); Self { min: n, max: n, exact: Some(n), alloc_interval: crate::preempt::DEFAULT_ALLOC_INTERVAL, timeslice_cycles: crate::preempt::DEFAULT_TIMESLICE_CYCLES, } } /// Bounded range. Thread count = clamp(available_parallelism, min, max). pub fn new(min: usize, max: usize, exact: Option) -> Self { assert!(min >= 1, "min must be ≥ 1"); assert!(max >= min, "max must be ≥ min"); if let Some(e) = exact { assert!(e >= 1, "exact must be ≥ 1"); } Self { min, max, exact, alloc_interval: crate::preempt::DEFAULT_ALLOC_INTERVAL, timeslice_cycles: crate::preempt::DEFAULT_TIMESLICE_CYCLES, } } /// How many allocations (or `smarm::check!()` calls) between RDTSC checks. /// Lower = more responsive preemption, higher = less overhead. /// Default: 128. pub fn alloc_interval(mut self, n: u32) -> Self { assert!(n >= 1, "alloc_interval must be ≥ 1"); self.alloc_interval = n; self } /// How many TSC cycles constitute one timeslice. /// Default: 300_000 (≈ 100µs on a 3 GHz CPU). pub fn timeslice_cycles(mut self, n: u64) -> Self { assert!(n >= 1, "timeslice_cycles must be ≥ 1"); self.timeslice_cycles = n; self } /// The number of scheduler threads this config resolves to. pub fn resolved_thread_count(&self) -> usize { if let Some(e) = self.exact { return e; } let avail = thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); avail.clamp(self.min, self.max) } } impl Default for Config { fn default() -> Self { let avail = thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); Self { min: 1, max: avail, exact: None, alloc_interval: crate::preempt::DEFAULT_ALLOC_INTERVAL, timeslice_cycles: crate::preempt::DEFAULT_TIMESLICE_CYCLES, } } } // --------------------------------------------------------------------------- // Per-thread stats (RFC 000 Layer 1 primitives) // --------------------------------------------------------------------------- /// Lockless per-scheduler-thread counters. Written only by the owning thread; /// readable from any thread (introspection actor, tests). pub struct SchedulerStats { /// PID index of the actor currently on-CPU, or `u32::MAX` when idle. pub current_pid_index: AtomicU32, /// Snapshot of run queue length maintained on every push/pop. pub run_queue_len: AtomicU64, } impl SchedulerStats { fn new() -> Self { Self { current_pid_index: AtomicU32::new(u32::MAX), run_queue_len: AtomicU64::new(0), } } } // --------------------------------------------------------------------------- // Runtime stats snapshot (for tests / introspection) // --------------------------------------------------------------------------- pub struct RuntimeStats { pub(crate) inner: Arc, } impl RuntimeStats { /// Sum of run queue lengths across all scheduler threads. pub fn total_run_queue_len(&self) -> u64 { self.inner.stats.iter() .map(|s| s.run_queue_len.load(Ordering::Relaxed)) .sum() } /// Number of scheduler threads. pub fn scheduler_count(&self) -> usize { self.inner.stats.len() } /// Actors currently parked on IO. pub fn io_parked_count(&self) -> u32 { self.inner.io_parked.load(Ordering::Relaxed) } /// Actors currently sleeping on a timer. pub fn sleeping_count(&self) -> u32 { self.inner.sleeping.load(Ordering::Relaxed) } } // --------------------------------------------------------------------------- // Shared state (behind Mutex<>) // --------------------------------------------------------------------------- pub(crate) const ACTOR_STACK_SIZE: usize = 64 * 1024; #[derive(Debug)] pub(crate) enum State { Runnable, Parked, Done } pub(crate) struct Slot { pub(crate) generation: u32, pub(crate) actor: Option, pub(crate) state: State, pub(crate) waiters: Vec, pub(crate) outcome: Option, pub(crate) supervisor_channel: Option>, pub(crate) outstanding_handles: u32, pub(crate) pending_io_result: Option, /// Set by `unpark()` when the actor is still running (not yet Parked). /// The scheduler checks this after a Park yield and re-queues instead /// of sleeping, closing the lost-wakeup window. pub(crate) pending_unpark: bool, } impl Slot { fn vacant() -> Self { Self { generation: 0, actor: None, state: State::Done, waiters: Vec::new(), outcome: None, supervisor_channel: None, outstanding_handles: 0, pending_io_result: None, pending_unpark: false, } } } pub(crate) type Closure = Box; pub(crate) struct SharedState { pub(crate) slots: Vec, pub(crate) free_list: Vec, pub(crate) run_queue: VecDeque, pub(crate) root_pid: Option, pub(crate) timers: Timers, pub(crate) io: Option, /// Closures awaiting their first resume, keyed by Pid. pub(crate) pending_closures: Vec<(Pid, Closure)>, } impl SharedState { fn new() -> Self { Self { slots: Vec::new(), free_list: Vec::new(), run_queue: VecDeque::new(), root_pid: None, timers: Timers::new(), io: None, pending_closures: Vec::new(), } } pub(crate) fn allocate_slot(&mut self) -> (u32, u32) { if let Some(idx) = self.free_list.pop() { let gen = self.slots[idx as usize].generation; (idx, gen) } else { let idx = self.slots.len() as u32; self.slots.push(Slot::vacant()); (idx, 0) } } pub(crate) fn slot(&self, pid: Pid) -> Option<&Slot> { let s = self.slots.get(pid.index() as usize)?; if s.generation == pid.generation() { Some(s) } else { None } } pub(crate) fn slot_mut(&mut self, pid: Pid) -> Option<&mut Slot> { let s = self.slots.get_mut(pid.index() as usize)?; if s.generation == pid.generation() { Some(s) } else { None } } pub(crate) fn pop_pending_closure(&mut self, pid: Pid) -> Option { let pos = self.pending_closures.iter().position(|(p, _)| *p == pid)?; Some(self.pending_closures.swap_remove(pos).1) } } // --------------------------------------------------------------------------- // RuntimeInner — the shared core behind an Arc // --------------------------------------------------------------------------- pub(crate) struct RuntimeInner { pub(crate) shared: Mutex, /// Try-lock: exactly one scheduler thread drains timers/IO per iteration. drain_lock: Mutex<()>, /// Per-thread stats, indexed by scheduler thread slot (0..N). pub(crate) stats: Vec, /// Global counters for RFC 000 primitives. pub(crate) io_parked: AtomicU32, pub(crate) sleeping: AtomicU32, /// Preemption knobs, written into each scheduler thread's locals on startup. pub(crate) alloc_interval: u32, pub(crate) timeslice_cycles: u64, } impl RuntimeInner { fn new(thread_count: usize, alloc_interval: u32, timeslice_cycles: u64) -> Arc { let stats = (0..thread_count).map(|_| SchedulerStats::new()).collect(); Arc::new(Self { shared: Mutex::new(SharedState::new()), drain_lock: Mutex::new(()), stats, io_parked: AtomicU32::new(0), sleeping: AtomicU32::new(0), alloc_interval, timeslice_cycles, }) } pub(crate) fn with_shared(&self, f: impl FnOnce(&mut SharedState) -> R) -> R { // Preemption must be off while we hold the shared mutex. If an actor // called with_shared (e.g. from spawn, join, sleep) and the allocator // fired maybe_preempt() while the lock was held, switch_to_scheduler() // would context-switch to the scheduler loop, which would immediately // deadlock trying to acquire the same mutex. let prev = crate::preempt::PREEMPTION_ENABLED.with(|c| c.replace(false)); let result = f(&mut self.shared.lock().unwrap()); crate::preempt::PREEMPTION_ENABLED.with(|c| c.set(prev)); result } } // --------------------------------------------------------------------------- // Runtime — the public handle // --------------------------------------------------------------------------- pub struct Runtime { inner: Arc, thread_count: usize, } /// Initialise the runtime with the given config. Returns a reusable handle. pub fn init(config: Config) -> Runtime { let n = config.resolved_thread_count(); Runtime { inner: RuntimeInner::new(n, config.alloc_interval, config.timeslice_cycles), thread_count: n, } } impl Runtime { /// Run `f` as the initial actor, block until all actors finish. /// Can be called multiple times sequentially on the same `Runtime`. pub fn run(&self, f: impl FnOnce() + Send + 'static) { // Install smarm's panic hook on first call. The default Rust hook is // not reentrant — concurrent actor panics can trigger a double-panic // abort when the backtrace printer takes an internal lock that is // already held. smarm catches every actor panic via `catch_unwind` in // the trampoline, so panics never need to reach the hook for runtime // correctness; the hook fires only as a side-effect of unwinding before // `catch_unwind` catches it. // // We install once and leave it installed: the previous hook is chained // so that panics outside actor context (e.g. in the test harness // itself) are still reported normally. static HOOK_INSTALLED: std::sync::OnceLock<()> = std::sync::OnceLock::new(); HOOK_INSTALLED.get_or_init(|| { let prev = std::panic::take_hook(); std::panic::set_hook(Box::new(move |info| { // If we are currently executing inside an actor trampoline the // panic will be caught by `catch_unwind` momentarily. Suppress // the hook output to avoid interleaved noise and reentrancy. // Outside actor context, delegate to the previous hook so that // genuine runtime panics are still reported. if crate::actor::current_pid().is_some() { // Inside an actor — catch_unwind handles it; stay silent. } else { prev(info); } })); }); // Open the trace store for this run (no-op without smarm-trace). #[cfg(feature = "smarm-trace")] crate::trace::open(); // Re-initialise shared state for this run. { let mut s = self.inner.shared.lock().unwrap(); assert!(s.run_queue.is_empty(), "run() called while previous run still active"); s.root_pid = Some(ROOT_PID); s.io = Some(IoThread::start().expect("failed to start IO thread")); } // Spawn the initial actor through the public spawn path (which // requires a running runtime in the thread-local). RUNTIME.with(|r| *r.borrow_mut() = Some(self.inner.clone())); let initial_handle = crate::scheduler::spawn(f); // Launch N-1 extra scheduler threads. The calling thread is thread 0. let mut os_threads = Vec::new(); for slot in 1..self.thread_count { let inner = self.inner.clone(); let t = thread::spawn(move || { RUNTIME.with(|r| *r.borrow_mut() = Some(inner.clone())); SCHED_SLOT.with(|s| s.set(slot)); schedule_loop(&inner, slot); RUNTIME.with(|r| *r.borrow_mut() = None); }); os_threads.push(t); } // Thread 0 runs the loop on the calling thread. SCHED_SLOT.with(|s| s.set(0)); schedule_loop(&self.inner, 0); // Wait for all other scheduler threads. for t in os_threads { let _ = t.join(); } // Drop initial handle (decrements outstanding_handles count). drop(initial_handle); // Tear down IO and clean up shared state for the next run() call. let mut s = self.inner.shared.lock().unwrap(); drop(s.io.take()); // joins IO threads s.pending_closures.clear(); // Reset per-thread stats. for stat in &self.inner.stats { stat.current_pid_index.store(u32::MAX, Ordering::Relaxed); stat.run_queue_len.store(0, Ordering::Relaxed); } self.inner.io_parked.store(0, Ordering::Relaxed); self.inner.sleeping.store(0, Ordering::Relaxed); RUNTIME.with(|r| *r.borrow_mut() = None); // Flush trace to disk (no-op without smarm-trace). #[cfg(feature = "smarm-trace")] crate::trace::flush(); } /// Snapshot of runtime statistics for introspection / tests. pub fn stats(&self) -> RuntimeStats { RuntimeStats { inner: self.inner.clone() } } } // --------------------------------------------------------------------------- // Thread-locals // --------------------------------------------------------------------------- use std::cell::{Cell, RefCell}; thread_local! { /// The RuntimeInner for the current run(). Set by run() on the calling /// thread and by each spawned scheduler thread. pub(crate) static RUNTIME: RefCell>> = const { RefCell::new(None) }; /// This scheduler thread's index into RuntimeInner::stats. static SCHED_SLOT: Cell = const { Cell::new(0) }; /// What the actor wants when it yields back to the scheduler. static YIELD_INTENT: Cell = const { Cell::new(YieldIntent::Yield) }; } #[derive(Copy, Clone)] pub(crate) enum YieldIntent { Yield, Park } pub(crate) fn set_yield_intent(i: YieldIntent) { YIELD_INTENT.with(|c| c.set(i)); } // --------------------------------------------------------------------------- // Sentinel root PID // --------------------------------------------------------------------------- pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX); // --------------------------------------------------------------------------- // Slot reclamation // --------------------------------------------------------------------------- pub(crate) fn reclaim_slot(s: &mut SharedState, pid: Pid) { let idx = pid.index(); let slot = &mut s.slots[idx as usize]; slot.generation = slot.generation.wrapping_add(1); slot.actor = None; slot.outcome = None; slot.waiters.clear(); slot.supervisor_channel = None; slot.state = State::Done; slot.outstanding_handles = 0; slot.pending_unpark = false; slot.pending_io_result = None; s.free_list.push(idx); } // --------------------------------------------------------------------------- // finalize_actor // --------------------------------------------------------------------------- fn finalize_actor(inner: &Arc, pid: Pid, outcome: Outcome) { let (joiner_outcome, sup_signal) = match outcome { Outcome::Exit => (Outcome::Exit, Signal::Exit(pid)), Outcome::Panic(payload) => ( Outcome::Panic(payload), Signal::Panic(pid, Box::new(()) as Box), ), }; let (waiters, supervisor_pid) = inner.with_shared(|s| { let slot = s.slot_mut(pid).expect("finalize_actor: slot vanished"); let sup = slot.actor.as_ref().map(|a| a.supervisor); slot.outcome = Some(joiner_outcome); slot.state = State::Done; slot.actor = None; (std::mem::take(&mut slot.waiters), sup) }); // Deliver to supervisor. if let Some(sup) = supervisor_pid { let sender = inner.with_shared(|s| { s.slot(sup).and_then(|slot| slot.supervisor_channel.clone()) }); if let Some(sender) = sender { let _ = sender.send(sup_signal); } } // Unpark joiners. for joiner in waiters { crate::scheduler::unpark(joiner); } // Reclaim if no outstanding handles. inner.with_shared(|s| { let reclaim = s.slot(pid).map(|slot| slot.outstanding_handles == 0).unwrap_or(false); if reclaim { reclaim_slot(s, pid); } }); } // --------------------------------------------------------------------------- // schedule_loop — runs on each scheduler OS thread // --------------------------------------------------------------------------- fn schedule_loop(inner: &Arc, slot: usize) { crate::preempt::configure_preempt(inner.alloc_interval, inner.timeslice_cycles); let stats = &inner.stats[slot]; loop { // ---------------------------------------------------------------- // 1. Try to win the drain lock (timers + IO). One winner per round; // losers skip immediately and proceed to step 2. // ---------------------------------------------------------------- if let Ok(_drain_guard) = inner.drain_lock.try_lock() { let now = std::time::Instant::now(); // Drain due timers. let due = inner.with_shared(|s| s.timers.pop_due(now)); for entry in due { match entry.reason { crate::timer::Reason::Sleep => { inner.with_shared(|s| { if let Some(slot) = s.slot_mut(entry.pid) { if matches!(slot.state, State::Parked) { slot.state = State::Runnable; s.run_queue.push_back(entry.pid); crate::te!(crate::trace::Event::Enqueue(entry.pid)); } } }); } crate::timer::Reason::WaitTimeout { target, wait_seq } => { // Runs outside with_shared — the callback may call unpark. target.on_timeout(entry.pid, wait_seq); } } } // Drain IO completions. let completions = inner.with_shared(|s| { s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default() }); for completion in completions { match completion { crate::io::Completion::Blocking { pid, result } => { inner.with_shared(|s| { if let Some(io) = s.io.as_mut() { io.outstanding = io.outstanding.saturating_sub(1); } if let Some(slot) = s.slot_mut(pid) { slot.pending_io_result = Some(result); if matches!(slot.state, State::Parked) { slot.state = State::Runnable; s.run_queue.push_back(pid); crate::te!(crate::trace::Event::Enqueue(pid)); } } }); } crate::io::Completion::FdReady { fd, events: _ } => { inner.with_shared(|s| { let parked_pid = s.io.as_mut().and_then(|io| { let pid = io.waiters.remove(&fd); io.epoll_deregister(fd); pid }); if let Some(pid) = parked_pid { if let Some(slot) = s.slot_mut(pid) { match slot.state { State::Parked => { slot.state = State::Runnable; s.run_queue.push_back(pid); crate::te!(crate::trace::Event::UnparkDirect(pid)); crate::te!(crate::trace::Event::Enqueue(pid)); } // Actor is between epoll_register // and park_current. Set the flag so // the upcoming Park yield re-queues // instead of suspending. Mirrors // scheduler::unpark(). State::Runnable => { slot.pending_unpark = true; crate::te!(crate::trace::Event::UnparkDeferred(pid)); } State::Done => {} } } } }); } } } } // drain_guard drops here // ---------------------------------------------------------------- // 2. Pop a runnable actor from the shared queue. // ---------------------------------------------------------------- let pid = match inner.with_shared(|s| { let len = s.run_queue.len() as u64; stats.run_queue_len.store(len, Ordering::Relaxed); s.run_queue.pop_front() }) { Some(p) => { crate::te!(crate::trace::Event::Dequeue(p)); p } None => { // Queue was empty when we popped. Re-examine under the lock to // decide whether to exit or wait. All four conditions must hold // simultaneously before we exit: // 1. run queue is still empty // 2. no live actors (nothing parked, nothing mid-finalize) // 3. no pending timers // 4. no outstanding IO // If any is non-zero we keep spinning — "check the fridge is // empty before you leave for the airport". let (next_deadline, io_outstanding, wake_fd, all_clear) = inner.with_shared(|s| { let next = s.timers.peek_deadline(); let (out, fd) = match s.io.as_ref() { Some(io) => ( io.outstanding + io.waiters.len() as u32, Some(io.wake_fd()), ), None => (0, None), }; let live = s.slots.iter().filter(|slot| slot.actor.is_some()).count(); let queue_empty = s.run_queue.is_empty(); let all_clear = queue_empty && live == 0 && next.is_none() && out == 0; (next, out, fd, all_clear) }); if all_clear { return; } // Something is still in flight. Sleep on the appropriate source // to avoid hammering the mutex; the loop will retry on wake. match (next_deadline, wake_fd) { (Some(deadline), fd_opt) => { let now = std::time::Instant::now(); if deadline > now { let timeout = deadline - now; match fd_opt { Some(fd) => { crate::io::poll_wake(fd, Some(timeout)); crate::io::drain_wake_pipe(fd); } None => thread::sleep(timeout), } } } (None, Some(fd)) if io_outstanding > 0 => { crate::io::poll_wake(fd, None); crate::io::drain_wake_pipe(fd); } _ => { thread::sleep(std::time::Duration::from_micros(100)); } } continue; } }; // ---------------------------------------------------------------- // 3. Resume the actor. // ---------------------------------------------------------------- let sp = match inner.with_shared(|s| { s.slot(pid).and_then(|slot| slot.actor.as_ref().map(|a| a.sp)) }) { Some(sp) => sp, None => { continue; // stale pid } }; // First resume: move the closure into the trampoline's thread-local. if let Some(b) = inner.with_shared(|s| s.pop_pending_closure(pid)) { set_current_actor_box(b); } // Update per-thread stats: record who's on-CPU. stats.current_pid_index.store(pid.index(), Ordering::Relaxed); set_actor_sp(sp); set_current_pid(pid); reset_actor_done(); YIELD_INTENT.with(|c| c.set(YieldIntent::Yield)); crate::preempt::reset_timeslice(); PREEMPTION_ENABLED.with(|c| c.set(true)); crate::te!(crate::trace::Event::Resume(pid)); unsafe { switch_to_actor() }; PREEMPTION_ENABLED.with(|c| c.set(false)); stats.current_pid_index.store(u32::MAX, Ordering::Relaxed); clear_current_pid(); let intent = YIELD_INTENT.with(|c| c.get()); let new_sp = get_actor_sp(); if is_actor_done() { crate::te!(crate::trace::Event::Done(pid)); let outcome = take_last_outcome().unwrap_or(Outcome::Exit); finalize_actor(inner, pid, outcome); } else { inner.with_shared(|s| { if let Some(slot) = s.slot_mut(pid) { if let Some(actor) = slot.actor.as_mut() { actor.sp = new_sp; } match intent { YieldIntent::Yield => { crate::te!(crate::trace::Event::Yield(pid)); slot.state = State::Runnable; s.run_queue.push_back(pid); crate::te!(crate::trace::Event::Enqueue(pid)); } YieldIntent::Park => { // Check if unpark() fired while the actor was // still running (between registering in the // channel and calling park_current). If so, // re-queue immediately instead of parking. if slot.pending_unpark { slot.pending_unpark = false; slot.state = State::Runnable; s.run_queue.push_back(pid); crate::te!(crate::trace::Event::UnparkFlagConsumed(pid)); crate::te!(crate::trace::Event::Enqueue(pid)); } else { crate::te!(crate::trace::Event::Park(pid)); slot.state = State::Parked; } } } } }); } } }