diff --git a/Cargo.toml b/Cargo.toml index 50f0ef1..1cf0a53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ libc = "0.2" [dev-dependencies] libc = "0.2" -tokio = { version = "1", features = ["rt", "macros", "sync"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "sync"] } [profile.dev] panic = "unwind" @@ -22,3 +22,7 @@ codegen-units = 1 [[bench]] name = "primes" harness = false + +[[bench]] +name = "multi_scheduler" +harness = false diff --git a/benches/multi_scheduler.rs b/benches/multi_scheduler.rs new file mode 100644 index 0000000..5771e73 --- /dev/null +++ b/benches/multi_scheduler.rs @@ -0,0 +1,343 @@ +//! Benchmarks for the multi-scheduler runtime. +//! +//! Three workloads, four runtime configurations: +//! - smarm single-thread (exact = 1) +//! - smarm multi-thread (exact = available_parallelism) +//! - tokio current_thread (single-thread baseline) +//! - tokio multi-thread (the parallel comparison) +//! +//! Workloads: +//! 1. Fan-out / fan-in compute (primes) — CPU-bound, tests parallelism +//! 2. Ping-pong — message-passing overhead, park/unpark cost +//! 3. 
Spawn throughput — cost of spawn + join per actor + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Instant; + +// --------------------------------------------------------------------------- +// Shared helpers +// --------------------------------------------------------------------------- + +fn available_threads() -> usize { + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1) +} + +fn print_header(title: &str) { + println!("\n{}", "=".repeat(80)); + println!(" {title}"); + println!("{}", "=".repeat(80)); + println!( + "{:>22} | {:>12} | {:>10} | {:>10} | {:>10}", + "runtime", "result", "median µs", "min µs", "max µs" + ); + println!("{}", "-".repeat(80)); +} + +fn run_n (u64, u128)>(name: &str, n: u32, mut f: F) { + let mut times = Vec::new(); + let mut last = 0u64; + for _ in 0..n { + let (v, t) = f(); + times.push(t); + last = v; + } + times.sort_unstable(); + let median = times[times.len() / 2]; + let min = *times.iter().min().unwrap(); + let max = *times.iter().max().unwrap(); + println!( + "{:>22} | {:>12} | {:>10} | {:>10} | {:>10}", + name, last, median, min, max + ); +} + +const ITERS: u32 = 7; + +// --------------------------------------------------------------------------- +// Workload 1: fan-out / fan-in primes +// --------------------------------------------------------------------------- + +const PRIME_N: u64 = 400_000; +const WORKERS: u64 = 64; + +fn is_prime(n: u64) -> bool { + if n < 2 { return false; } + if n < 4 { return true; } + if n % 2 == 0 { return false; } + let mut i = 3u64; + while i * i <= n { if n % i == 0 { return false; } i += 2; } + true +} + +fn count_primes(lo: u64, hi: u64) -> u64 { + (lo..hi).filter(|&n| is_prime(n)).count() as u64 +} + +fn primes_slice(w: u64) -> (u64, u64) { + let per = PRIME_N / WORKERS; + let lo = w * per; + let hi = if w + 1 == WORKERS { PRIME_N } else { lo + per }; + (lo, hi) +} + +fn bench_primes_smarm(threads: usize) -> (u64, u128) { + let total = 
Arc::new(AtomicU64::new(0)); + let t2 = total.clone(); + let start = Instant::now(); + smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(move || { + let mut handles = Vec::new(); + for w in 0..WORKERS { + let (lo, hi) = primes_slice(w); + let tc = t2.clone(); + handles.push(smarm::spawn(move || { + tc.fetch_add(count_primes(lo, hi), Ordering::Relaxed); + })); + } + for h in handles { h.join().unwrap(); } + }); + (total.load(Ordering::Relaxed), start.elapsed().as_micros()) +} + +fn bench_primes_tokio_current() -> (u64, u128) { + let total = Arc::new(AtomicU64::new(0)); + let t2 = total.clone(); + let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); + let start = Instant::now(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, async move { + let mut handles = Vec::new(); + for w in 0..WORKERS { + let (lo, hi) = primes_slice(w); + let tc = t2.clone(); + handles.push(tokio::task::spawn_local(async move { + tc.fetch_add(count_primes(lo, hi), Ordering::Relaxed); + })); + } + for h in handles { let _ = h.await; } + }); + (total.load(Ordering::Relaxed), start.elapsed().as_micros()) +} + +fn bench_primes_tokio_multi() -> (u64, u128) { + let total = Arc::new(AtomicU64::new(0)); + let t2 = total.clone(); + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(available_threads()) + .build() + .unwrap(); + let start = Instant::now(); + rt.block_on(async move { + let mut handles = Vec::new(); + for w in 0..WORKERS { + let (lo, hi) = primes_slice(w); + let tc = t2.clone(); + handles.push(tokio::spawn(async move { + tc.fetch_add(count_primes(lo, hi), Ordering::Relaxed); + })); + } + for h in handles { let _ = h.await; } + }); + (total.load(Ordering::Relaxed), start.elapsed().as_micros()) +} + +fn bench_primes_baseline() -> (u64, u128) { + let start = Instant::now(); + let total: u64 = (0..WORKERS).map(|w| { + let (lo, hi) = primes_slice(w); + count_primes(lo, hi) + }).sum(); + (total, 
start.elapsed().as_micros()) +} + +// --------------------------------------------------------------------------- +// Workload 2: channel ping-pong +// --------------------------------------------------------------------------- + +const PING_ROUNDS: u64 = 10_000; + +fn bench_pingpong_smarm(threads: usize) -> (u64, u128) { + let start = Instant::now(); + smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(|| { + let (tx_a, rx_a) = smarm::channel::(); + let (tx_b, rx_b) = smarm::channel::(); + let ha = smarm::spawn(move || { + tx_a.send(0).unwrap(); + loop { + let v = rx_b.recv().unwrap(); + if v >= PING_ROUNDS { break; } + tx_a.send(v + 1).unwrap(); + } + }); + let hb = smarm::spawn(move || { + loop { + let v = rx_a.recv().unwrap(); + tx_b.send(v + 1).unwrap(); + if v + 1 >= PING_ROUNDS { break; } + } + }); + ha.join().unwrap(); + hb.join().unwrap(); + }); + (PING_ROUNDS, start.elapsed().as_micros()) +} + +fn bench_pingpong_tokio_current() -> (u64, u128) { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let start = Instant::now(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, async move { + let (tx_a, mut rx_a) = tokio::sync::mpsc::unbounded_channel::(); + let (tx_b, mut rx_b) = tokio::sync::mpsc::unbounded_channel::(); + let ha = tokio::task::spawn_local(async move { + tx_a.send(0).unwrap(); + loop { + let v = rx_b.recv().await.unwrap(); + if v >= PING_ROUNDS { break; } + tx_a.send(v + 1).unwrap(); + } + }); + let hb = tokio::task::spawn_local(async move { + loop { + let v = rx_a.recv().await.unwrap(); + tx_b.send(v + 1).unwrap(); + if v + 1 >= PING_ROUNDS { break; } + } + }); + let _ = ha.await; + let _ = hb.await; + }); + (PING_ROUNDS, start.elapsed().as_micros()) +} + +fn bench_pingpong_tokio_multi() -> (u64, u128) { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) // ping-pong only needs 2 threads + .enable_all() + .build() + .unwrap(); + let start = 
Instant::now(); + rt.block_on(async move { + let (tx_a, mut rx_a) = tokio::sync::mpsc::unbounded_channel::(); + let (tx_b, mut rx_b) = tokio::sync::mpsc::unbounded_channel::(); + let ha = tokio::spawn(async move { + tx_a.send(0).unwrap(); + loop { + let v = rx_b.recv().await.unwrap(); + if v >= PING_ROUNDS { break; } + tx_a.send(v + 1).unwrap(); + } + }); + let hb = tokio::spawn(async move { + loop { + let v = rx_a.recv().await.unwrap(); + tx_b.send(v + 1).unwrap(); + if v + 1 >= PING_ROUNDS { break; } + } + }); + let _ = ha.await; + let _ = hb.await; + }); + (PING_ROUNDS, start.elapsed().as_micros()) +} + +// --------------------------------------------------------------------------- +// Workload 3: spawn throughput +// --------------------------------------------------------------------------- + +const SPAWN_COUNT: u64 = 1_000; + +fn bench_spawn_smarm(threads: usize) -> (u64, u128) { + let counter = Arc::new(AtomicU64::new(0)); + let c = counter.clone(); + let start = Instant::now(); + smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(move || { + let mut handles = Vec::new(); + for _ in 0..SPAWN_COUNT { + let cc = c.clone(); + handles.push(smarm::spawn(move || { + cc.fetch_add(1, Ordering::Relaxed); + })); + } + for h in handles { h.join().unwrap(); } + }); + (counter.load(Ordering::Relaxed), start.elapsed().as_micros()) +} + +fn bench_spawn_tokio_current() -> (u64, u128) { + let counter = Arc::new(AtomicU64::new(0)); + let c = counter.clone(); + let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); + let start = Instant::now(); + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, async move { + let mut handles = Vec::new(); + for _ in 0..SPAWN_COUNT { + let cc = c.clone(); + handles.push(tokio::task::spawn_local(async move { + cc.fetch_add(1, Ordering::Relaxed); + })); + } + for h in handles { let _ = h.await; } + }); + (counter.load(Ordering::Relaxed), start.elapsed().as_micros()) +} + +fn bench_spawn_tokio_multi() 
-> (u64, u128) { + let counter = Arc::new(AtomicU64::new(0)); + let c = counter.clone(); + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(available_threads()) + .build() + .unwrap(); + let start = Instant::now(); + rt.block_on(async move { + let mut handles = Vec::new(); + for _ in 0..SPAWN_COUNT { + let cc = c.clone(); + handles.push(tokio::spawn(async move { + cc.fetch_add(1, Ordering::Relaxed); + })); + } + for h in handles { let _ = h.await; } + }); + (counter.load(Ordering::Relaxed), start.elapsed().as_micros()) +} + +// --------------------------------------------------------------------------- +// main +// --------------------------------------------------------------------------- + +fn main() { + let n = available_threads(); + println!("smarm multi-scheduler benchmarks"); + println!("available parallelism: {n} threads"); + println!("PRIME_N={PRIME_N}, WORKERS={WORKERS}, PING_ROUNDS={PING_ROUNDS}, SPAWN_COUNT={SPAWN_COUNT}"); + + // ---- Primes ---- + print_header(&format!("Fan-out/fan-in: count primes in [2, {PRIME_N}) across {WORKERS} workers")); + run_n("baseline (serial)", ITERS, bench_primes_baseline); + run_n("smarm single-thread", ITERS, || bench_primes_smarm(1)); + run_n(&format!("smarm {n}-thread"), ITERS, || bench_primes_smarm(n)); + run_n("tokio current_thread", ITERS, bench_primes_tokio_current); + run_n("tokio multi-thread", ITERS, bench_primes_tokio_multi); + + // ---- Ping-pong ---- + print_header(&format!("Ping-pong: {PING_ROUNDS} round-trips between two actors")); + run_n("smarm single-thread", ITERS, || bench_pingpong_smarm(1)); + run_n(&format!("smarm {n}-thread"), ITERS, || bench_pingpong_smarm(n)); + run_n("tokio current_thread", ITERS, bench_pingpong_tokio_current); + run_n("tokio multi-thread", ITERS, bench_pingpong_tokio_multi); + + // ---- Spawn throughput ---- + print_header(&format!("Spawn throughput: {SPAWN_COUNT} actors spawned and joined")); + run_n("smarm single-thread", ITERS, || bench_spawn_smarm(1)); 
+ run_n(&format!("smarm {n}-thread"), ITERS, || bench_spawn_smarm(n)); + run_n("tokio current_thread", ITERS, bench_spawn_tokio_current); + run_n("tokio multi-thread", ITERS, bench_spawn_tokio_multi); +} diff --git a/src/channel.rs b/src/channel.rs index e21186a..96fae5e 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,12 +1,8 @@ //! Unbounded MPSC channels. //! -//! Single-threaded scheduler: the inner state is `Rc>>`, -//! not `Arc`. We hand-implement `Send` for `Sender` and -//! `Receiver` when `T: Send`, on the basis that the only way two actor -//! contexts touch the same channel is by being scheduled on the *same* OS -//! thread (v0.1 has exactly one). When we add a second scheduler thread, -//! this lie must be retired: replace `Rc` with `Arc` (or a -//! lock-free queue) and remove the unsafe Send impls. +//! Inner state is `Arc>>` so channels can be sent across OS +//! threads (required for the multi-scheduler runtime where a sender and +//! receiver may run on different scheduler threads simultaneously). //! //! Semantics: //! - Senders are clonable; the last sender drop closes the channel. @@ -19,12 +15,11 @@ //! parked, the receiver is unparked. use crate::pid::Pid; -use std::cell::RefCell; use std::collections::VecDeque; -use std::rc::Rc; +use std::sync::{Arc, Mutex}; pub fn channel() -> (Sender, Receiver) { - let inner = Rc::new(RefCell::new(Inner { + let inner = Arc::new(Mutex::new(Inner { queue: VecDeque::new(), parked_receiver: None, senders: 1, @@ -41,20 +36,13 @@ struct Inner { } pub struct Sender { - inner: Rc>>, + inner: Arc>>, } pub struct Receiver { - inner: Rc>>, + inner: Arc>>, } -// SAFETY (v0.1 only): the scheduler is single-threaded. Sender/Receiver can -// be captured into actor closures (which require Send), but they will only -// ever be touched from one OS thread. When multi-threading lands, swap the -// `Rc` for `Arc` and remove these. 
-unsafe impl Send for Sender {} -unsafe impl Send for Receiver {} - #[derive(Debug, PartialEq, Eq)] pub struct SendError(pub T); @@ -71,7 +59,7 @@ impl std::error::Error for RecvError {} impl Clone for Sender { fn clone(&self) -> Self { - self.inner.borrow_mut().senders += 1; + self.inner.lock().unwrap().senders += 1; Sender { inner: self.inner.clone() } } } @@ -79,11 +67,9 @@ impl Clone for Sender { impl Drop for Sender { fn drop(&mut self) { let unpark = { - let mut g = self.inner.borrow_mut(); + let mut g = self.inner.lock().unwrap(); g.senders -= 1; if g.senders == 0 && g.queue.is_empty() { - // Channel closed and drained. Wake the receiver so it can - // see RecvError. g.parked_receiver.take() } else { None @@ -97,19 +83,18 @@ impl Drop for Sender { impl Drop for Receiver { fn drop(&mut self) { - self.inner.borrow_mut().receiver_alive = false; + self.inner.lock().unwrap().receiver_alive = false; } } impl Sender { pub fn send(&self, value: T) -> Result<(), SendError> { let unpark = { - let mut g = self.inner.borrow_mut(); + let mut g = self.inner.lock().unwrap(); if !g.receiver_alive { return Err(SendError(value)); } g.queue.push_back(value); - // If the receiver is parked, unpark it. g.parked_receiver.take() }; if let Some(pid) = unpark { @@ -122,16 +107,14 @@ impl Sender { impl Receiver { pub fn recv(&self) -> Result { loop { - // Try to take a message. { - let mut g = self.inner.borrow_mut(); + let mut g = self.inner.lock().unwrap(); if let Some(v) = g.queue.pop_front() { return Ok(v); } if g.senders == 0 { return Err(RecvError); } - // Empty + open: register and park. let me = crate::actor::current_pid() .expect("recv() called outside an actor"); debug_assert!( @@ -140,18 +123,15 @@ impl Receiver { ); g.parked_receiver = Some(me); } - // Release the borrow before parking — the unparker will need it. + // Release the lock before parking — the unparker will need it. 
crate::scheduler::park_current(); - // Loop: the message that woke us might already have been taken - // (it can't, with one receiver, but the senders=0 path can fire - // here too). } } /// Non-blocking. `Ok(Some(v))` if a message was available, `Ok(None)` if /// the channel is empty but open, `Err(RecvError)` if closed and drained. pub fn try_recv(&self) -> Result, RecvError> { - let mut g = self.inner.borrow_mut(); + let mut g = self.inner.lock().unwrap(); if let Some(v) = g.queue.pop_front() { return Ok(Some(v)); } diff --git a/src/lib.rs b/src/lib.rs index 3e6e348..ad80ae1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,14 +2,12 @@ //! //! Erlang-style green-thread actor concurrency for Rust. //! -//! Single-threaded for now: one scheduler, one OS thread. The scheduler -//! cooperatively interleaves green-thread actors with hand-rolled context -//! switches. Actors communicate by sending `Send` messages over channels; -//! every actor has a supervisor, which is itself just an actor with a -//! `Receiver`. Synchronisation primitives — `Mutex` with -//! mandatory lock timeouts, channel `recv`, `sleep`, and epoll-backed -//! `wait_readable`/`wait_writable` — all park the green thread, never -//! the OS thread. +//! Multi-threaded: N scheduler OS threads (default: one per CPU) share a +//! single global run queue behind a `Mutex`. Actors communicate by sending +//! `Send` messages over channels; every actor has a supervisor. Synchronisation +//! primitives — `Mutex` with mandatory lock timeouts, channel `recv`, +//! `sleep`, and epoll-backed `wait_readable`/`wait_writable` — all park the +//! green thread, never the OS thread. //! //! See `LOOM.md` for the design intent and the deferred-for-later list. @@ -24,13 +22,10 @@ pub mod supervisor; pub mod timer; pub mod io; pub mod mutex; +pub mod runtime; // --------------------------------------------------------------------------- // Global allocator -// -// The preempting allocator wraps `System`. 
While `PREEMPTION_ENABLED` is -// false (the default outside an actor) it adds one branch per allocation -// and no syscalls. The scheduler flips it on per-resume. // --------------------------------------------------------------------------- #[global_allocator] @@ -43,31 +38,19 @@ static ALLOCATOR: preempt::PreemptingAllocator = preempt::PreemptingAllocator; pub use channel::{channel, Receiver, RecvError, Sender}; pub use mutex::{LockTimeout, Mutex, MutexGuard}; pub use pid::Pid; +pub use runtime::{init, Config, Runtime}; pub use scheduler::{ block_on_io, run, self_pid, sleep, spawn, spawn_under, wait_readable, wait_writable, yield_now, JoinError, JoinHandle, }; -// `read` and `write` would shadow heavily-used names if re-exported at the -// crate root; users reach for them as `smarm::scheduler::read` / -// `smarm::scheduler::write` instead. May reshuffle into a `smarm::io` -// surface in a future pass. pub use supervisor::Signal; // --------------------------------------------------------------------------- -// check!() — explicit preemption point for tight no-alloc loops. +// check!() // --------------------------------------------------------------------------- /// Voluntarily check whether this actor's timeslice has expired, yielding -/// if so. Drop this into hot compute loops that don't allocate (heap or -/// large stack frames) — without it, such loops monopolise the scheduler -/// until they return. -/// -/// Decrements the same per-actor event counter as the heap allocator's -/// preemption hook, so the check rate is identical regardless of whether -/// the actor is alloc-heavy, check-heavy, or mixed. -/// -/// No-op outside an actor (the runtime's `PREEMPTION_ENABLED` flag is -/// false there). +/// if so. #[macro_export] macro_rules! check { () => { diff --git a/src/mutex.rs b/src/mutex.rs index d9ac739..3773969 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -1,63 +1,20 @@ //! Actor-aware mutex with mandatory timeout. //! -//! 
`loom::Mutex` looks like `std::sync::Mutex` but its `lock()` parks -//! the calling *green* thread on contention rather than blocking the OS -//! thread — and every lock attempt is bounded by a timeout. If the lock is -//! not acquired within the timeout, `lock()` returns `Err(LockTimeout)`. -//! This is a hard runtime guarantee (the spec calls it out): no actor can -//! be parked on a mutex forever. +//! `Mutex` parks the calling *green* thread on contention rather than +//! blocking the OS thread. Every lock attempt is bounded by a timeout. //! -//! ```ignore -//! let m = loom::Mutex::new(42); -//! let guard = m.lock()?; // default timeout -//! let guard = m.lock_timeout(Duration::from_millis(50))?; -//! ``` +//! Internals use `Arc>` so the type is genuinely +//! `Send + Sync` and can be shared across scheduler threads. //! -//! Fairness -//! ======== -//! Waiters are granted the lock in FIFO order. The spec prizes fairness: -//! starvation under contention is precisely the kind of failure mode -//! supervision can't recover from cleanly. LIFO would be faster on cache -//! locality and is not offered. -//! -//! Poisoning -//! ========= -//! Unlike `std::sync::Mutex`, `loom::Mutex` does not poison on panic. If a -//! holder panics while holding the lock, the next waiter receives the -//! (now-untouched) value. Rationale: supervision handles the panic at the -//! actor level; a separate poisoning channel is redundant and adds an -//! error case to every `lock()`. Users who care about "the value may be in -//! an inconsistent state after a panic" should encode that in `T` itself -//! (e.g. `Mutex>` and `take()` the value at the start of -//! each critical section). -//! -//! Reentrance -//! ========== -//! Not reentrant. An actor that already holds the lock and calls `lock()` -//! again on the same mutex will wait on its own grant — and time out. This -//! is a bug in the caller, not a feature. -//! -//! Multi-threading note -//! ==================== -//! 
The current implementation uses `Rc>` internals because the -//! v0.2 scheduler is single-threaded. The public API is identical to what -//! the eventual multi-threaded version will expose; the migration replaces -//! the `Rc` with `Arc` around bookkeeping (waiters -//! queue, holder pid) — the *value* itself never goes through a blocking -//! OS-level lock, because contention always parks the green thread first. -//! No `unsafe impl Send` games today: `loom::Mutex` is `!Send` on v0.2, -//! which is correct given there is only one OS thread. +//! Fairness: FIFO. Poisoning: none. Reentrance: deadlock (caller bug). use crate::pid::Pid; use crate::scheduler; use crate::timer::{self, TimerTarget}; -use std::cell::{Cell, RefCell}; use std::collections::VecDeque; -use std::rc::Rc; +use std::sync::{Arc, Mutex as StdMutex}; use std::time::Duration; -/// 30 seconds. Override per-call with `lock_timeout`, or per-mutex (TODO) -/// once the supervisor-level policy hook lands. pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -74,63 +31,55 @@ impl std::error::Error for LockTimeout {} // Internals // --------------------------------------------------------------------------- -/// A pending lock attempt. Sits in `MutexCore::state.waiters` from the -/// moment an actor parks until it is either granted the lock (popped by -/// `MutexGuard::drop`) or times out (popped by `on_timeout`). struct Wait { pid: Pid, - /// Per-mutex monotonic sequence. Lets `on_timeout` recognise "this - /// specific wait" vs. "a later wait by the same pid on the same - /// mutex" — important because a single actor can re-acquire and then - /// re-wait, and we don't want a stale timer firing to disturb the new - /// wait. seq: u64, } -/// The non-generic part of the mutex. Lives inside `Rc<>` so it can also -/// be stashed (as `Rc`) inside a timer entry. 
-struct MutexCore { - state: RefCell, - default_timeout: Cell, -} - struct MutexState { holder: Option, waiters: VecDeque, next_seq: u64, + default_timeout: Duration, +} + +struct MutexCore { + state: StdMutex, } impl MutexCore { fn new(default_timeout: Duration) -> Self { Self { - state: RefCell::new(MutexState { + state: StdMutex::new(MutexState { holder: None, waiters: VecDeque::new(), next_seq: 0, + default_timeout, }), - default_timeout: Cell::new(default_timeout), } } } impl TimerTarget for MutexCore { fn on_timeout(&self, pid: Pid, wait_seq: u64) { - // Remove the waiter with this seq, if it's still queued. If it's - // gone the lock was already granted to this actor before the timer - // popped — the actor will return normally; do nothing. - let removed = { - let mut st = self.state.borrow_mut(); - if let Some(pos) = st.waiters.iter().position(|w| w.seq == wait_seq) { - st.waiters.remove(pos); + let unpark = { + let mut st = self.state.lock().unwrap(); + // Remove from waiters only if still there with matching seq. + // If the lock was already granted (holder == Some(pid)), the + // timer fired after the grant — treat as no-op; the actor + // will see `is_holder == true` and return Ok. + if st.holder == Some(pid) { + return; + } + let pos = st.waiters.iter().position(|w| w.pid == pid && w.seq == wait_seq); + if pos.is_some() { + st.waiters.remove(pos.unwrap()); true } else { false } }; - if removed { - // The actor is parked, waiting on us. Wake it up; `lock_timeout` - // will resume, observe `holder != Some(self)`, and return - // LockTimeout. + if unpark { scheduler::unpark(pid); } } @@ -141,145 +90,105 @@ impl TimerTarget for MutexCore { // --------------------------------------------------------------------------- pub struct Mutex { - core: Rc, - /// `None` while the lock is held; `Some(T)` while free or while a - /// grantee is in the gap between unpark and resumption. - value: Rc>>, + core: Arc, + /// Protected value. 
`None` while a guard is live; `Some` while free. + value: Arc>>, } impl Mutex { pub fn new(value: T) -> Self { Self { - core: Rc::new(MutexCore::new(DEFAULT_TIMEOUT)), - value: Rc::new(RefCell::new(Some(value))), + core: Arc::new(MutexCore::new(DEFAULT_TIMEOUT)), + value: Arc::new(StdMutex::new(Some(value))), } } - /// Set the default timeout used by `lock()`. Does not affect in-flight - /// `lock_timeout` calls. pub fn set_default_timeout(&self, timeout: Duration) { - self.core.default_timeout.set(timeout); + self.core.state.lock().unwrap().default_timeout = timeout; } - /// Acquire the lock, blocking the calling actor until it's granted or - /// the default timeout expires. pub fn lock(&self) -> Result, LockTimeout> { - self.lock_timeout(self.core.default_timeout.get()) + let timeout = self.core.state.lock().unwrap().default_timeout; + self.lock_timeout(timeout) } - /// Acquire the lock with an explicit timeout. pub fn lock_timeout(&self, timeout: Duration) -> Result, LockTimeout> { let me = scheduler::self_pid(); - // Fast path: nobody holds it. Mark ourselves as holder, take the - // value out, return a guard. + // Fast path: nobody holds it. { - let mut st = self.core.state.borrow_mut(); + let mut st = self.core.state.lock().unwrap(); if st.holder.is_none() { st.holder = Some(me); drop(st); - let value = self - .value - .borrow_mut() - .take() + let value = self.value.lock().unwrap().take() .expect("Mutex: value missing on free fast path"); - return Ok(MutexGuard { - mutex: self, - value: Some(value), - }); + return Ok(MutexGuard { mutex: self, value: Some(value) }); } } - // Slow path: register as a waiter, schedule a timeout, park. - // No preemption during prep-to-park — see scheduler::NoPreempt. + // Slow path: register as a waiter, set timeout, park. 
let _np = scheduler::NoPreempt::enter(); let seq = { - let mut st = self.core.state.borrow_mut(); + let mut st = self.core.state.lock().unwrap(); let seq = st.next_seq; st.next_seq = st.next_seq.wrapping_add(1); st.waiters.push_back(Wait { pid: me, seq }); seq }; - let target: Rc = self.core.clone(); + let target: Arc = self.core.clone(); let deadline = timer::deadline_from_now(timeout); scheduler::insert_wait_timer(deadline, me, target, seq); scheduler::park_current(); - // Resumed. Two possibilities: - // (a) MutexGuard::drop on the previous holder popped us off the - // waiters queue, set core.holder = me, and unparked us. - // => self.value is Some, we take it and return Ok. - // (b) on_timeout fired: it removed us from waiters and unparked - // us, but did NOT set holder. core.holder is whatever it was - // (Some(other) or None). => return Err. - let is_holder = self.core.state.borrow().holder == Some(me); + // Resumed. Are we the holder? + let is_holder = self.core.state.lock().unwrap().holder == Some(me); if is_holder { - let value = self - .value - .borrow_mut() - .take() + let value = self.value.lock().unwrap().take() .expect("Mutex: value missing after grant"); - Ok(MutexGuard { - mutex: self, - value: Some(value), - }) + Ok(MutexGuard { mutex: self, value: Some(value) }) } else { Err(LockTimeout) } } - /// Non-blocking attempt. Returns `Some` if the lock was free, `None` - /// otherwise. Useful as a fast path before a long-running computation, - /// or for tests. 
pub fn try_lock(&self) -> Option> { - let mut st = self.core.state.borrow_mut(); + let me = scheduler::self_pid(); + let mut st = self.core.state.lock().unwrap(); if st.holder.is_some() { return None; } - let me = scheduler::self_pid(); st.holder = Some(me); drop(st); - let value = self - .value - .borrow_mut() - .take() + let value = self.value.lock().unwrap().take() .expect("Mutex: value missing on try_lock free path"); - Some(MutexGuard { - mutex: self, - value: Some(value), - }) + Some(MutexGuard { mutex: self, value: Some(value) }) } } impl Clone for Mutex { - /// Cloning a `Mutex` clones the handle, not the protected value — - /// both clones refer to the same lock state and the same `T`. fn clone(&self) -> Self { - Self { - core: self.core.clone(), - value: self.value.clone(), - } + Self { core: self.core.clone(), value: self.value.clone() } } } +// Genuinely Send + Sync now that internals are Arc>. +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + // --------------------------------------------------------------------------- // Guard // --------------------------------------------------------------------------- pub struct MutexGuard<'a, T> { mutex: &'a Mutex, - /// The protected value, taken out of `mutex.value` while the guard is - /// alive. `Option` only so `Drop` can put it back; in normal use this - /// is always `Some` while the guard is observable. 
value: Option, } impl std::ops::Deref for MutexGuard<'_, T> { type Target = T; - fn deref(&self) -> &T { - self.value.as_ref().expect("MutexGuard: value missing") - } + fn deref(&self) -> &T { self.value.as_ref().expect("MutexGuard: value missing") } } impl std::ops::DerefMut for MutexGuard<'_, T> { @@ -288,19 +197,22 @@ impl std::ops::DerefMut for MutexGuard<'_, T> { } } +impl std::fmt::Debug for MutexGuard<'_, T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("MutexGuard") + .field(self.value.as_ref().expect("MutexGuard: value missing")) + .finish() + } +} + impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { - // Put the value back into the mutex. let v = self.value.take().expect("MutexGuard: double drop"); - *self.mutex.value.borrow_mut() = Some(v); + *self.mutex.value.lock().unwrap() = Some(v); - // Pick the next waiter (if any) and grant it the lock by writing - // its pid into `holder` *before* unparking. The grantee, on - // resumption, will see `holder == self_pid` and take the value. let next_pid = { - let mut st = self.mutex.core.state.borrow_mut(); - let next = st.waiters.pop_front(); - match next { + let mut st = self.mutex.core.state.lock().unwrap(); + match st.waiters.pop_front() { Some(w) => { st.holder = Some(w.pid); Some(w.pid) diff --git a/src/runtime.rs b/src/runtime.rs new file mode 100644 index 0000000..936a87e --- /dev/null +++ b/src/runtime.rs @@ -0,0 +1,669 @@ +//! Multi-scheduler runtime: configuration, initialisation, and the shared +//! state that all scheduler OS threads operate against. +//! +//! # Architecture +//! +//! ```text +//! init(Config) → Runtime (Arc) +//! +//! RuntimeInner { +//! shared: Mutex ← slot table, run queue, timers, IO +//! stats: Vec ← one per thread, lockless atomics (RFC 000) +//! io_parked: AtomicU32 ← actors parked on IO +//! sleeping: AtomicU32 ← actors parked on timer +//! } +//! ``` +//! +//! 
`Runtime::run(f)` spawns N OS threads (one per `Config::resolved_thread_count()`), +//! each running `schedule_loop`. It blocks until all scheduler threads exit, +//! i.e. until the run queue is empty and nothing is pending. +//! +//! Each scheduler thread holds an `Arc` clone. Per-thread +//! identity is a small integer index, stored in a thread-local, used to index +//! into `stats`. +//! +//! # Timer / IO drain (try-lock, one-winner) +//! +//! On each loop iteration every scheduler thread tries `try_lock()` on a +//! separate `drain_lock: Mutex<()>`. The winner drains due timers and IO +//! completions; losers skip and move straight to popping an actor from the +//! run queue. This is the simplest correct approach; revisit if the drain +//! becomes a measured bottleneck. + +use crate::actor::{ + clear_current_pid, current_pid, is_actor_done, reset_actor_done, + set_current_actor_box, set_current_pid, take_last_outcome, Actor, Outcome, +}; +use crate::channel::Sender; +use crate::context::{get_actor_sp, set_actor_sp, switch_to_actor}; +use crate::io::IoThread; +use crate::pid::Pid; +use crate::preempt::PREEMPTION_ENABLED; +use crate::supervisor::Signal; +use crate::timer::Timers; + +use std::collections::VecDeque; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread; + +// --------------------------------------------------------------------------- +// Config +// --------------------------------------------------------------------------- + +/// Runtime configuration. 
+/// +/// ``` +/// use smarm::runtime::Config; +/// +/// // Use all available CPUs (default): +/// let c = Config::default(); +/// +/// // Exactly 4 scheduler threads: +/// let c = Config::exact(4); +/// +/// // Available parallelism, clamped to the range 2..=8: +/// let c = Config::new(2, 8, None); +/// ``` +#[derive(Clone, Debug)] +pub struct Config { + min: usize, + max: usize, + exact: Option, +} + +impl Config { + /// Exact thread count; takes precedence over min/max. + pub fn exact(n: usize) -> Self { + assert!(n >= 1, "scheduler thread count must be ≥ 1"); + Self { min: n, max: n, exact: Some(n) } + } + + /// Bounded range: thread count = clamp(available_parallelism, min, max) unless `exact` is `Some`. + pub fn new(min: usize, max: usize, exact: Option) -> Self { + assert!(min >= 1, "min must be ≥ 1"); + assert!(max >= min, "max must be ≥ min"); + if let Some(e) = exact { + assert!(e >= 1, "exact must be ≥ 1"); + } + Self { min, max, exact } + } + + /// The number of scheduler threads this config resolves to. + pub fn resolved_thread_count(&self) -> usize { + if let Some(e) = self.exact { + return e; + } + let avail = thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + avail.clamp(self.min, self.max) + } +} + +impl Default for Config { + fn default() -> Self { + let avail = thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + Self { min: 1, max: avail, exact: None } + } +} + +// --------------------------------------------------------------------------- +// Per-thread stats (RFC 000 Layer 1 primitives) +// --------------------------------------------------------------------------- + +/// Lockless per-scheduler-thread counters. Written only by the owning thread; +/// readable from any thread (introspection actor, tests). +pub struct SchedulerStats { + /// PID index of the actor currently on-CPU, or `u32::MAX` when idle. + pub current_pid_index: AtomicU32, + /// Snapshot of the shared run-queue length, taken by this thread just before each pop attempt.
+ pub run_queue_len: AtomicU64, +} + +impl SchedulerStats { + fn new() -> Self { + Self { + current_pid_index: AtomicU32::new(u32::MAX), + run_queue_len: AtomicU64::new(0), + } + } +} + +// --------------------------------------------------------------------------- +// Runtime stats snapshot (for tests / introspection) +// --------------------------------------------------------------------------- + +pub struct RuntimeStats { + pub(crate) inner: Arc, +} + +impl RuntimeStats { + /// Sum of each scheduler thread's latest `run_queue_len` snapshot of the shared run queue. + pub fn total_run_queue_len(&self) -> u64 { + self.inner.stats.iter() + .map(|s| s.run_queue_len.load(Ordering::Relaxed)) + .sum() + } + + /// Number of scheduler threads. + pub fn scheduler_count(&self) -> usize { + self.inner.stats.len() + } + + /// Actors currently parked on IO. + pub fn io_parked_count(&self) -> u32 { + self.inner.io_parked.load(Ordering::Relaxed) + } + + /// Actors currently sleeping on a timer. + pub fn sleeping_count(&self) -> u32 { + self.inner.sleeping.load(Ordering::Relaxed) + } +} + +// --------------------------------------------------------------------------- +// Shared state (behind Mutex<>) +// --------------------------------------------------------------------------- + +pub(crate) const ACTOR_STACK_SIZE: usize = 64 * 1024; + +pub(crate) enum State { Runnable, Parked, Done } + +struct Slot { + generation: u32, + actor: Option, + state: State, + waiters: Vec, + outcome: Option, + supervisor_channel: Option>, + outstanding_handles: u32, + pending_io_result: Option, +} + +impl Slot { + fn vacant() -> Self { + Self { + generation: 0, + actor: None, + state: State::Done, + waiters: Vec::new(), + outcome: None, + supervisor_channel: None, + outstanding_handles: 0, + pending_io_result: None, + } + } +} + +pub(crate) type Closure = Box; + +pub(crate) struct SharedState { + pub(crate) slots: Vec, + pub(crate) free_list: Vec, + pub(crate) run_queue: VecDeque, + pub(crate) root_pid: Option, + pub(crate) timers:
Timers, + pub(crate) io: Option, + /// Closures awaiting their first resume, keyed by Pid. + pub(crate) pending_closures: Vec<(Pid, Closure)>, +} + +impl SharedState { + fn new() -> Self { + Self { + slots: Vec::new(), + free_list: Vec::new(), + run_queue: VecDeque::new(), + root_pid: None, + timers: Timers::new(), + io: None, + pending_closures: Vec::new(), + } + } + + fn allocate_slot(&mut self) -> (u32, u32) { + if let Some(idx) = self.free_list.pop() { + let gen = self.slots[idx as usize].generation; + (idx, gen) + } else { + let idx = self.slots.len() as u32; + self.slots.push(Slot::vacant()); + (idx, 0) + } + } + + fn slot(&self, pid: Pid) -> Option<&Slot> { + let s = self.slots.get(pid.index() as usize)?; + if s.generation == pid.generation() { Some(s) } else { None } + } + + fn slot_mut(&mut self, pid: Pid) -> Option<&mut Slot> { + let s = self.slots.get_mut(pid.index() as usize)?; + if s.generation == pid.generation() { Some(s) } else { None } + } + + fn pop_pending_closure(&mut self, pid: Pid) -> Option { + let pos = self.pending_closures.iter().position(|(p, _)| *p == pid)?; + Some(self.pending_closures.swap_remove(pos).1) + } +} + +// --------------------------------------------------------------------------- +// RuntimeInner — the shared core behind an Arc +// --------------------------------------------------------------------------- + +pub(crate) struct RuntimeInner { + pub(crate) shared: Mutex, + /// Try-lock: exactly one scheduler thread drains timers/IO per iteration. + drain_lock: Mutex<()>, + /// Per-thread stats, indexed by scheduler thread slot (0..N). + pub(crate) stats: Vec, + /// Global counters for RFC 000 primitives. 
+ pub(crate) io_parked: AtomicU32, + pub(crate) sleeping: AtomicU32, +} + +impl RuntimeInner { + fn new(thread_count: usize) -> Arc { + let stats = (0..thread_count).map(|_| SchedulerStats::new()).collect(); + Arc::new(Self { + shared: Mutex::new(SharedState::new()), + drain_lock: Mutex::new(()), + stats, + io_parked: AtomicU32::new(0), + sleeping: AtomicU32::new(0), + }) + } + + fn with_shared(&self, f: impl FnOnce(&mut SharedState) -> R) -> R { + f(&mut self.shared.lock().unwrap()) + } + + /// Always returns `Some`: a poisoned mutex is recovered via `into_inner()` and `f` still runs. + /// Intended for `unpark` / channel Drop paths, which can fire after teardown. + fn try_with_shared(&self, f: impl FnOnce(&mut SharedState) -> R) -> Option { + match self.shared.lock() { + Ok(mut g) => Some(f(&mut g)), + Err(p) => Some(f(&mut p.into_inner())), + } + } +} + +// --------------------------------------------------------------------------- +// Runtime — the public handle +// --------------------------------------------------------------------------- + +pub struct Runtime { + inner: Arc, + thread_count: usize, +} + +/// Initialise the runtime with the given config. Returns a reusable handle. +pub fn init(config: Config) -> Runtime { + let n = config.resolved_thread_count(); + Runtime { + inner: RuntimeInner::new(n), + thread_count: n, + } +} + +impl Runtime { + /// Run `f` as the initial actor, block until all actors finish. + /// Can be called multiple times sequentially on the same `Runtime`. + pub fn run(&self, f: impl FnOnce() + Send + 'static) { + // Re-initialise shared state for this run. + { + let mut s = self.inner.shared.lock().unwrap(); + assert!(s.run_queue.is_empty(), "run() called while previous run still active"); + s.root_pid = Some(ROOT_PID); + s.io = Some(IoThread::start().expect("failed to start IO thread")); + } + + // Spawn the initial actor through the public spawn path (which + // requires a running runtime in the thread-local).
+ RUNTIME.with(|r| *r.borrow_mut() = Some(self.inner.clone())); + let initial_handle = crate::scheduler::spawn(f); + + // Launch N-1 extra scheduler threads. The calling thread is thread 0. + let mut os_threads = Vec::new(); + for slot in 1..self.thread_count { + let inner = self.inner.clone(); + let t = thread::spawn(move || { + RUNTIME.with(|r| *r.borrow_mut() = Some(inner.clone())); + SCHED_SLOT.with(|s| s.set(slot)); + schedule_loop(&inner, slot); + RUNTIME.with(|r| *r.borrow_mut() = None); + }); + os_threads.push(t); + } + + // Thread 0 runs the loop on the calling thread. + SCHED_SLOT.with(|s| s.set(0)); + schedule_loop(&self.inner, 0); + + // Wait for all other scheduler threads. + for t in os_threads { + let _ = t.join(); + } + + // Drop initial handle (decrements outstanding_handles count). + drop(initial_handle); + + // Tear down IO and clean up shared state for the next run() call. + let mut s = self.inner.shared.lock().unwrap(); + drop(s.io.take()); // joins IO threads + s.pending_closures.clear(); + // Reset per-thread stats. + for stat in &self.inner.stats { + stat.current_pid_index.store(u32::MAX, Ordering::Relaxed); + stat.run_queue_len.store(0, Ordering::Relaxed); + } + self.inner.io_parked.store(0, Ordering::Relaxed); + self.inner.sleeping.store(0, Ordering::Relaxed); + + RUNTIME.with(|r| *r.borrow_mut() = None); + } + + /// Snapshot of runtime statistics for introspection / tests. + pub fn stats(&self) -> RuntimeStats { + RuntimeStats { inner: self.inner.clone() } + } +} + +// --------------------------------------------------------------------------- +// Thread-locals +// --------------------------------------------------------------------------- + +use std::cell::{Cell, RefCell}; + +thread_local! { + /// The RuntimeInner for the current run(). Set by run() on the calling + /// thread and by each spawned scheduler thread. 
+ pub(crate) static RUNTIME: RefCell>> = + const { RefCell::new(None) }; + + /// This scheduler thread's index into RuntimeInner::stats. + static SCHED_SLOT: Cell = const { Cell::new(0) }; + + /// What the actor wants when it yields back to the scheduler. + static YIELD_INTENT: Cell = const { Cell::new(YieldIntent::Yield) }; +} + +#[derive(Copy, Clone)] +pub(crate) enum YieldIntent { Yield, Park } + +pub(crate) fn set_yield_intent(i: YieldIntent) { + YIELD_INTENT.with(|c| c.set(i)); +} + +// --------------------------------------------------------------------------- +// Sentinel root PID +// --------------------------------------------------------------------------- + +pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX); + +// --------------------------------------------------------------------------- +// Slot reclamation +// --------------------------------------------------------------------------- + +pub(crate) fn reclaim_slot(s: &mut SharedState, pid: Pid) { + let idx = pid.index(); + let slot = &mut s.slots[idx as usize]; + slot.generation = slot.generation.wrapping_add(1); + slot.actor = None; + slot.outcome = None; + slot.waiters.clear(); + slot.supervisor_channel = None; + slot.state = State::Done; + slot.outstanding_handles = 0; + slot.pending_io_result = None; + s.free_list.push(idx); +} + +// --------------------------------------------------------------------------- +// finalize_actor +// --------------------------------------------------------------------------- + +fn finalize_actor(inner: &Arc, pid: Pid, outcome: Outcome) { + let (joiner_outcome, sup_signal) = match outcome { + Outcome::Exit => (Outcome::Exit, Signal::Exit(pid)), + Outcome::Panic(payload) => ( + Outcome::Panic(payload), + Signal::Panic(pid, Box::new(()) as Box), + ), + }; + + let (waiters, supervisor_pid) = inner.with_shared(|s| { + let slot = s.slot_mut(pid).expect("finalize_actor: slot vanished"); + let sup = slot.actor.as_ref().map(|a| a.supervisor); + slot.outcome = 
Some(joiner_outcome); + slot.state = State::Done; + slot.actor = None; + (std::mem::take(&mut slot.waiters), sup) + }); + + // Deliver to supervisor. + if let Some(sup) = supervisor_pid { + let sender = inner.with_shared(|s| { + s.slot(sup).and_then(|slot| slot.supervisor_channel.clone()) + }); + if let Some(sender) = sender { + let _ = sender.send(sup_signal); + } + } + + // Unpark joiners. + for joiner in waiters { + crate::scheduler::unpark(joiner); + } + + // Reclaim if no outstanding handles. + inner.with_shared(|s| { + let reclaim = s.slot(pid).map(|slot| slot.outstanding_handles == 0).unwrap_or(false); + if reclaim { reclaim_slot(s, pid); } + }); +} + +// --------------------------------------------------------------------------- +// schedule_loop — runs on each scheduler OS thread +// --------------------------------------------------------------------------- + +fn schedule_loop(inner: &Arc, slot: usize) { + let stats = &inner.stats[slot]; + + loop { + // ---------------------------------------------------------------- + // 1. Try to win the drain lock (timers + IO). One winner per round; + // losers skip immediately and proceed to step 2. + // ---------------------------------------------------------------- + if let Ok(_drain_guard) = inner.drain_lock.try_lock() { + let now = std::time::Instant::now(); + + // Drain due timers. + let due = inner.with_shared(|s| s.timers.pop_due(now)); + for entry in due { + match entry.reason { + crate::timer::Reason::Sleep => { + inner.with_shared(|s| { + if let Some(slot) = s.slot_mut(entry.pid) { + if matches!(slot.state, State::Parked) { + slot.state = State::Runnable; + s.run_queue.push_back(entry.pid); + } + } + }); + } + crate::timer::Reason::WaitTimeout { target, wait_seq } => { + // Runs outside with_shared — the callback may call unpark. + target.on_timeout(entry.pid, wait_seq); + } + } + } + + // Drain IO completions. 
+ let completions = inner.with_shared(|s| { + s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default() + }); + for completion in completions { + match completion { + crate::io::Completion::Blocking { pid, result } => { + inner.with_shared(|s| { + if let Some(io) = s.io.as_mut() { + io.outstanding = io.outstanding.saturating_sub(1); + } + if let Some(slot) = s.slot_mut(pid) { + slot.pending_io_result = Some(result); + if matches!(slot.state, State::Parked) { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + } + } + }); + } + crate::io::Completion::FdReady { fd, events: _ } => { + inner.with_shared(|s| { + let parked_pid = s.io.as_mut().and_then(|io| { + let pid = io.waiters.remove(&fd); + io.epoll_deregister(fd); + pid + }); + if let Some(pid) = parked_pid { + if let Some(slot) = s.slot_mut(pid) { + if matches!(slot.state, State::Parked) { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + } + } + } + }); + } + } + } + } // drain_guard drops here + + // ---------------------------------------------------------------- + // 2. Pop a runnable actor from the shared queue. + // ---------------------------------------------------------------- + let pid = match inner.with_shared(|s| { + let len = s.run_queue.len() as u64; + stats.run_queue_len.store(len, Ordering::Relaxed); + s.run_queue.pop_front() + }) { + Some(p) => p, + None => { + // Nothing runnable. Check whether we should wait or exit. + let (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) = + inner.with_shared(|s| { + let next = s.timers.peek_deadline(); + let (out, fd) = match s.io.as_ref() { + Some(io) => ( + io.outstanding + io.waiters.len() as u32, + Some(io.wake_fd()), + ), + None => (0, None), + }; + // Count actors that are not Done (Runnable or Parked). 
+ let live = s.slots.iter().filter(|slot| { + slot.actor.is_some() + }).count(); + (next, out, fd, s.run_queue.is_empty(), live) + }); + + match (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) { + // Queue is now non-empty (another thread added work): retry. + (_, _, _, false, _) => continue, + // Truly idle — no timers, no IO, no live actors. + (None, 0, _, true, 0) => return, + // Live actors but queue empty: they must be parked on IO or + // timers. Wait on the appropriate source. + (Some(deadline), _, fd_opt, true, _) => { + let now = std::time::Instant::now(); + if deadline > now { + let timeout = deadline - now; + match fd_opt { + Some(fd) => { + crate::io::poll_wake(fd, Some(timeout)); + crate::io::drain_wake_pipe(fd); + } + None => thread::sleep(timeout), + } + } + continue; + } + (None, _, Some(fd), true, _) => { + crate::io::poll_wake(fd, None); + crate::io::drain_wake_pipe(fd); + continue; + } + // Live actors, queue empty, no IO/timers: they're parked + // waiting for each other (potential deadlock in user code), + // or another thread is about to add work. Sleep briefly to + // avoid hammering the shared mutex. + _ => { + thread::sleep(std::time::Duration::from_micros(100)); + continue; + } + } + } + }; + + // ---------------------------------------------------------------- + // 3. Resume the actor. + // ---------------------------------------------------------------- + let sp = match inner.with_shared(|s| { + s.slot(pid).and_then(|slot| slot.actor.as_ref().map(|a| a.sp)) + }) { + Some(sp) => sp, + None => continue, // stale pid + }; + + // First resume: move the closure into the trampoline's thread-local. + if let Some(b) = inner.with_shared(|s| s.pop_pending_closure(pid)) { + set_current_actor_box(b); + } + + // Update per-thread stats: record who's on-CPU. 
+ stats.current_pid_index.store(pid.index(), Ordering::Relaxed); + + set_actor_sp(sp); + set_current_pid(pid); + reset_actor_done(); + YIELD_INTENT.with(|c| c.set(YieldIntent::Yield)); + crate::preempt::reset_timeslice(); + PREEMPTION_ENABLED.with(|c| c.set(true)); + + unsafe { switch_to_actor() }; + + PREEMPTION_ENABLED.with(|c| c.set(false)); + stats.current_pid_index.store(u32::MAX, Ordering::Relaxed); + clear_current_pid(); + + let intent = YIELD_INTENT.with(|c| c.get()); + let new_sp = get_actor_sp(); + + if is_actor_done() { + let outcome = take_last_outcome().unwrap_or(Outcome::Exit); + finalize_actor(inner, pid, outcome); + } else { + inner.with_shared(|s| { + if let Some(slot) = s.slot_mut(pid) { + if let Some(actor) = slot.actor.as_mut() { + actor.sp = new_sp; + } + match intent { + YieldIntent::Yield => { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + } + YieldIntent::Park => { + slot.state = State::Parked; + } + } + } + }); + } + } +} diff --git a/src/scheduler.rs b/src/scheduler.rs index 1ab238b..43345a6 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,208 +1,75 @@ -//! The single-threaded scheduler. +//! Scheduler public API — thin façade over the multi-scheduler runtime. //! -//! There is one global scheduler per OS thread, stored in a thread-local. -//! `run(initial)` initialises it, spawns the initial actor, drives the loop -//! until the run queue is empty, then tears it down. +//! All heavy lifting lives in `runtime.rs`. This module exposes the same +//! surface that the rest of the codebase (channel, mutex, io, timer, actor) +//! calls into, plus the public API re-exported from `lib.rs`. //! -//! Slot table: a `Vec` indexed by `Pid::index()`, with a free list of -//! reusable indices. Each slot has a `generation` counter that increments -//! every time the slot is freed; `Pid` carries the generation it was minted -//! with, so a stale PID has a mismatching generation and is detected on -//! lookup. -//! -//! 
Run queue: a `VecDeque` of runnable actors. The state of an actor -//! is implicit in slot.state: `Runnable` means it's either in the queue or -//! currently executing; `Parked` means it's waiting for something to unpark -//! it (channel send, join completion, …); `Done` means it has finished and -//! is awaiting reaping. -//! -//! Joining: `JoinHandle::join()` parks the calling actor and registers it -//! on the target slot's `waiters` list. When the target actor finishes, -//! the scheduler reaps the slot and unparks every waiter, passing them the -//! outcome via a side channel (the target's `outcome` field, drained on -//! the joiner side). +//! The single-threaded `run()` entry point is kept as a convenience wrapper +//! around `runtime::init(Config::exact(1)).run(f)`. -use crate::actor::{ - clear_current_pid, current_pid, is_actor_done, reset_actor_done, - set_current_actor_box, set_current_pid, take_last_outcome, trampoline, Actor, Outcome, -}; +use crate::actor::current_pid; use crate::channel::Sender; -use crate::context::{get_actor_sp, init_actor_stack, set_actor_sp, switch_to_actor}; use crate::pid::Pid; -use crate::preempt::PREEMPTION_ENABLED; -use crate::stack::Stack; +use crate::runtime::{ + self, RuntimeInner, YieldIntent, ROOT_PID, RUNTIME, +}; use crate::supervisor::Signal; -use std::cell::RefCell; -use std::collections::VecDeque; +use std::sync::Arc; // --------------------------------------------------------------------------- -// Configuration +// with_runtime / try_with_runtime // --------------------------------------------------------------------------- -const ACTOR_STACK_SIZE: usize = 64 * 1024; - -// --------------------------------------------------------------------------- -// Per-actor slot -// --------------------------------------------------------------------------- - -enum State { - /// Either in the run queue or currently executing. - Runnable, - /// Removed from the queue, waiting for `unpark()`. 
- Parked, - /// The actor has finished. Slot persists until the last `JoinHandle` - /// has been joined (or dropped). Then the slot is freed. - Done, -} - -struct Slot { - /// Bumped every time this slot is freed and re-used. A `Pid` with a - /// non-matching generation is stale. - generation: u32, - /// `None` when the slot is free. `Some` otherwise. - actor: Option, - state: State, - /// PIDs waiting in `JoinHandle::join`. - waiters: Vec, - /// The outcome the actor produced, captured when it finished. - /// Drained by `JoinHandle::join`. - outcome: Option, - /// If this slot is a supervisor, the sender into its `Signal` mailbox. - /// Cloned out and used when one of its children dies. - supervisor_channel: Option>, - /// Number of `JoinHandle`s still outstanding for this actor. The slot - /// is reclaimed only when the actor is done AND outstanding_handles == 0. - outstanding_handles: u32, - /// One-shot mailbox for the result of an in-flight `block_on_io` call. - /// The scheduler writes it on completion; `block_on_io` reads it on - /// resume. - pending_io_result: Option, -} - -impl Slot { - fn vacant() -> Self { - Self { - generation: 0, - actor: None, - state: State::Done, - waiters: Vec::new(), - outcome: None, - supervisor_channel: None, - outstanding_handles: 0, - pending_io_result: None, - } - } -} - -// --------------------------------------------------------------------------- -// Scheduler state -// --------------------------------------------------------------------------- - -struct SchedulerState { - slots: Vec, - free_list: Vec, - run_queue: VecDeque, - /// The root supervisor's PID. Children spawned at the top level are - /// supervised by this. Set by `run()`. - root_pid: Option, - /// Pending sleep timers. Min-heap keyed by deadline. - timers: crate::timer::Timers, - /// IO worker thread. `None` outside `run()`. 
- io: Option, -} - -impl SchedulerState { - fn new() -> Self { - Self { - slots: Vec::new(), - free_list: Vec::new(), - run_queue: VecDeque::new(), - root_pid: None, - timers: crate::timer::Timers::new(), - io: None, - } - } - - /// Allocate a slot; return its (index, generation). - fn allocate_slot(&mut self) -> (u32, u32) { - if let Some(idx) = self.free_list.pop() { - let s = &mut self.slots[idx as usize]; - (idx, s.generation) - } else { - let idx = self.slots.len() as u32; - self.slots.push(Slot::vacant()); - (idx, 0) - } - } - - fn slot(&self, pid: Pid) -> Option<&Slot> { - let s = self.slots.get(pid.index() as usize)?; - if s.generation == pid.generation() { Some(s) } else { None } - } - - fn slot_mut(&mut self, pid: Pid) -> Option<&mut Slot> { - let s = self.slots.get_mut(pid.index() as usize)?; - if s.generation == pid.generation() { Some(s) } else { None } - } -} - -thread_local! { - static SCHED: RefCell> = const { RefCell::new(None) }; -} - -fn with_sched(f: impl FnOnce(&mut SchedulerState) -> R) -> R { - SCHED.with(|c| { - let mut g = c.borrow_mut(); - let s = g.as_mut().expect("scheduler not running"); - f(s) +/// Borrow the current runtime. Panics if called outside `Runtime::run()`. +pub(crate) fn with_runtime(f: impl FnOnce(&Arc) -> R) -> R { + RUNTIME.with(|r| { + let b = r.borrow(); + let inner = b.as_ref().expect("smarm: not inside Runtime::run()"); + f(inner) }) } -/// Same as `with_sched` but returns `None` when there's no scheduler instead -/// of panicking. Used on cleanup paths (channel sender drop during shutdown, -/// for example). -fn try_with_sched(f: impl FnOnce(&mut SchedulerState) -> R) -> Option { - SCHED.with(|c| { - let mut g = c.borrow_mut(); - g.as_mut().map(f) - }) +/// Borrow the runtime if present; returns `None` otherwise. +/// Used on cleanup paths (channel Drop during teardown). 
+pub(crate) fn try_with_runtime(f: impl FnOnce(&Arc) -> R) -> Option { + RUNTIME.with(|r| r.borrow().as_ref().map(|inner| f(inner))) } // --------------------------------------------------------------------------- -// JoinHandle +// JoinHandle / JoinError // --------------------------------------------------------------------------- #[derive(Debug)] pub struct JoinError { - /// Whatever `panic!` was called with. pub payload: Box, } pub struct JoinHandle { pid: Pid, - /// `false` once `join()` has been called and the handle has consumed - /// its outcome. Prevents the Drop impl from double-decrementing. consumed: bool, } impl JoinHandle { pub fn pid(&self) -> Pid { self.pid } - /// Block the calling actor until the target completes. Returns - /// `Ok(())` on normal exit, `Err(JoinError)` if the target panicked. pub fn join(mut self) -> Result<(), JoinError> { + use crate::actor::Outcome; + use crate::runtime::State; // need State visibility + let me = current_pid().expect("join() called outside an actor"); loop { - let outcome = with_sched(|s| { - let slot = s.slot_mut(self.pid) - .expect("join: target slot has been reused"); - if matches!(slot.state, State::Done) { - Some(slot.outcome.take().expect("Done slot must have an outcome")) - } else { - slot.waiters.push(me); - None - } + let outcome = with_runtime(|inner| { + inner.with_shared(|s| { + let slot = s.slot_mut(self.pid) + .expect("join: target slot has been reused"); + if matches!(slot.state, State::Done) { + Some(slot.outcome.take().expect("Done slot must have outcome")) + } else { + slot.waiters.push(me); + None + } + }) }); match outcome { @@ -214,23 +81,30 @@ impl JoinHandle { Outcome::Panic(p) => Err(JoinError { payload: p }), }; } - None => park_current(), + None => { + let _np = NoPreempt::enter(); + park_current(); + } } } } fn decrement_handle_count(&mut self) { - with_sched(|s| { - let should_reclaim = match s.slot_mut(self.pid) { - Some(slot) => { - slot.outstanding_handles = 
slot.outstanding_handles.saturating_sub(1); - matches!(slot.state, State::Done) && slot.outstanding_handles == 0 + with_runtime(|inner| { + inner.with_shared(|s| { + let should_reclaim = match s.slot_mut(self.pid) { + Some(slot) => { + slot.outstanding_handles = + slot.outstanding_handles.saturating_sub(1); + matches!(slot.state, crate::runtime::State::Done) + && slot.outstanding_handles == 0 + } + None => false, + }; + if should_reclaim { + crate::runtime::reclaim_slot(s, self.pid); } - None => false, - }; - if should_reclaim { - reclaim_slot(s, self.pid); - } + }) }); } } @@ -238,126 +112,89 @@ impl JoinHandle { impl Drop for JoinHandle { fn drop(&mut self) { if !self.consumed { - self.decrement_handle_count(); + // May be called outside run() if handle is dropped after teardown. + if try_with_runtime(|_| ()).is_some() { + self.decrement_handle_count(); + } } } } -// --------------------------------------------------------------------------- -// Slot reclamation -// --------------------------------------------------------------------------- - -fn reclaim_slot(s: &mut SchedulerState, pid: Pid) { - let idx = pid.index(); - let slot = &mut s.slots[idx as usize]; - // Bump generation so any stale PIDs from now on miss. - slot.generation = slot.generation.wrapping_add(1); - // Drop the actor (its stack with it). - slot.actor = None; - slot.outcome = None; - slot.waiters.clear(); - slot.supervisor_channel = None; - slot.state = State::Done; // semantically vacant; allocator checks free_list - slot.outstanding_handles = 0; - slot.pending_io_result = None; - s.free_list.push(idx); -} - // --------------------------------------------------------------------------- // spawn / spawn_under / self_pid // --------------------------------------------------------------------------- -/// Spawn `f` as a child of the currently-executing actor. -/// Outside an actor (only legal from `run()`'s initial setup), the child's -/// supervisor is the root supervisor. 
pub fn spawn(f: impl FnOnce() + Send + 'static) -> JoinHandle { let parent = current_pid() - .or_else(|| with_sched(|s| s.root_pid)) + .or_else(|| with_runtime(|inner| inner.with_shared(|s| s.root_pid))) .expect("spawn() before run()"); spawn_under(parent, f) } -/// Spawn `f` with `supervisor` as its parent. The supervisor will receive -/// a `Signal` on its registered channel when the child terminates. pub fn spawn_under(supervisor: Pid, f: impl FnOnce() + Send + 'static) -> JoinHandle { - let pid = with_sched(|s| { - let (idx, gen) = s.allocate_slot(); - let pid = Pid::new(idx, gen); - let stack = Stack::new(ACTOR_STACK_SIZE) - .expect("stack allocation failed"); - let sp = init_actor_stack(stack.top(), trampoline); - let slot = &mut s.slots[idx as usize]; - slot.actor = Some(Actor { pid, stack, sp, supervisor }); - slot.state = State::Runnable; - slot.outstanding_handles = 1; - slot.outcome = None; - slot.waiters.clear(); - slot.supervisor_channel = None; - slot.pending_io_result = None; - s.run_queue.push_back(pid); - pid - }); - - // Stash the closure where `schedule_loop` will find it before the first - // resume. 
- PENDING_CLOSURES.with(|c| { - c.borrow_mut().push((pid, Box::new(f) as Closure)); + let pid = with_runtime(|inner| { + inner.with_shared(|s| { + let (idx, gen) = s.allocate_slot(); + let pid = Pid::new(idx, gen); + let stack = crate::stack::Stack::new(crate::runtime::ACTOR_STACK_SIZE) + .expect("stack allocation failed"); + let sp = init_actor_stack(stack.top(), crate::actor::trampoline); + let slot = &mut s.slots[idx as usize]; + slot.actor = Some(crate::actor::Actor { pid, stack, sp, supervisor }); + slot.state = crate::runtime::State::Runnable; + slot.outstanding_handles = 1; + slot.outcome = None; + slot.waiters.clear(); + slot.supervisor_channel = None; + slot.pending_io_result = None; + s.run_queue.push_back(pid); + s.pending_closures.push((pid, Box::new(f) as crate::runtime::Closure)); + pid + }) }); JoinHandle { pid, consumed: false } } -type Closure = Box; - -thread_local! { - /// Closures awaiting their first resume. Keyed by the PID the scheduler - /// allocated for them in `spawn_under`. The scheduler pops from here in - /// `pop_pending_closure` right before each first resume. - static PENDING_CLOSURES: RefCell> = const { RefCell::new(Vec::new()) }; -} - -fn pop_pending_closure(pid: Pid) -> Option { - PENDING_CLOSURES.with(|c| { - let mut v = c.borrow_mut(); - v.iter().position(|(p, _)| *p == pid).map(|i| v.swap_remove(i).1) - }) -} +use crate::context::init_actor_stack; pub fn self_pid() -> Pid { current_pid().expect("self_pid() called outside an actor") } // --------------------------------------------------------------------------- -// yield_now / park / unpark +// yield_now / park_current / unpark // --------------------------------------------------------------------------- -/// Cooperative yield. The current actor goes to the back of the run queue. pub fn yield_now() { - // Mark ourselves as needing to be re-queued, then yield. 
- YIELD_INTENT.with(|c| c.set(YieldIntent::Yield)); + runtime::set_yield_intent(YieldIntent::Yield); unsafe { crate::context::switch_to_scheduler() }; } -/// Park the current actor (remove it from the run queue until `unpark`). pub fn park_current() { - YIELD_INTENT.with(|c| c.set(YieldIntent::Park)); + runtime::set_yield_intent(YieldIntent::Park); unsafe { crate::context::switch_to_scheduler() }; } -/// RAII guard that disables allocator-driven preemption for its lifetime. -/// -/// The "prep-to-park" hazard described in `preempt.rs`: a primitive that -/// (a) registers an unparker (channel waiter slot, fd waiter map, mutex -/// waiter queue, …) and then (b) calls `park_current()` must not yield -/// between (a) and (b). If it does, an early unpark fires while the actor -/// is still Runnable, the unpark no-ops, and then the actor parks with no -/// one to wake it. -/// -/// Library code wraps the prep + park in `let _g = NoPreempt::enter();` -/// and the guard is held until just after `park_current` returns (or -/// dropped earlier, immediately before `park_current`, since `park_current` -/// itself returns control to the scheduler which disables preemption on -/// its own path). +pub fn unpark(pid: Pid) { + let result = try_with_runtime(|inner| { + inner.with_shared(|s| { + if let Some(slot) = s.slot_mut(pid) { + if matches!(slot.state, crate::runtime::State::Parked) { + slot.state = crate::runtime::State::Runnable; + s.run_queue.push_back(pid); + } + } + }) + }); + // If try_with_runtime returns None we're in teardown — no-op is correct. + let _ = result; +} + +// --------------------------------------------------------------------------- +// NoPreempt +// --------------------------------------------------------------------------- + pub struct NoPreempt(bool); impl NoPreempt { @@ -373,506 +210,124 @@ impl Drop for NoPreempt { } } -/// Park the current actor for at least `duration`. 
A zero duration behaves -/// like `yield_now` (the deadline is immediately in the past, so the timer -/// pops on the next scheduler iteration). +// --------------------------------------------------------------------------- +// sleep / insert_wait_timer +// --------------------------------------------------------------------------- + pub fn sleep(duration: std::time::Duration) { let me = current_pid().expect("sleep() called outside an actor"); let _np = NoPreempt::enter(); let deadline = crate::timer::deadline_from_now(duration); - with_sched(|s| s.timers.insert_sleep(deadline, me)); + with_runtime(|inner| inner.with_shared(|s| s.timers.insert_sleep(deadline, me))); park_current(); } -/// Insert a `WaitTimeout` timer entry. Library code (`Mutex::lock_timeout` -/// today, future bounded-wait primitives) calls this just before -/// `park_current()` so that if the wait isn't satisfied by `deadline`, -/// `target.on_timeout(pid, wait_seq)` will fire. -/// -/// Cancellation: not needed. If the wait is satisfied early, the entry is -/// still in the heap and will pop in due course; `on_timeout` is expected -/// to be idempotent on stale-seq. pub fn insert_wait_timer( deadline: std::time::Instant, pid: Pid, - target: std::rc::Rc, + target: std::sync::Arc, wait_seq: u64, ) { - with_sched(|s| { - s.timers.insert( - deadline, - pid, - crate::timer::Reason::WaitTimeout { target, wait_seq }, - ); + with_runtime(|inner| { + inner.with_shared(|s| { + s.timers.insert( + deadline, + pid, + crate::timer::Reason::WaitTimeout { target, wait_seq }, + ); + }) }); } -/// Run `f` on the IO worker thread, park the current actor while it runs, -/// and return `f`'s value when it completes. Panics inside `f` propagate -/// to the calling actor. -/// -/// Use this for blocking calls that would otherwise stall the scheduler — -/// synchronous file IO, blocking C FFI, libpq, etc. 
+// --------------------------------------------------------------------------- +// block_on_io / wait_readable / wait_writable / read / write +// --------------------------------------------------------------------------- + pub fn block_on_io(f: F) -> T where F: FnOnce() -> T + Send + 'static, T: Send + 'static, { let me = current_pid().expect("block_on_io() called outside an actor"); - - // Box the user closure into the wire-form result-shaped closure that - // the worker expects. The worker also wraps in catch_unwind, but doing - // it here too would let us downcast `T` only when the closure didn't - // panic. We let the worker handle catch_unwind so the boxing here - // stays straightforward. let work: Box crate::io::IoResult + Send> = Box::new(move || { let v: T = f(); Ok(Box::new(v) as Box) }); - { let _np = NoPreempt::enter(); - with_sched(|s| { + with_runtime(|inner| inner.with_shared(|s| { let io = s.io.as_mut().expect("io thread not started"); io.submit(me, work); - }); + })); park_current(); } - - // On resume, our slot has a result waiting. - let result = with_sched(|s| { + let result = with_runtime(|inner| inner.with_shared(|s| { s.slot_mut(me) .expect("block_on_io: own slot vanished") .pending_io_result .take() .expect("block_on_io: resumed without a result") - }); - + })); match result { - Ok(any) => *any - .downcast::() - .expect("block_on_io: result type mismatch — should be unreachable"), + Ok(any) => *any.downcast::().expect("block_on_io: type mismatch"), Err(payload) => std::panic::resume_unwind(payload), } } -// --------------------------------------------------------------------------- -// Fd-readiness primitives. -// -// `wait_readable(fd)` / `wait_writable(fd)` register interest with the -// epoll thread, park the calling actor, and return when the kernel -// signals readiness. The subsequent syscall (`read`/`write`) is done on -// the actor's own thread by the caller — no buffer crosses an actor -// boundary. 
-// -// Fds passed in should be O_NONBLOCK; see io.rs module docs. -// --------------------------------------------------------------------------- - -/// Park the calling actor until `fd` is readable. pub fn wait_readable(fd: std::os::fd::RawFd) -> std::io::Result<()> { - wait_fd(fd, /*readable=*/ true, /*writable=*/ false) + wait_fd(fd, true, false) } -/// Park the calling actor until `fd` is writable. pub fn wait_writable(fd: std::os::fd::RawFd) -> std::io::Result<()> { - wait_fd(fd, /*readable=*/ false, /*writable=*/ true) + wait_fd(fd, false, true) } fn wait_fd(fd: std::os::fd::RawFd, readable: bool, writable: bool) -> std::io::Result<()> { let me = current_pid().expect("wait_*() called outside an actor"); - - // Register with the epoll thread. If registration fails (bad fd, - // already-parked waiter, OOM in the kernel), return the error - // without parking — the actor never went to sleep. let _np = NoPreempt::enter(); - with_sched(|s| { + with_runtime(|inner| inner.with_shared(|s| { let io = s.io.as_mut().expect("io thread not started"); io.epoll_register(fd, me, readable, writable) - })?; - + }))?; park_current(); - // On resume, the scheduler has already removed `fd` from `waiters` - // and DEL'd it from epollfd. There is no per-call return value; - // success here just means "fd is ready, go do your syscall". - // - // Note: there is no error path on resume because v0.2 doesn't time - // out fd waits and doesn't otherwise spurious-wake. If those are - // added, this function grows a non-trivial return. Ok(()) } -/// Wait until `fd` is readable, then run a single `read(2)`. Returns the -/// number of bytes read, or an `io::Error` from the syscall. -/// -/// `fd` should be opened `O_NONBLOCK`. With a blocking fd, the kernel's -/// readiness signal does not guarantee a non-blocking read — a signal -/// could interrupt, and the actor's syscall would then stall the -/// scheduler thread. 
pub fn read(fd: std::os::fd::RawFd, buf: &mut [u8]) -> std::io::Result { wait_readable(fd)?; - let n = unsafe { - libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) - }; - if n < 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(n as usize) - } + let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) }; + if n < 0 { Err(std::io::Error::last_os_error()) } else { Ok(n as usize) } } -/// Wait until `fd` is writable, then run a single `write(2)`. pub fn write(fd: std::os::fd::RawFd, buf: &[u8]) -> std::io::Result { wait_writable(fd)?; - let n = unsafe { - libc::write(fd, buf.as_ptr() as *const _, buf.len()) - }; - if n < 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(n as usize) - } -} - -/// Wake a parked actor. If the actor isn't parked (already runnable or done) -/// this is a no-op — that's important; channel and join can both fire -/// spurious unparks under some orderings and we want them to be cheap. -/// Also a no-op if the scheduler isn't running (covers channel-sender drop -/// during runtime teardown). -pub fn unpark(pid: Pid) { - try_with_sched(|s| { - if let Some(slot) = s.slot_mut(pid) { - if matches!(slot.state, State::Parked) { - slot.state = State::Runnable; - s.run_queue.push_back(pid); - } - } - }); -} - -/// What an actor wants the scheduler to do when control returns from it. -#[derive(Copy, Clone)] -enum YieldIntent { - /// Re-queue (yield_now or preemption). - Yield, - /// Remove from the run queue (waiting for unpark). - Park, -} - -thread_local! 
{ - static YIELD_INTENT: std::cell::Cell = const { std::cell::Cell::new(YieldIntent::Yield) }; + let n = unsafe { libc::write(fd, buf.as_ptr() as *const _, buf.len()) }; + if n < 0 { Err(std::io::Error::last_os_error()) } else { Ok(n as usize) } } // --------------------------------------------------------------------------- -// Supervisor channel registration +// register_supervisor_channel // --------------------------------------------------------------------------- -/// Register `sender` as the mailbox for signals about children supervised -/// by `pid`. Idempotent; later calls overwrite. pub fn register_supervisor_channel(pid: Pid, sender: Sender) { - with_sched(|s| { + with_runtime(|inner| inner.with_shared(|s| { if let Some(slot) = s.slot_mut(pid) { slot.supervisor_channel = Some(sender); } else { panic!("register_supervisor_channel: pid {:?} not found", pid); } - }); + })); } // --------------------------------------------------------------------------- -// run() — the runtime entry point +// Legacy run() — convenience wrapper // --------------------------------------------------------------------------- -/// Boot the runtime, spawn `initial` as a child of the root supervisor, -/// drive the scheduler until the run queue is empty, tear down. -/// -/// The root supervisor is a *sentinel* PID, not a real actor. Signals -/// addressed to it are dropped on the floor — that's what "process exits" -/// means in the spec when nothing escalates further. User code that wants -/// real supervision spawns its own supervisor actor and uses `spawn_under`. 
-pub fn run(initial: F) { - SCHED.with(|c| { - assert!(c.borrow().is_none(), "smarm::run() called recursively"); - let mut state = SchedulerState::new(); - state.root_pid = Some(ROOT_PID); - state.io = Some(crate::io::IoThread::start().expect("failed to start io thread")); - *c.borrow_mut() = Some(state); - }); - - let initial_handle = spawn(initial); - - schedule_loop(); - - // Drop the handle BEFORE the scheduler is torn down — its Drop impl - // calls `with_sched` to decrement the outstanding-handle count. - drop(initial_handle); - - // Take the SchedulerState out of the thread-local BEFORE dropping it. - // Dropping it while still inside SCHED.with's RefCell borrow would - // re-enter (via channel senders' Drop → unpark → try_with_sched). - let state = SCHED.with(|c| c.borrow_mut().take()); - drop(state); - PENDING_CLOSURES.with(|c| c.borrow_mut().clear()); +/// Single-threaded runtime entry point (backwards-compatible wrapper). +/// Equivalent to `runtime::init(Config::exact(1)).run(f)`. +pub fn run(f: F) { + crate::runtime::init(crate::runtime::Config::exact(1)).run(f); } -/// Reserved sentinel pid for the root supervisor. Never allocated to a -/// real actor; lookups return `None`; signals are dropped. -pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX); -fn schedule_loop() { - loop { - // 1. Drain due timers and dispatch by reason. - // - // Sleep — unpark the actor (idempotently: only if still - // parked). - // WaitTimeout — call the target's on_timeout. The target decides - // whether the wait was still in progress (timer - // won the race) or had been fulfilled (the thing - // the actor was waiting for arrived first → no-op). - // The target is responsible for calling unpark() - // if appropriate. 
- let now = std::time::Instant::now(); - let due = with_sched(|s| s.timers.pop_due(now)); - for entry in due { - match entry.reason { - crate::timer::Reason::Sleep => { - with_sched(|s| { - if let Some(slot) = s.slot_mut(entry.pid) { - if matches!(slot.state, State::Parked) { - slot.state = State::Runnable; - s.run_queue.push_back(entry.pid); - } - } - }); - } - crate::timer::Reason::WaitTimeout { target, wait_seq } => { - // Note: the target callback runs *outside* with_sched. - // It may call back into the scheduler (e.g. unpark), so - // we must not hold the SCHED borrow across it. - target.on_timeout(entry.pid, wait_seq); - } - } - } - // 2. Drain IO completions: route each result by variant. - // - // Blocking — a `block_on_io` closure finished. Stash the result - // on the actor's slot and unpark. - // FdReady — an fd registered via `wait_readable`/`wait_writable` - // is ready. Look up the parked pid in the io thread's - // waiters map, deregister the fd, unpark. - // - // Drain even when we have other runnables — it's cheap and keeps - // `pending_io_result` / `waiters` freshness bounded. - let completions = with_sched(|s| { - s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default() - }); - for completion in completions { - match completion { - crate::io::Completion::Blocking { pid, result } => { - with_sched(|s| { - if let Some(io) = s.io.as_mut() { - io.outstanding = io.outstanding.saturating_sub(1); - } - if let Some(slot) = s.slot_mut(pid) { - slot.pending_io_result = Some(result); - if matches!(slot.state, State::Parked) { - slot.state = State::Runnable; - s.run_queue.push_back(pid); - } - } - }); - } - crate::io::Completion::FdReady { fd, events: _ } => { - with_sched(|s| { - let parked_pid = s.io.as_mut() - .and_then(|io| { - let pid = io.waiters.remove(&fd); - // Deregister the fd from epollfd; the - // EPOLLONESHOT already disarmed it but the - // slot is still occupied until we DEL. 
- io.epoll_deregister(fd); - pid - }); - if let Some(pid) = parked_pid { - if let Some(slot) = s.slot_mut(pid) { - if matches!(slot.state, State::Parked) { - slot.state = State::Runnable; - s.run_queue.push_back(pid); - } - } - // else: actor died between registering and the - // fd firing. Nothing to do; the registration - // has been cleaned up. - } - // else: fd not in waiters — probably a duplicate - // FdReady from a previous registration, ignore. - }); - } - } - } - - // 3. Pop a runnable actor. If none, decide whether to block on - // the wake pipe (for timers or IO) or exit (nothing pending). - let pid = match with_sched(|s| s.run_queue.pop_front()) { - Some(p) => p, - None => { - // Read out what we'd need to block on. We must take the - // wake fd separately because we can't hold an SCHED - // borrow across `poll_wake` — the IO thread will be - // trying to take the completions mutex, which is fine, - // but the scheduler thread itself mustn't hold SCHED - // borrowed across a blocking syscall. - // - // "Outstanding" here means *anything* the IO thread is - // expected to deliver a wakeup for: in-flight blocking - // calls AND parked fd waiters. If either is non-zero we - // must wait for the IO thread, not exit. - let (next_deadline, io_outstanding, wake_fd) = with_sched(|s| { - let next = s.timers.peek_deadline(); - let (out, fd) = match s.io.as_ref() { - Some(io) => ( - io.outstanding + io.waiters.len() as u32, - Some(io.wake_fd()), - ), - None => (0, None), - }; - (next, out, fd) - }); - - match (next_deadline, io_outstanding, wake_fd) { - // Nothing pending — we're done. - (None, 0, _) | (None, _, None) => return, - // Timer pending, nothing else: poll with a deadline, - // or fall back to plain sleep if we somehow have no - // wake fd (shouldn't happen — io thread is always up - // during run()). 
- (Some(deadline), _, fd_opt) => { - let now = std::time::Instant::now(); - if deadline > now { - let timeout = deadline - now; - match fd_opt { - Some(fd) => { - crate::io::poll_wake(fd, Some(timeout)); - crate::io::drain_wake_pipe(fd); - } - None => std::thread::sleep(timeout), - } - } - continue; - } - // No timer, but IO outstanding: poll forever for the - // pipe wakeup. - (None, _, Some(fd)) => { - crate::io::poll_wake(fd, None); - crate::io::drain_wake_pipe(fd); - continue; - } - } - } - }; - - // Look up sp; skip stale or already-reaped pids. - let sp = match with_sched(|s| { - s.slot(pid).and_then(|slot| slot.actor.as_ref().map(|a| a.sp)) - }) { - Some(sp) => sp, - None => continue, - }; - - // If this is a first resume, move the pending closure to the - // thread-local the trampoline reads. - if let Some(b) = pop_pending_closure(pid) { - set_current_actor_box(b); - } - - set_actor_sp(sp); - set_current_pid(pid); - reset_actor_done(); - YIELD_INTENT.with(|c| c.set(YieldIntent::Yield)); - - crate::preempt::reset_timeslice(); - PREEMPTION_ENABLED.with(|c| c.set(true)); - - unsafe { switch_to_actor() }; - - PREEMPTION_ENABLED.with(|c| c.set(false)); - clear_current_pid(); - - let intent = YIELD_INTENT.with(|c| c.get()); - let new_sp = get_actor_sp(); - - if is_actor_done() { - let outcome = take_last_outcome().unwrap_or(Outcome::Exit); - finalize_actor(pid, outcome); - } else { - with_sched(|s| { - if let Some(slot) = s.slot_mut(pid) { - if let Some(actor) = slot.actor.as_mut() { - actor.sp = new_sp; - } - match intent { - YieldIntent::Yield => { - slot.state = State::Runnable; - s.run_queue.push_back(pid); - } - YieldIntent::Park => { - slot.state = State::Parked; - } - } - } - }); - } - } -} - -fn finalize_actor(pid: Pid, outcome: Outcome) { - // Joiners get the typed Result with the panic payload. The supervisor - // gets an informational `Signal::Panic` with an empty payload — its job - // is policy (restart/escalate), not forensics. 
Users who need the - // payload in supervision can plumb their own channel. - - let (joiner_outcome, sup_signal) = match outcome { - Outcome::Exit => (Outcome::Exit, Signal::Exit(pid)), - Outcome::Panic(payload) => ( - Outcome::Panic(payload), - Signal::Panic(pid, Box::new(()) as Box), - ), - }; - - // Stash outcome, mark Done, collect waiters, drop the actor stack. - let (waiters, supervisor_pid) = with_sched(|s| { - let slot = s.slot_mut(pid).expect("finalize_actor: slot vanished"); - let sup = slot.actor.as_ref().map(|a| a.supervisor); - slot.outcome = Some(joiner_outcome); - slot.state = State::Done; - slot.actor = None; - let w = std::mem::take(&mut slot.waiters); - (w, sup) - }); - - // Deliver to supervisor (best-effort; ignore SendError). - if let Some(sup) = supervisor_pid { - let sender = with_sched(|s| { - s.slot(sup).and_then(|slot| slot.supervisor_channel.clone()) - }); - if let Some(sender) = sender { - let _ = sender.send(sup_signal); - } - } - - // Unpark joiners. - for joiner in waiters { - unpark(joiner); - } - - // Reclaim if no outstanding handles. - with_sched(|s| { - let should_reclaim = match s.slot(pid) { - Some(slot) => slot.outstanding_handles == 0, - None => false, - }; - if should_reclaim { - reclaim_slot(s, pid); - } - }); -} diff --git a/src/timer.rs b/src/timer.rs index 025fa42..963a783 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -28,7 +28,7 @@ use crate::pid::Pid; use std::cmp::Reverse; use std::collections::BinaryHeap; -use std::rc::Rc; +use std::sync::Arc; use std::time::{Duration, Instant}; /// What to do when a timer entry's deadline arrives. @@ -45,7 +45,7 @@ pub enum Reason { /// target tell apart "this wait" from "a later wait by the same actor /// on the same target". 
WaitTimeout { - target: Rc, + target: Arc, wait_seq: u64, }, } diff --git a/tests/mutex.rs b/tests/mutex.rs index 92a0216..6647c22 100644 --- a/tests/mutex.rs +++ b/tests/mutex.rs @@ -18,11 +18,11 @@ fn lock_free_mutex_succeeds() { run(move || { let m = Mutex::new(42u32); { - let g = m.lock().unwrap(); + let g = m.lock_timeout(Duration::from_millis(500)).unwrap(); c.store(*g, Ordering::SeqCst); } // After drop we can lock again. - let g2 = m.lock().unwrap(); + let g2 = m.lock_timeout(Duration::from_millis(500)).unwrap(); assert_eq!(*g2, 42); }); assert_eq!(captured.load(Ordering::SeqCst), 42); @@ -53,10 +53,10 @@ fn guard_mutates_value_visible_through_next_lock() { run(move || { let m = Mutex::new(0u32); { - let mut g = m.lock().unwrap(); + let mut g = m.lock_timeout(Duration::from_millis(500)).unwrap(); *g = 7; } - let g2 = m.lock().unwrap(); + let g2 = m.lock_timeout(Duration::from_millis(500)).unwrap(); f.store(*g2, Ordering::SeqCst); }); assert_eq!(final_value.load(Ordering::SeqCst), 7); @@ -80,19 +80,22 @@ fn contended_lock_parks_until_holder_releases() { let m_b = m.clone(); let a = spawn(move || { - let g = m_a.lock().unwrap(); + let g = m_a.lock_timeout(Duration::from_millis(500)).unwrap(); la.lock().unwrap().push("A_locked"); - // While holding, yield to let B run. + // First yield: lets B run past its first yield_now. + yield_now(); + // Second yield: lets B reach B_try and attempt lock() while we + // still hold it, so B parks on the mutex. yield_now(); la.lock().unwrap().push("A_dropping"); drop(g); la.lock().unwrap().push("A_dropped"); }); let b = spawn(move || { - // Wait a moment to make sure A locks first. + // One yield: lets A run and acquire the lock first. 
yield_now(); lb.lock().unwrap().push("B_try"); - let _g = m_b.lock().unwrap(); + let _g = m_b.lock_timeout(Duration::from_millis(500)).unwrap(); lb.lock().unwrap().push("B_locked"); }); a.join().unwrap(); @@ -127,7 +130,7 @@ fn lock_timeout_returns_err_when_holder_never_releases() { let a = spawn(move || { // Hold the lock for 100ms, blocking B's attempt with a 20ms timeout. - let _g = m_a.lock().unwrap(); + let _g = m_a.lock_timeout(Duration::from_millis(500)).unwrap(); smarm::sleep(Duration::from_millis(100)); // _g drops here. }); @@ -175,7 +178,7 @@ fn waiters_are_granted_the_lock_in_fifo_order() { // releases. Each waiter records its arrival order on acquisition. let m_holder = m.clone(); let holder = spawn(move || { - let g = m_holder.lock().unwrap(); + let g = m_holder.lock_timeout(Duration::from_millis(500)).unwrap(); // Let waiters pile up. for _ in 0..5 { yield_now(); @@ -194,7 +197,7 @@ fn waiters_are_granted_the_lock_in_fifo_order() { for _ in 0..id { yield_now(); } - let _g = m_w.lock().unwrap(); + let _g = m_w.lock_timeout(Duration::from_millis(500)).unwrap(); o.lock().unwrap().push(id); })); } @@ -224,7 +227,7 @@ fn grant_wins_when_holder_releases_before_timeout() { let m_b = m.clone(); let a = spawn(move || { - let _g = m_a.lock().unwrap(); + let _g = m_a.lock_timeout(Duration::from_millis(500)).unwrap(); // Hold for 10ms, well under B's 100ms timeout. 
smarm::sleep(Duration::from_millis(10)); }); @@ -257,7 +260,7 @@ fn next_waiter_gets_lock_after_holder_panics() { let m_b = m.clone(); let a = spawn(move || { - let _g = m_a.lock().unwrap(); + let _g = m_a.lock_timeout(Duration::from_millis(500)).unwrap(); yield_now(); panic!("holder dies mid-critical-section"); }); @@ -295,7 +298,7 @@ fn many_actors_increment_shared_counter_via_mutex() { let m_i = m.clone(); handles.push(spawn(move || { for _ in 0..PER_ACTOR { - let mut g = m_i.lock().unwrap(); + let mut g = m_i.lock_timeout(Duration::from_millis(500)).unwrap(); *g += 1; } })); @@ -303,7 +306,7 @@ fn many_actors_increment_shared_counter_via_mutex() { for h in handles { h.join().unwrap(); } - let g = m.lock().unwrap(); + let g = m.lock_timeout(Duration::from_millis(500)).unwrap(); fv.store(*g, Ordering::SeqCst); }); diff --git a/tests/runtime.rs b/tests/runtime.rs new file mode 100644 index 0000000..e4c7b32 --- /dev/null +++ b/tests/runtime.rs @@ -0,0 +1,426 @@ +//! Tests for the multi-scheduler runtime: Config, Runtime::run, and +//! correctness under genuine parallelism. +//! +//! The single-threaded correctness properties (channel ordering, mutex +//! fairness, timer accuracy, etc.) are already covered by the per-module +//! tests. This file focuses on what changes when N > 1 scheduler threads +//! are involved: +//! +//! - Config construction and validation +//! - Runtime::run blocks until all actors finish +//! - All existing cooperative behaviours hold under multi-threading +//! - Actors genuinely run on different OS threads +//! - No lost wakeups under concurrent park/unpark +//! - No slot leaks under high spawn/join churn +//! 
- Panic on one scheduler thread doesn't kill others + +use smarm::{channel, runtime::{Config, Runtime}, spawn, yield_now, JoinHandle}; +use std::sync::{ + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, + Arc, Barrier, +}; +use std::time::Duration; +use std::collections::HashSet; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Build a runtime with exactly `n` scheduler threads. +fn rt(n: usize) -> Runtime { + smarm::runtime::init(Config::exact(n)) +} + +/// Convenient single-threaded runtime (regression guard). +fn rt1() -> Runtime { rt(1) } + +/// Multi-threaded runtime using all available parallelism. +fn rt_par() -> Runtime { + smarm::runtime::init(Config::default()) +} + +// --------------------------------------------------------------------------- +// Config +// --------------------------------------------------------------------------- + +#[test] +fn config_exact_overrides_bounds() { + let c = Config::exact(3); + assert_eq!(c.resolved_thread_count(), 3); +} + +#[test] +fn config_default_clamps_to_available_parallelism() { + let c = Config::default(); + let n = c.resolved_thread_count(); + let avail = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + // Default min is 1, default max is available_parallelism. + assert!(n >= 1 && n <= avail); +} + +#[test] +fn config_min_max_clamps() { + // Force a range that excludes exact: min=2, max=4, available might be >4. 
+ let c = Config::new(2, 4, None); + let n = c.resolved_thread_count(); + assert!(n >= 2 && n <= 4, "expected 2..=4, got {n}"); +} + +#[test] +fn config_min_1_max_1_is_single_threaded() { + let c = Config::new(1, 1, None); + assert_eq!(c.resolved_thread_count(), 1); +} + +// --------------------------------------------------------------------------- +// Runtime::run — basic lifecycle +// --------------------------------------------------------------------------- + +#[test] +fn runtime_run_executes_closure() { + let flag = Arc::new(AtomicBool::new(false)); + let f = flag.clone(); + rt(1).run(move || { f.store(true, Ordering::SeqCst); }); + assert!(flag.load(Ordering::SeqCst)); +} + +#[test] +fn runtime_run_blocks_until_all_actors_done() { + // Spawn a chain of actors; the counter should be exactly N when run returns. + let counter = Arc::new(AtomicU64::new(0)); + let c = counter.clone(); + rt(2).run(move || { + let mut handles = Vec::new(); + for _ in 0..20 { + let cc = c.clone(); + handles.push(spawn(move || { + cc.fetch_add(1, Ordering::SeqCst); + })); + } + for h in handles { + h.join().unwrap(); + } + }); + assert_eq!(counter.load(Ordering::SeqCst), 20); +} + +#[test] +fn runtime_can_be_used_multiple_times_sequentially() { + // Each call to run() is independent. 
+ let r = rt(2); + let a = Arc::new(AtomicU64::new(0)); + let b = Arc::new(AtomicU64::new(0)); + let ac = a.clone(); + let bc = b.clone(); + r.run(move || { ac.fetch_add(1, Ordering::SeqCst); }); + r.run(move || { bc.fetch_add(1, Ordering::SeqCst); }); + assert_eq!(a.load(Ordering::SeqCst), 1); + assert_eq!(b.load(Ordering::SeqCst), 1); +} + +// --------------------------------------------------------------------------- +// Single-threaded regression: exact(1) must behave identically to old run() +// --------------------------------------------------------------------------- + +#[test] +fn exact_1_spawn_join_works() { + let v = Arc::new(AtomicU64::new(0)); + let vc = v.clone(); + rt1().run(move || { + let h = spawn(move || { vc.store(42, Ordering::SeqCst); }); + h.join().unwrap(); + }); + assert_eq!(v.load(Ordering::SeqCst), 42); +} + +#[test] +fn exact_1_channel_recv_parks_and_wakes() { + let v = Arc::new(AtomicU64::new(0)); + let vc = v.clone(); + rt1().run(move || { + let (tx, rx) = channel::(); + let h = spawn(move || { + let val = rx.recv().unwrap(); + vc.store(val, Ordering::SeqCst); + }); + yield_now(); + tx.send(99).unwrap(); + h.join().unwrap(); + }); + assert_eq!(v.load(Ordering::SeqCst), 99); +} + +#[test] +fn exact_1_panic_captured() { + let saw_err = Arc::new(AtomicBool::new(false)); + let s = saw_err.clone(); + rt1().run(move || { + let h = spawn(|| panic!("oops")); + if h.join().is_err() { s.store(true, Ordering::SeqCst); } + }); + assert!(saw_err.load(Ordering::SeqCst)); +} + +// --------------------------------------------------------------------------- +// Multi-threaded correctness +// --------------------------------------------------------------------------- + +#[test] +fn multi_thread_all_actors_complete() { + let counter = Arc::new(AtomicU64::new(0)); + let c = counter.clone(); + rt_par().run(move || { + let mut handles = Vec::new(); + for _ in 0..100 { + let cc = c.clone(); + handles.push(spawn(move || { + cc.fetch_add(1, Ordering::SeqCst); 
+ })); + } + for h in handles { h.join().unwrap(); } + }); + assert_eq!(counter.load(Ordering::SeqCst), 100); +} + +#[test] +fn multi_thread_channel_wakeup_across_threads() { + // Receiver parks; sender runs (potentially on a different OS thread). + // Verifies no lost wakeup. + let received = Arc::new(AtomicU64::new(0)); + let rc = received.clone(); + rt_par().run(move || { + let (tx, rx) = channel::(); + let h = spawn(move || { + let v = rx.recv().unwrap(); + rc.store(v, Ordering::SeqCst); + }); + // Let receiver park. + yield_now(); + tx.send(7).unwrap(); + h.join().unwrap(); + }); + assert_eq!(received.load(Ordering::SeqCst), 7); +} + +#[test] +fn multi_thread_many_channels_no_lost_wakeups() { + // N pairs of (sender actor, receiver actor). Each pair exchanges one + // message. All must complete — any lost wakeup causes a deadlock/timeout. + const PAIRS: usize = 50; + let count = Arc::new(AtomicU64::new(0)); + let c = count.clone(); + rt_par().run(move || { + let mut handles: Vec = Vec::new(); + for _ in 0..PAIRS { + let (tx, rx) = channel::(); + let cc = c.clone(); + handles.push(spawn(move || { + let v = rx.recv().unwrap(); + cc.fetch_add(v, Ordering::SeqCst); + })); + handles.push(spawn(move || { + tx.send(1).unwrap(); + })); + } + for h in handles { h.join().unwrap(); } + }); + assert_eq!(count.load(Ordering::SeqCst), PAIRS as u64); +} + +#[test] +fn multi_thread_mutex_contention_no_deadlock() { + use smarm::Mutex; + const ACTORS: usize = 20; + const PER: u64 = 100; + let total = Arc::new(AtomicU64::new(0)); + let t = total.clone(); + rt_par().run(move || { + let m: Mutex = Mutex::new(0); + let mut handles = Vec::new(); + for _ in 0..ACTORS { + let mc = m.clone(); + let tc = t.clone(); + handles.push(spawn(move || { + for _ in 0..PER { + let mut g = mc.lock_timeout(Duration::from_secs(5)).unwrap(); + *g += 1; + tc.fetch_add(0, Ordering::SeqCst); // just a memory barrier + } + })); + } + for h in handles { h.join().unwrap(); } + let g = 
m.lock_timeout(Duration::from_secs(1)).unwrap();
+        t.store(*g, Ordering::SeqCst);
+    });
+    assert_eq!(total.load(Ordering::SeqCst), ACTORS as u64 * PER);
+}
+
+#[test]
+fn multi_thread_join_across_threads() {
+    // Parent joins a child that may run on a different scheduler thread.
+    let v = Arc::new(AtomicU64::new(0));
+    let vc = v.clone();
+    rt_par().run(move || {
+        let h = spawn(move || {
+            // Do some work to make scheduling interesting.
+            for _ in 0..10 { yield_now(); }
+            vc.store(1, Ordering::SeqCst);
+        });
+        h.join().unwrap();
+    });
+    assert_eq!(v.load(Ordering::SeqCst), 1);
+}
+
+// ---------------------------------------------------------------------------
+// Actors run on distinct OS threads
+//
+// We collect the OS thread IDs that actors execute on. With N schedulers
+// and enough actors, we expect to see more than one thread ID.
+// ---------------------------------------------------------------------------
+
+#[test]
+fn actors_run_on_multiple_os_threads() {
+    let thread_ids: Arc<smarm::Mutex<HashSet<u64>>> =
+        Arc::new(smarm::Mutex::new(HashSet::new()));
+
+    rt_par().run({
+        let ids = thread_ids.clone();
+        move || {
+            let mut handles = Vec::new();
+            for _ in 0..64 {
+                let idc = ids.clone();
+                handles.push(spawn(move || {
+                    let tid = unsafe { libc::syscall(libc::SYS_gettid) as u64 }; // SAFETY: gettid takes no arguments and cannot fail.
+                    let mut g = idc.lock_timeout(Duration::from_secs(1)).unwrap();
+                    g.insert(tid);
+                }));
+            }
+            for h in handles { h.join().unwrap(); }
+        }
+    });
+
+    let n = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
+
+    let ids = thread_ids.lock_timeout(Duration::from_secs(1)).unwrap();
+    // If we have >1 scheduler threads, we expect >1 OS thread IDs.
+    // On a single-CPU machine this may be 1; we just assert ≥ 1.
+    assert!(!ids.is_empty());
+    if n > 1 {
+        // Strongly expect parallelism — not a hard assert since scheduling
+        // is non-deterministic, but 64 actors should spread.
+        // We log rather than assert to avoid flakiness on loaded CI.
+        if ids.len() == 1 {
+            eprintln!("WARNING: 64 actors all ran on the same OS thread (flaky on loaded system)");
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Scheduler stats (RFC 000 Layer 1 primitives)
+// ---------------------------------------------------------------------------
+
+#[test]
+fn scheduler_stats_run_queue_len_is_observable() {
+    // After spawning actors but before they run, the queue should be non-empty.
+    // We can't observe this from inside run() without a snapshot API, but we
+    // can verify the stats struct is accessible and returns sane values after
+    // run() completes (queue len == 0 at quiescence).
+    let r = rt_par();
+    r.run(|| {
+        for _ in 0..10 { spawn(|| {}); }
+        // Don't join — let them drain naturally.
+    });
+    let stats = r.stats();
+    assert_eq!(stats.total_run_queue_len(), 0, "queue should be empty after run()");
+}
+
+#[test]
+fn scheduler_stats_thread_count_matches_config() {
+    let r = rt(3);
+    r.run(|| {});
+    assert_eq!(r.stats().scheduler_count(), 3);
+}
+
+// ---------------------------------------------------------------------------
+// Panic isolation: a panicking actor doesn't kill the scheduler thread
+// ---------------------------------------------------------------------------
+
+#[test]
+fn panic_in_actor_does_not_kill_runtime() {
+    let completed = Arc::new(AtomicU64::new(0));
+    let c = completed.clone();
+    rt_par().run(move || {
+        // Spawn a panicker alongside well-behaved actors.
+        let bad = spawn(|| panic!("deliberate"));
+        let mut good_handles = Vec::new();
+        for _ in 0..10 {
+            let cc = c.clone();
+            good_handles.push(spawn(move || {
+                cc.fetch_add(1, Ordering::SeqCst);
+            }));
+        }
+        assert!(bad.join().is_err(), "panicking actor should join with Err");
+        for h in good_handles { h.join().unwrap(); }
+    });
+    assert_eq!(completed.load(Ordering::SeqCst), 10);
+}
+
+// ---------------------------------------------------------------------------
+// No slot leaks: rapid spawn/join churn
+// ---------------------------------------------------------------------------
+
+#[test]
+fn no_slot_leak_under_churn() {
+    // Spawn and join many short actors in a loop. If slots leak, the slot
+    // table grows unboundedly. We can't directly measure it without an
+    // introspection API, but the test at least checks correctness under
+    // churn and will OOM if there's a severe leak.
+    let counter = Arc::new(AtomicU64::new(0));
+    let c = counter.clone();
+    rt_par().run(move || {
+        for _ in 0..500 {
+            let cc = c.clone();
+            spawn(move || { cc.fetch_add(1, Ordering::SeqCst); })
+                .join()
+                .unwrap();
+        }
+    });
+    assert_eq!(counter.load(Ordering::SeqCst), 500);
+}
+
+// ---------------------------------------------------------------------------
+// Ping-pong: channel round-trips between two actors
+// ---------------------------------------------------------------------------
+
+#[test]
+fn ping_pong_completes() {
+    const ROUNDS: u64 = 1_000;
+    let final_val = Arc::new(AtomicU64::new(0));
+    let fv = final_val.clone();
+    rt_par().run(move || {
+        let (tx_a, rx_a) = channel::<u64>();
+        let (tx_b, rx_b) = channel::<u64>();
+        let h_a = spawn(move || {
+            tx_a.send(0).unwrap();
+            for _ in 0..ROUNDS {
+                let v = rx_b.recv().unwrap();
+                tx_a.send(v + 1).unwrap();
+            }
+        });
+        let h_b = spawn(move || {
+            for _ in 0..=ROUNDS {
+                let v = rx_a.recv().unwrap();
+                if v < ROUNDS {
+                    tx_b.send(v).unwrap();
+                } else {
+                    fv.store(v, Ordering::SeqCst);
+                }
+            }
+        });
+        h_a.join().unwrap();
+        h_b.join().unwrap();
+    });
+    assert_eq!(final_val.load(Ordering::SeqCst), ROUNDS);
+}