commit 0e9d9d7d5f7860fcdccce6334b6b4a04ca960544 Author: Claude Date: Fri May 22 05:01:51 2026 +0000 v0.1: green-thread actors, supervision, channels, benchmark Hand-rolled context switching on mmap'd stacks with guard pages, allocator-driven RDTSC preemption, unbounded MPSC channels, supervision via per-slot Signal mailboxes, root supervisor as sentinel PID. Lib + tests + benches clean check/clippy. All 29 tests pass. Bench: smarm 3.4% over serial baseline, within 160us of tokio current-thread on prime-counting fan-out. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..2028c22 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "smarm" +version = "0.1.0" +edition = "2021" +rust-version = "1.95" + +[dependencies] +libc = "0.2" + +[dev-dependencies] +tokio = { version = "1", features = ["rt", "macros", "sync"] } + +[profile.dev] +panic = "unwind" + +[profile.release] +panic = "unwind" +lto = "thin" +codegen-units = 1 + +[[bench]] +name = "primes" +harness = false diff --git a/benches/primes.rs b/benches/primes.rs new file mode 100644 index 0000000..7431e87 --- /dev/null +++ b/benches/primes.rs @@ -0,0 +1,134 @@ +//! Compute-heavy fan-out/fan-in benchmark. +//! +//! Counts primes in [2, N) across W workers (each handling a contiguous +//! slice), then sums the results. Tests pure compute throughput plus the +//! cost of spawn/join/channel. Single-threaded both sides (smarm has only +//! one OS thread; tokio is configured `current_thread`). +//! +//! Run with `cargo bench`. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Instant; + +const N: u64 = 200_000; +const WORKERS: u64 = 16; +const ITERATIONS: u32 = 5; + +fn is_prime(n: u64) -> bool { + if n < 2 { return false; } + if n < 4 { return true; } + if n % 2 == 0 { return false; } + let mut i = 3u64; + while i * i <= n { + if n % i == 0 { return false; } + i += 2; + } + true +} + +fn count_primes_in(lo: u64, hi: u64) -> u64 { + let mut count = 0u64; + for n in lo..hi { + if is_prime(n) { count += 1; } + } + count +} + +fn slice(worker: u64) -> (u64, u64) { + let per = N / WORKERS; + let lo = worker * per; + let hi = if worker + 1 == WORKERS { N } else { (worker + 1) * per }; + (lo, hi) +} + +fn bench_smarm() -> (u64, u128) { + let total = Arc::new(AtomicU64::new(0)); + let total2 = total.clone(); + let start = Instant::now(); + + smarm::run(move || { + let mut handles = Vec::new(); + for w in 0..WORKERS { + let (lo, hi) = slice(w); + let t = total2.clone(); + handles.push(smarm::spawn(move || { + let c = count_primes_in(lo, hi); + t.fetch_add(c, Ordering::Relaxed); + })); + } + for h in handles { + h.join().unwrap(); + } + }); + + (total.load(Ordering::Relaxed), start.elapsed().as_micros()) +} + +fn bench_tokio() -> (u64, u128) { + let total = Arc::new(AtomicU64::new(0)); + let total2 = total.clone(); + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let start = Instant::now(); + + let local = tokio::task::LocalSet::new(); + local.block_on(&rt, async move { + let mut handles = Vec::new(); + for w in 0..WORKERS { + let (lo, hi) = slice(w); + let t = total2.clone(); + handles.push(tokio::task::spawn_local(async move { + let c = count_primes_in(lo, hi); + t.fetch_add(c, Ordering::Relaxed); + })); + } + for h in handles { + let _ = h.await; + } + }); + + (total.load(Ordering::Relaxed), start.elapsed().as_micros()) +} + +fn bench_baseline() -> (u64, u128) { + let mut total = 0u64; + let start = Instant::now(); + for w in 0..WORKERS { + let (lo, hi) = slice(w); + total += count_primes_in(lo, hi); + } + (total, start.elapsed().as_micros()) +} + +fn run_n (u64, u128)>(name: &str, n: u32, mut f: F) { + let mut times = Vec::new(); + let mut last_count = 0; + for _ in 0..n { + let (c, t) = f(); + times.push(t); + last_count = c; + } + times.sort(); + let median = times[times.len() / 2]; + let min = *times.iter().min().unwrap(); + let max = *times.iter().max().unwrap(); + println!( + "{:>12} | primes: {:>6} | median: {:>8} µs | min: {:>8} µs | max: {:>8} µs", + name, last_count, median, min, max + ); +} + +fn main() { + println!( + "Counting primes in [2, {}) across {} workers, {} iterations each\n", + N, WORKERS, ITERATIONS + ); + println!("{:>12} | {:>15} | {:>16} | {:>15} | {:>15}", "runtime", "primes found", "median", "min", "max"); + println!("{}", "-".repeat(80)); + + run_n("baseline", ITERATIONS, bench_baseline); + run_n("smarm", ITERATIONS, bench_smarm); + run_n("tokio", ITERATIONS, bench_tokio); +} diff --git a/src/actor.rs b/src/actor.rs new file mode 100644 index 0000000..58ebd04 --- /dev/null +++ b/src/actor.rs @@ -0,0 +1,110 @@ +//! Actor descriptor and trampoline. +//! +//! An `Actor` owns its stack and holds the closure it will run. The +//! `trampoline` is a fixed `extern "C-unwind" fn()` that every actor enters +//! through; it pulls the closure out of a thread-local set by the scheduler +//! immediately before resume, invokes it inside `catch_unwind`, records the +//! outcome, and switches back to the scheduler. +//! +//! Why a thread-local and not, say, passing the closure pointer via a +//! register? Because the first resume goes through `ret`, not `call`, and +//! we have no other channel for parameters. The scheduler sets the +//! thread-local, switches in, the trampoline reads it. After the first +//! resume the closure has been consumed, so subsequent resumes don't need it. + +use crate::context::switch_to_scheduler; +use crate::pid::Pid; +use crate::stack::Stack; +use std::any::Any; +use std::cell::{Cell, RefCell}; +use std::panic; + +/// What an actor produced when it finished. Stored on the actor's slot, +/// drained by `JoinHandle::join` once the slot is marked done. +pub enum Outcome { + Exit, + Panic(Box), +} + +// Thread-locals that the scheduler writes immediately before `switch_to_actor`. +thread_local! { + /// The closure for the actor we're about to resume *for the first time*. + /// Consumed on first entry into the trampoline; `None` thereafter. + static CURRENT_ACTOR_BOX: RefCell>> = + const { RefCell::new(None) }; + + /// The PID of the actor currently executing on this OS thread. + /// Set on every resume so that `self_pid()` works inside actor code. + static CURRENT_PID: Cell> = const { Cell::new(None) }; + + /// Filled by the trampoline when the actor returns (normally or via + /// panic). The scheduler reads this after `switch_to_actor` returns. + static LAST_OUTCOME: RefCell> = const { RefCell::new(None) }; + + /// Set by the trampoline on completion; reset by the scheduler before + /// each resume so it never sees stale state. + static ACTOR_DONE: Cell = const { Cell::new(false) }; +} + +pub fn set_current_actor_box(b: Box) { + CURRENT_ACTOR_BOX.with(|c| *c.borrow_mut() = Some(b)); +} + +pub fn set_current_pid(p: Pid) { + CURRENT_PID.with(|c| c.set(Some(p))); +} + +pub fn clear_current_pid() { + CURRENT_PID.with(|c| c.set(None)); +} + +pub fn current_pid() -> Option { + CURRENT_PID.with(|c| c.get()) +} + +pub fn reset_actor_done() { + ACTOR_DONE.with(|c| c.set(false)); +} + +pub fn is_actor_done() -> bool { + ACTOR_DONE.with(|c| c.get()) +} + +pub fn take_last_outcome() -> Option { + LAST_OUTCOME.with(|r| r.borrow_mut().take()) +} + +/// The function whose address is written as the `ret` target on every actor +/// stack. The compiler must not inline this away. `extern "C-unwind"` permits +/// unwinding to cross the boundary, but `catch_unwind` here means unwinding +/// never actually does. +pub extern "C-unwind" fn trampoline() { + let b = CURRENT_ACTOR_BOX.with(|c| c.borrow_mut().take()) + .expect("trampoline entered without a closure set"); + + let outcome = match panic::catch_unwind(panic::AssertUnwindSafe(b)) { + Ok(()) => Outcome::Exit, + Err(payload) => Outcome::Panic(payload), + }; + + LAST_OUTCOME.with(|r| *r.borrow_mut() = Some(outcome)); + ACTOR_DONE.with(|c| c.set(true)); + + // Hand control back. The scheduler will tear down our slot and never + // resume us again. + unsafe { switch_to_scheduler() }; + // Unreachable. If it isn't, the scheduler has a bug. + unreachable!("scheduler resumed a done actor"); +} + +/// One actor's worth of state. Owned by the scheduler's slot table. +pub struct Actor { + /// The PID this actor was assigned at spawn time. + pub pid: Pid, + /// The stack the actor runs on. Dropped (munmap'd) when the actor dies. + pub stack: Stack, + /// The saved stack pointer. Updated on every yield. + pub sp: usize, + /// The PID of this actor's supervisor. Used to deliver `Signal` on death. + pub supervisor: Pid, +} diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..e21186a --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,163 @@ +//! Unbounded MPSC channels. +//! +//! Single-threaded scheduler: the inner state is `Rc>>`, +//! not `Arc`. We hand-implement `Send` for `Sender` and +//! `Receiver` when `T: Send`, on the basis that the only way two actor +//! contexts touch the same channel is by being scheduled on the *same* OS +//! thread (v0.1 has exactly one). When we add a second scheduler thread, +//! this lie must be retired: replace `Rc` with `Arc` (or a +//! lock-free queue) and remove the unsafe Send impls. +//! +//! Semantics: +//! - Senders are clonable; the last sender drop closes the channel. +//! - `Receiver::recv` on an empty open channel parks the receiver. +//! - `Receiver::recv` on an empty closed channel returns `Err(RecvError)`. +//! - `Sender::send` on an open channel always succeeds. +//! - `Sender::send` on a closed channel (receiver dropped) returns +//! `Err(SendError(value))`. +//! - When a send pushes to a previously empty queue and a receiver is +//! parked, the receiver is unparked. + +use crate::pid::Pid; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::rc::Rc; + +pub fn channel() -> (Sender, Receiver) { + let inner = Rc::new(RefCell::new(Inner { + queue: VecDeque::new(), + parked_receiver: None, + senders: 1, + receiver_alive: true, + })); + (Sender { inner: inner.clone() }, Receiver { inner }) +} + +struct Inner { + queue: VecDeque, + parked_receiver: Option, + senders: usize, + receiver_alive: bool, +} + +pub struct Sender { + inner: Rc>>, +} + +pub struct Receiver { + inner: Rc>>, +} + +// SAFETY (v0.1 only): the scheduler is single-threaded. Sender/Receiver can +// be captured into actor closures (which require Send), but they will only +// ever be touched from one OS thread. When multi-threading lands, swap the +// `Rc` for `Arc` and remove these. +unsafe impl Send for Sender {} +unsafe impl Send for Receiver {} + +#[derive(Debug, PartialEq, Eq)] +pub struct SendError(pub T); + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub struct RecvError; + +impl std::fmt::Display for RecvError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "channel closed") + } +} + +impl std::error::Error for RecvError {} + +impl Clone for Sender { + fn clone(&self) -> Self { + self.inner.borrow_mut().senders += 1; + Sender { inner: self.inner.clone() } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + let unpark = { + let mut g = self.inner.borrow_mut(); + g.senders -= 1; + if g.senders == 0 && g.queue.is_empty() { + // Channel closed and drained. Wake the receiver so it can + // see RecvError. + g.parked_receiver.take() + } else { + None + } + }; + if let Some(pid) = unpark { + crate::scheduler::unpark(pid); + } + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + self.inner.borrow_mut().receiver_alive = false; + } +} + +impl Sender { + pub fn send(&self, value: T) -> Result<(), SendError> { + let unpark = { + let mut g = self.inner.borrow_mut(); + if !g.receiver_alive { + return Err(SendError(value)); + } + g.queue.push_back(value); + // If the receiver is parked, unpark it. + g.parked_receiver.take() + }; + if let Some(pid) = unpark { + crate::scheduler::unpark(pid); + } + Ok(()) + } +} + +impl Receiver { + pub fn recv(&self) -> Result { + loop { + // Try to take a message. + { + let mut g = self.inner.borrow_mut(); + if let Some(v) = g.queue.pop_front() { + return Ok(v); + } + if g.senders == 0 { + return Err(RecvError); + } + // Empty + open: register and park. + let me = crate::actor::current_pid() + .expect("recv() called outside an actor"); + debug_assert!( + g.parked_receiver.is_none(), + "channel has more than one receiver" + ); + g.parked_receiver = Some(me); + } + // Release the borrow before parking — the unparker will need it. + crate::scheduler::park_current(); + // Loop: the message that woke us might already have been taken + // (it can't, with one receiver, but the senders=0 path can fire + // here too). + } + } + + /// Non-blocking. `Ok(Some(v))` if a message was available, `Ok(None)` if + /// the channel is empty but open, `Err(RecvError)` if closed and drained. + pub fn try_recv(&self) -> Result, RecvError> { + let mut g = self.inner.borrow_mut(); + if let Some(v) = g.queue.pop_front() { + return Ok(Some(v)); + } + if g.senders == 0 { + return Err(RecvError); + } + Ok(None) + } +} diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..75c2840 --- /dev/null +++ b/src/context.rs @@ -0,0 +1,106 @@ +//! Cooperative context switching, x86-64. +//! +//! Two naked-asm functions move execution between a scheduler thread and an +//! actor running on its own mmap'd stack. The compiler cannot do this; the +//! whole point of `#[unsafe(naked)]` is that we control every instruction. +//! +//! `SCHEDULER_SP` and `ACTOR_SP` are thread-locals holding each side's saved +//! stack pointer. `init_actor_stack` builds the initial stack so that the +//! first `switch_to_actor` lands inside the entry function with `rsp % 16 == 8` +//! (the x86-64 ABI requirement at function entry). + +use std::cell::Cell; + +thread_local! { + static SCHEDULER_SP: Cell = const { Cell::new(0) }; + static ACTOR_SP: Cell = const { Cell::new(0) }; +} + +fn get_scheduler_sp() -> usize { SCHEDULER_SP.with(|c| c.get()) } +fn set_scheduler_sp(v: usize) { SCHEDULER_SP.with(|c| c.set(v)) } +pub fn get_actor_sp() -> usize { ACTOR_SP.with(|c| c.get()) } +pub fn set_actor_sp(v: usize) { ACTOR_SP.with(|c| c.set(v)) } + +// --------------------------------------------------------------------------- +// Initial stack layout +// +// After alignment, sp = top & ~15 - 8. Then we push (downward) six callee- +// saved register slots and a return address. The first `switch_to_actor` +// pops r15..rbx and `ret`s — landing in `entry` with rsp % 16 == 8. +// +// Layout (high → low), relative to aligned_top = top & ~15: +// aligned_top - 8 : entry ptr ← `ret` target. Post-ret: rsp % 16 == 8. +// aligned_top - 16 : rbx = 0 +// aligned_top - 24 : rbp = 0 +// aligned_top - 32 : r12 = 0 +// aligned_top - 40 : r13 = 0 +// aligned_top - 48 : r14 = 0 +// aligned_top - 56 : r15 = 0 ← initial rsp +// --------------------------------------------------------------------------- + +pub fn init_actor_stack(top: *mut u8, entry: extern "C-unwind" fn()) -> usize { + unsafe { + let mut sp = (top as usize & !15) - 8; + sp -= 8; (sp as *mut usize).write(entry as usize); // ret target + sp -= 8; (sp as *mut usize).write(0); // rbx + sp -= 8; (sp as *mut usize).write(0); // rbp + sp -= 8; (sp as *mut usize).write(0); // r12 + sp -= 8; (sp as *mut usize).write(0); // r13 + sp -= 8; (sp as *mut usize).write(0); // r14 + sp -= 8; (sp as *mut usize).write(0); // r15 + sp + } +} + +// --------------------------------------------------------------------------- +// Context switch shims +// +// Each shim: +// 1. Pushes the six callee-saved integer registers. +// 2. Snaps rsp into rdi and calls the Rust helper that stores it. +// 3. Calls the Rust helper that returns the *other* side's saved rsp. +// 4. Moves that into rsp. +// 5. Pops the six registers and rets. +// +// XMM registers are NOT saved here. We rely on every yield happening through +// a Rust call site, which means the compiler has spilled any live XMM state +// to the stack before we get here. (This is the same argument the compiler +// uses internally — callee-saved regs are what survive a `call`, and the +// SysV AMD64 ABI says XMM0–15 are all caller-saved.) If we ever yield from +// a place that isn't a Rust call boundary, this assumption breaks. +// --------------------------------------------------------------------------- + +#[unsafe(naked)] +unsafe extern "C" fn switch_to_actor_asm() { + core::arch::naked_asm!( + "push rbx", "push rbp", "push r12", "push r13", "push r14", "push r15", + "mov rdi, rsp", + "call {set_sched_sp}", + "call {get_actor_sp}", + "mov rsp, rax", + "pop r15", "pop r14", "pop r13", "pop r12", "pop rbp", "pop rbx", + "ret", + set_sched_sp = sym set_scheduler_sp, + get_actor_sp = sym get_actor_sp, + ); +} + +/// Resume the actor whose sp is in `ACTOR_SP`. Returns when the actor yields. +pub unsafe fn switch_to_actor() { + unsafe { switch_to_actor_asm() }; +} + +#[unsafe(naked)] +pub unsafe extern "C" fn switch_to_scheduler() { + core::arch::naked_asm!( + "push rbx", "push rbp", "push r12", "push r13", "push r14", "push r15", + "mov rdi, rsp", + "call {set_actor_sp}", + "call {get_sched_sp}", + "mov rsp, rax", + "pop r15", "pop r14", "pop r13", "pop r12", "pop rbp", "pop rbx", + "ret", + set_actor_sp = sym set_actor_sp, + get_sched_sp = sym get_scheduler_sp, + ); +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..5ec7bf8 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,40 @@ +//! # smarm — Silly Marks Abstract Rust Machine +//! +//! Erlang-style green-thread actor concurrency for Rust. +//! +//! v0.1 is single-threaded. One scheduler, one OS thread. The scheduler +//! cooperatively interleaves green-thread actors with hand-rolled context +//! switches. Actors communicate by sending `Send` messages over channels; +//! every actor has a supervisor, which is itself just an actor with a +//! `Receiver`. +//! +//! See `LOOM.md` for the design intent and the deferred-for-later list. + +pub mod stack; +pub mod context; +pub mod preempt; +pub mod pid; +pub mod actor; +pub mod channel; +pub mod scheduler; +pub mod supervisor; + +// --------------------------------------------------------------------------- +// Global allocator +// +// The preempting allocator wraps `System`. While `PREEMPTION_ENABLED` is +// false (the default outside an actor) it adds one branch per allocation +// and no syscalls. The scheduler flips it on per-resume. +// --------------------------------------------------------------------------- + +#[global_allocator] +static ALLOCATOR: preempt::PreemptingAllocator = preempt::PreemptingAllocator; + +// --------------------------------------------------------------------------- +// Public API re-exports +// --------------------------------------------------------------------------- + +pub use channel::{channel, Receiver, RecvError, Sender}; +pub use pid::Pid; +pub use scheduler::{run, self_pid, spawn, spawn_under, yield_now, JoinError, JoinHandle}; +pub use supervisor::Signal; diff --git a/src/pid.rs b/src/pid.rs new file mode 100644 index 0000000..2c1a7d9 --- /dev/null +++ b/src/pid.rs @@ -0,0 +1,38 @@ +//! Process identifiers. +//! +//! A `Pid` is `(index, generation)`. The index is a slot in the scheduler's +//! actor table; the generation increments every time that slot is reused. +//! A stale `Pid` (correct index, wrong generation) is a detectable error, +//! not a silent misdirection — solves the ABA problem without exhausting +//! the PID space. + +#[derive(Copy, Clone, PartialEq, Eq, Hash)] +pub struct Pid { + index: u32, + generation: u32, +} + +impl Pid { + #[inline] + pub const fn new(index: u32, generation: u32) -> Self { + Self { index, generation } + } + + #[inline] + pub const fn index(self) -> u32 { self.index } + + #[inline] + pub const fn generation(self) -> u32 { self.generation } +} + +impl std::fmt::Debug for Pid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Pid({}.{})", self.index, self.generation) + } +} + +impl std::fmt::Display for Pid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "<{}.{}>", self.index, self.generation) + } +} diff --git a/src/preempt.rs b/src/preempt.rs new file mode 100644 index 0000000..6adb9dd --- /dev/null +++ b/src/preempt.rs @@ -0,0 +1,104 @@ +//! Allocator-driven preemption. +//! +//! A `GlobalAlloc` wrapper counts allocations. Every `ALLOC_INTERVAL`-th +//! allocation it reads RDTSC and, if the actor's timeslice has expired, +//! calls `switch_to_scheduler` to yield. +//! +//! All state is thread-local. The scheduler enables preemption on resume +//! and disables it on the return path, so the scheduler can never preempt +//! itself. +//! +//! TSC frequency is machine-dependent; `TIMESLICE_CYCLES` is a constant +//! calibrated for ~100µs on a 3 GHz CPU. A real implementation would +//! measure it at startup. For v0.1 the constant suffices. + +use std::alloc::{GlobalAlloc, Layout, System}; +use std::cell::Cell; + +const ALLOC_INTERVAL: u32 = 128; +const TIMESLICE_CYCLES: u64 = 300_000; // ≈ 100µs on a 3 GHz CPU + +thread_local! { + /// While `false`, the allocator hook is a no-op. + pub static PREEMPTION_ENABLED: Cell = const { Cell::new(false) }; + + /// Countdown to next RDTSC check. Reset to `ALLOC_INTERVAL` on resume. + static ALLOC_COUNT: Cell = const { Cell::new(ALLOC_INTERVAL) }; + + /// RDTSC value written by the scheduler on every actor resume. + static TIMESLICE_START: Cell = const { Cell::new(0) }; +} + +/// Arm the timeslice. Called by the scheduler on every resume. +pub fn reset_timeslice() { + ALLOC_COUNT.with(|c| c.set(ALLOC_INTERVAL)); + TIMESLICE_START.with(|c| c.set(rdtsc())); +} + +#[inline(always)] +pub fn rdtsc() -> u64 { + unsafe { + // SAFETY: x86-64 only. `lfence` serialises the instruction stream so + // we don't measure time before prior instructions retire. + core::arch::asm!("lfence", options(nostack, nomem, preserves_flags)); + core::arch::x86_64::_rdtsc() + } +} + +pub struct PreemptingAllocator; + +unsafe impl GlobalAlloc for PreemptingAllocator { + #[inline] + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + maybe_preempt(); + unsafe { System.alloc(layout) } + } + + #[inline] + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + unsafe { System.dealloc(ptr, layout) } + } + + #[inline] + unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { + maybe_preempt(); + unsafe { System.alloc_zeroed(layout) } + } + + #[inline] + unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { + maybe_preempt(); + unsafe { System.realloc(ptr, layout, new_size) } + } +} + +#[inline(always)] +fn maybe_preempt() { + ALLOC_COUNT.with(|c| { + let n = c.get(); + if n == 0 { + c.set(ALLOC_INTERVAL); + if PREEMPTION_ENABLED.with(|e| e.get()) { + let start = TIMESLICE_START.with(|s| s.get()); + if rdtsc().saturating_sub(start) > TIMESLICE_CYCLES { + // SAFETY: reachable only inside an actor (the scheduler + // sets PREEMPTION_ENABLED on resume and clears it on + // return). The scheduler stack is therefore valid. + unsafe { crate::context::switch_to_scheduler() }; + } + } + } else { + c.set(n - 1); + } + }); +} + +// --------------------------------------------------------------------------- +// Test helpers +// --------------------------------------------------------------------------- + +/// Force-expire the timeslice so the next RDTSC check preempts. +pub fn expire_timeslice_for_test() { + TIMESLICE_START.with(|c| c.set(0)); + ALLOC_COUNT.with(|c| c.set(0)); +} diff --git a/src/scheduler.rs b/src/scheduler.rs new file mode 100644 index 0000000..2879f01 --- /dev/null +++ b/src/scheduler.rs @@ -0,0 +1,529 @@ +//! The single-threaded scheduler. +//! +//! There is one global scheduler per OS thread, stored in a thread-local. +//! `run(initial)` initialises it, spawns the initial actor, drives the loop +//! until the run queue is empty, then tears it down. +//! +//! Slot table: a `Vec` indexed by `Pid::index()`, with a free list of +//! reusable indices. Each slot has a `generation` counter that increments +//! every time the slot is freed; `Pid` carries the generation it was minted +//! with, so a stale PID has a mismatching generation and is detected on +//! lookup. +//! +//! Run queue: a `VecDeque` of runnable actors. The state of an actor +//! is implicit in slot.state: `Runnable` means it's either in the queue or +//! currently executing; `Parked` means it's waiting for something to unpark +//! it (channel send, join completion, …); `Done` means it has finished and +//! is awaiting reaping. +//! +//! Joining: `JoinHandle::join()` parks the calling actor and registers it +//! on the target slot's `waiters` list. When the target actor finishes, +//! the scheduler reaps the slot and unparks every waiter, passing them the +//! outcome via a side channel (the target's `outcome` field, drained on +//! the joiner side). + +use crate::actor::{ + clear_current_pid, current_pid, is_actor_done, reset_actor_done, + set_current_actor_box, set_current_pid, take_last_outcome, trampoline, Actor, Outcome, +}; +use crate::channel::Sender; +use crate::context::{get_actor_sp, init_actor_stack, set_actor_sp, switch_to_actor}; +use crate::pid::Pid; +use crate::preempt::PREEMPTION_ENABLED; +use crate::stack::Stack; +use crate::supervisor::Signal; +use std::cell::RefCell; +use std::collections::VecDeque; + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +const ACTOR_STACK_SIZE: usize = 64 * 1024; + +// --------------------------------------------------------------------------- +// Per-actor slot +// --------------------------------------------------------------------------- + +enum State { + /// Either in the run queue or currently executing. + Runnable, + /// Removed from the queue, waiting for `unpark()`. + Parked, + /// The actor has finished. Slot persists until the last `JoinHandle` + /// has been joined (or dropped). Then the slot is freed. + Done, +} + +struct Slot { + /// Bumped every time this slot is freed and re-used. A `Pid` with a + /// non-matching generation is stale. + generation: u32, + /// `None` when the slot is free. `Some` otherwise. + actor: Option, + state: State, + /// PIDs waiting in `JoinHandle::join`. + waiters: Vec, + /// The outcome the actor produced, captured when it finished. + /// Drained by `JoinHandle::join`. + outcome: Option, + /// If this slot is a supervisor, the sender into its `Signal` mailbox. + /// Cloned out and used when one of its children dies. + supervisor_channel: Option>, + /// Number of `JoinHandle`s still outstanding for this actor. The slot + /// is reclaimed only when the actor is done AND outstanding_handles == 0. + outstanding_handles: u32, +} + +impl Slot { + fn vacant() -> Self { + Self { + generation: 0, + actor: None, + state: State::Done, + waiters: Vec::new(), + outcome: None, + supervisor_channel: None, + outstanding_handles: 0, + } + } +} + +// --------------------------------------------------------------------------- +// Scheduler state +// --------------------------------------------------------------------------- + +struct SchedulerState { + slots: Vec, + free_list: Vec, + run_queue: VecDeque, + /// The root supervisor's PID. Children spawned at the top level are + /// supervised by this. Set by `run()`. + root_pid: Option, +} + +impl SchedulerState { + fn new() -> Self { + Self { + slots: Vec::new(), + free_list: Vec::new(), + run_queue: VecDeque::new(), + root_pid: None, + } + } + + /// Allocate a slot; return its (index, generation). + fn allocate_slot(&mut self) -> (u32, u32) { + if let Some(idx) = self.free_list.pop() { + let s = &mut self.slots[idx as usize]; + (idx, s.generation) + } else { + let idx = self.slots.len() as u32; + self.slots.push(Slot::vacant()); + (idx, 0) + } + } + + fn slot(&self, pid: Pid) -> Option<&Slot> { + let s = self.slots.get(pid.index() as usize)?; + if s.generation == pid.generation() { Some(s) } else { None } + } + + fn slot_mut(&mut self, pid: Pid) -> Option<&mut Slot> { + let s = self.slots.get_mut(pid.index() as usize)?; + if s.generation == pid.generation() { Some(s) } else { None } + } +} + +thread_local! { + static SCHED: RefCell> = const { RefCell::new(None) }; +} + +fn with_sched(f: impl FnOnce(&mut SchedulerState) -> R) -> R { + SCHED.with(|c| { + let mut g = c.borrow_mut(); + let s = g.as_mut().expect("scheduler not running"); + f(s) + }) +} + +/// Same as `with_sched` but returns `None` when there's no scheduler instead +/// of panicking. Used on cleanup paths (channel sender drop during shutdown, +/// for example). +fn try_with_sched(f: impl FnOnce(&mut SchedulerState) -> R) -> Option { + SCHED.with(|c| { + let mut g = c.borrow_mut(); + g.as_mut().map(f) + }) +} + +// --------------------------------------------------------------------------- +// JoinHandle +// --------------------------------------------------------------------------- + +#[derive(Debug)] +pub struct JoinError { + /// Whatever `panic!` was called with. + pub payload: Box, +} + +pub struct JoinHandle { + pid: Pid, + /// `false` once `join()` has been called and the handle has consumed + /// its outcome. Prevents the Drop impl from double-decrementing. + consumed: bool, +} + +impl JoinHandle { + pub fn pid(&self) -> Pid { self.pid } + + /// Block the calling actor until the target completes. Returns + /// `Ok(())` on normal exit, `Err(JoinError)` if the target panicked. + pub fn join(mut self) -> Result<(), JoinError> { + let me = current_pid().expect("join() called outside an actor"); + + loop { + let outcome = with_sched(|s| { + let slot = s.slot_mut(self.pid) + .expect("join: target slot has been reused"); + if matches!(slot.state, State::Done) { + Some(slot.outcome.take().expect("Done slot must have an outcome")) + } else { + slot.waiters.push(me); + None + } + }); + + match outcome { + Some(o) => { + self.consumed = true; + self.decrement_handle_count(); + return match o { + Outcome::Exit => Ok(()), + Outcome::Panic(p) => Err(JoinError { payload: p }), + }; + } + None => park_current(), + } + } + } + + fn decrement_handle_count(&mut self) { + with_sched(|s| { + let should_reclaim = match s.slot_mut(self.pid) { + Some(slot) => { + slot.outstanding_handles = slot.outstanding_handles.saturating_sub(1); + matches!(slot.state, State::Done) && slot.outstanding_handles == 0 + } + None => false, + }; + if should_reclaim { + reclaim_slot(s, self.pid); + } + }); + } +} + +impl Drop for JoinHandle { + fn drop(&mut self) { + if !self.consumed { + self.decrement_handle_count(); + } + } +} + +// --------------------------------------------------------------------------- +// Slot reclamation +// --------------------------------------------------------------------------- + +fn reclaim_slot(s: &mut SchedulerState, pid: Pid) { + let idx = pid.index(); + let slot = &mut s.slots[idx as usize]; + // Bump generation so any stale PIDs from now on miss. + slot.generation = slot.generation.wrapping_add(1); + // Drop the actor (its stack with it). + slot.actor = None; + slot.outcome = None; + slot.waiters.clear(); + slot.supervisor_channel = None; + slot.state = State::Done; // semantically vacant; allocator checks free_list + slot.outstanding_handles = 0; + s.free_list.push(idx); +} + +// --------------------------------------------------------------------------- +// spawn / spawn_under / self_pid +// --------------------------------------------------------------------------- + +/// Spawn `f` as a child of the currently-executing actor. +/// Outside an actor (only legal from `run()`'s initial setup), the child's +/// supervisor is the root supervisor. +pub fn spawn(f: impl FnOnce() + Send + 'static) -> JoinHandle { + let parent = current_pid() + .or_else(|| with_sched(|s| s.root_pid)) + .expect("spawn() before run()"); + spawn_under(parent, f) +} + +/// Spawn `f` with `supervisor` as its parent. The supervisor will receive +/// a `Signal` on its registered channel when the child terminates. +pub fn spawn_under(supervisor: Pid, f: impl FnOnce() + Send + 'static) -> JoinHandle { + let pid = with_sched(|s| { + let (idx, gen) = s.allocate_slot(); + let pid = Pid::new(idx, gen); + let stack = Stack::new(ACTOR_STACK_SIZE) + .expect("stack allocation failed"); + let sp = init_actor_stack(stack.top(), trampoline); + let slot = &mut s.slots[idx as usize]; + slot.actor = Some(Actor { pid, stack, sp, supervisor }); + slot.state = State::Runnable; + slot.outstanding_handles = 1; + slot.outcome = None; + slot.waiters.clear(); + slot.supervisor_channel = None; + s.run_queue.push_back(pid); + pid + }); + + // Stash the closure where `schedule_loop` will find it before the first + // resume. + PENDING_CLOSURES.with(|c| { + c.borrow_mut().push((pid, Box::new(f) as Closure)); + }); + + JoinHandle { pid, consumed: false } +} + +type Closure = Box; + +thread_local! { + /// Closures awaiting their first resume. Keyed by the PID the scheduler + /// allocated for them in `spawn_under`. The scheduler pops from here in + /// `pop_pending_closure` right before each first resume. + static PENDING_CLOSURES: RefCell> = const { RefCell::new(Vec::new()) }; +} + +fn pop_pending_closure(pid: Pid) -> Option { + PENDING_CLOSURES.with(|c| { + let mut v = c.borrow_mut(); + v.iter().position(|(p, _)| *p == pid).map(|i| v.swap_remove(i).1) + }) +} + +pub fn self_pid() -> Pid { + current_pid().expect("self_pid() called outside an actor") +} + +// --------------------------------------------------------------------------- +// yield_now / park / unpark +// --------------------------------------------------------------------------- + +/// Cooperative yield. The current actor goes to the back of the run queue. +pub fn yield_now() { + // Mark ourselves as needing to be re-queued, then yield. + YIELD_INTENT.with(|c| c.set(YieldIntent::Yield)); + unsafe { crate::context::switch_to_scheduler() }; +} + +/// Park the current actor (remove it from the run queue until `unpark`). +pub fn park_current() { + YIELD_INTENT.with(|c| c.set(YieldIntent::Park)); + unsafe { crate::context::switch_to_scheduler() }; +} + +/// Wake a parked actor. If the actor isn't parked (already runnable or done) +/// this is a no-op — that's important; channel and join can both fire +/// spurious unparks under some orderings and we want them to be cheap. +/// Also a no-op if the scheduler isn't running (covers channel-sender drop +/// during runtime teardown). +pub fn unpark(pid: Pid) { + try_with_sched(|s| { + if let Some(slot) = s.slot_mut(pid) { + if matches!(slot.state, State::Parked) { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + } + } + }); +} + +/// What an actor wants the scheduler to do when control returns from it. +#[derive(Copy, Clone)] +enum YieldIntent { + /// Re-queue (yield_now or preemption). + Yield, + /// Remove from the run queue (waiting for unpark). + Park, +} + +thread_local! { + static YIELD_INTENT: std::cell::Cell = const { std::cell::Cell::new(YieldIntent::Yield) }; +} + +// --------------------------------------------------------------------------- +// Supervisor channel registration +// --------------------------------------------------------------------------- + +/// Register `sender` as the mailbox for signals about children supervised +/// by `pid`. Idempotent; later calls overwrite. +pub fn register_supervisor_channel(pid: Pid, sender: Sender) { + with_sched(|s| { + if let Some(slot) = s.slot_mut(pid) { + slot.supervisor_channel = Some(sender); + } else { + panic!("register_supervisor_channel: pid {:?} not found", pid); + } + }); +} + +// --------------------------------------------------------------------------- +// run() — the runtime entry point +// --------------------------------------------------------------------------- + +/// Boot the runtime, spawn `initial` as a child of the root supervisor, +/// drive the scheduler until the run queue is empty, tear down. +/// +/// The root supervisor is a *sentinel* PID, not a real actor. Signals +/// addressed to it are dropped on the floor — that's what "process exits" +/// means in the spec when nothing escalates further. User code that wants +/// real supervision spawns its own supervisor actor and uses `spawn_under`. +pub fn run(initial: F) { + SCHED.with(|c| { + assert!(c.borrow().is_none(), "smarm::run() called recursively"); + let mut state = SchedulerState::new(); + state.root_pid = Some(ROOT_PID); + *c.borrow_mut() = Some(state); + }); + + let initial_handle = spawn(initial); + + schedule_loop(); + + // Drop the handle BEFORE the scheduler is torn down — its Drop impl + // calls `with_sched` to decrement the outstanding-handle count. + drop(initial_handle); + + // Take the SchedulerState out of the thread-local BEFORE dropping it. + // Dropping it while still inside SCHED.with's RefCell borrow would + // re-enter (via channel senders' Drop → unpark → try_with_sched). + let state = SCHED.with(|c| c.borrow_mut().take()); + drop(state); + PENDING_CLOSURES.with(|c| c.borrow_mut().clear()); +} + +/// Reserved sentinel pid for the root supervisor. Never allocated to a +/// real actor; lookups return `None`; signals are dropped. +pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX); + +fn schedule_loop() { + loop { + let pid = match with_sched(|s| s.run_queue.pop_front()) { + Some(p) => p, + None => return, + }; + + // Look up sp; skip stale or already-reaped pids. + let sp = match with_sched(|s| { + s.slot(pid).and_then(|slot| slot.actor.as_ref().map(|a| a.sp)) + }) { + Some(sp) => sp, + None => continue, + }; + + // If this is a first resume, move the pending closure to the + // thread-local the trampoline reads. + if let Some(b) = pop_pending_closure(pid) { + set_current_actor_box(b); + } + + set_actor_sp(sp); + set_current_pid(pid); + reset_actor_done(); + YIELD_INTENT.with(|c| c.set(YieldIntent::Yield)); + + crate::preempt::reset_timeslice(); + PREEMPTION_ENABLED.with(|c| c.set(true)); + + unsafe { switch_to_actor() }; + + PREEMPTION_ENABLED.with(|c| c.set(false)); + clear_current_pid(); + + let intent = YIELD_INTENT.with(|c| c.get()); + let new_sp = get_actor_sp(); + + if is_actor_done() { + let outcome = take_last_outcome().unwrap_or(Outcome::Exit); + finalize_actor(pid, outcome); + } else { + with_sched(|s| { + if let Some(slot) = s.slot_mut(pid) { + if let Some(actor) = slot.actor.as_mut() { + actor.sp = new_sp; + } + match intent { + YieldIntent::Yield => { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + } + YieldIntent::Park => { + slot.state = State::Parked; + } + } + } + }); + } + } +} + +fn finalize_actor(pid: Pid, outcome: Outcome) { + // Joiners get the typed Result with the panic payload. The supervisor + // gets an informational `Signal::Panic` with an empty payload — its job + // is policy (restart/escalate), not forensics. Users who need the + // payload in supervision can plumb their own channel. + + let (joiner_outcome, sup_signal) = match outcome { + Outcome::Exit => (Outcome::Exit, Signal::Exit(pid)), + Outcome::Panic(payload) => ( + Outcome::Panic(payload), + Signal::Panic(pid, Box::new(()) as Box), + ), + }; + + // Stash outcome, mark Done, collect waiters, drop the actor stack. + let (waiters, supervisor_pid) = with_sched(|s| { + let slot = s.slot_mut(pid).expect("finalize_actor: slot vanished"); + let sup = slot.actor.as_ref().map(|a| a.supervisor); + slot.outcome = Some(joiner_outcome); + slot.state = State::Done; + slot.actor = None; + let w = std::mem::take(&mut slot.waiters); + (w, sup) + }); + + // Deliver to supervisor (best-effort; ignore SendError). + if let Some(sup) = supervisor_pid { + let sender = with_sched(|s| { + s.slot(sup).and_then(|slot| slot.supervisor_channel.clone()) + }); + if let Some(sender) = sender { + let _ = sender.send(sup_signal); + } + } + + // Unpark joiners. + for joiner in waiters { + unpark(joiner); + } + + // Reclaim if no outstanding handles. + with_sched(|s| { + let should_reclaim = match s.slot(pid) { + Some(slot) => slot.outstanding_handles == 0, + None => false, + }; + if should_reclaim { + reclaim_slot(s, pid); + } + }); +} diff --git a/src/stack.rs b/src/stack.rs new file mode 100644 index 0000000..b742531 --- /dev/null +++ b/src/stack.rs @@ -0,0 +1,89 @@ +//! mmap-based growable stack with a guard page below. +//! +//! Layout (low → high address): +//! [ guard page (PROT_NONE) | stack region ] +//! ^ top() — initial stack pointer +//! +//! Stacks grow downward. Overflow lands in the guard page → SIGSEGV. + +use std::io; + +pub struct Stack { + /// Bottom of the entire mmap'd region (start of guard page). + base: *mut u8, + /// Total mmap'd size: guard_size + stack_size. + total_size: usize, + /// Usable stack size (excluding guard page). + stack_size: usize, +} + +// Stack owns its memory; safe to send across threads. +unsafe impl Send for Stack {} + +impl Stack { + /// Allocate a new stack. `stack_size` is the usable region; one page is + /// added below as a guard page. Both are rounded up to the page size. + pub fn new(stack_size: usize) -> io::Result { + let page = page_size(); + let stack_size = round_up(stack_size, page); + let guard_size = page; + let total_size = guard_size + stack_size; + + let base = unsafe { + libc::mmap( + std::ptr::null_mut(), + total_size, + libc::PROT_READ | libc::PROT_WRITE, + libc::MAP_PRIVATE | libc::MAP_ANONYMOUS, + -1, + 0, + ) + }; + if base == libc::MAP_FAILED { + return Err(io::Error::last_os_error()); + } + let base = base as *mut u8; + + let ret = unsafe { + libc::mprotect(base as *mut libc::c_void, guard_size, libc::PROT_NONE) + }; + if ret != 0 { + let err = io::Error::last_os_error(); + unsafe { libc::munmap(base as *mut libc::c_void, total_size) }; + return Err(err); + } + + Ok(Self { base, total_size, stack_size }) + } + + /// 16-byte-aligned top of the usable region. + pub fn top(&self) -> *mut u8 { + let raw_top = self.base as usize + self.total_size; + (raw_top & !15) as *mut u8 + } + + /// Pointer to the bottom of the usable region (just above the guard page). + pub fn usable_base(&self) -> *mut u8 { + unsafe { self.base.add(page_size()) } + } + + pub fn stack_size(&self) -> usize { + self.stack_size + } +} + +impl Drop for Stack { + fn drop(&mut self) { + unsafe { + libc::munmap(self.base as *mut libc::c_void, self.total_size); + } + } +} + +fn page_size() -> usize { + unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize } +} + +fn round_up(n: usize, align: usize) -> usize { + (n + align - 1) & !(align - 1) +} diff --git a/src/supervisor.rs b/src/supervisor.rs new file mode 100644 index 0000000..2ecc35d --- /dev/null +++ b/src/supervisor.rs @@ -0,0 +1,37 @@ +//! Supervision signals. +//! +//! Every actor has a supervisor, which is itself just an actor with a +//! `Receiver`. When a child actor terminates, the scheduler sends +//! a `Signal` on the supervisor's channel. The supervisor decides what to +//! do — restart, escalate, ignore. +//! +//! For v0.1 there is no built-in restart-intensity cap. That's policy and +//! lives in user code; library is mechanism only. + +use crate::pid::Pid; +use std::any::Any; + +pub enum Signal { + /// The child exited normally. + Exit(Pid), + /// The child panicked. Payload is whatever `panic!` was called with. + Panic(Pid, Box), +} + +impl std::fmt::Debug for Signal { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Signal::Exit(pid) => write!(f, "Signal::Exit({:?})", pid), + Signal::Panic(pid, _) => write!(f, "Signal::Panic({:?}, ..)", pid), + } + } +} + +impl Signal { + pub fn pid(&self) -> Pid { + match self { + Signal::Exit(p) => *p, + Signal::Panic(p, _) => *p, + } + } +} diff --git a/tests/channel.rs b/tests/channel.rs new file mode 100644 index 0000000..ed87c8d --- /dev/null +++ b/tests/channel.rs @@ -0,0 +1,110 @@ +//! Channel tests. These run under the scheduler because `recv()` needs to +//! be able to park, which requires a live runtime. + +use smarm::{channel, run, spawn}; +use std::cell::Cell; + +thread_local! { + static OUT: Cell = const { Cell::new(0) }; +} + +#[test] +fn send_then_recv_same_actor() { + OUT.with(|c| c.set(0)); + run(|| { + let (tx, rx) = channel::(); + tx.send(42).unwrap(); + let v = rx.recv().unwrap(); + OUT.with(|c| c.set(v)); + }); + assert_eq!(OUT.with(|c| c.get()), 42); +} + +#[test] +fn recv_parks_until_send_from_other_actor() { + OUT.with(|c| c.set(0)); + run(|| { + let (tx, rx) = channel::(); + let h = spawn(move || { + // This actor blocks on an empty channel. + let v = rx.recv().unwrap(); + OUT.with(|c| c.set(v)); + }); + // Parent runs, then yields to let the child block, + // then sends, then joins. + smarm::yield_now(); + tx.send(7).unwrap(); + h.join().unwrap(); + }); + assert_eq!(OUT.with(|c| c.get()), 7); +} + +#[test] +fn multiple_messages_arrive_in_order() { + let captured: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let cap2 = captured.clone(); + + run(move || { + let (tx, rx) = channel::(); + let h = spawn(move || { + for _ in 0..3 { + let v = rx.recv().unwrap(); + cap2.lock().unwrap().push(v); + } + }); + for v in 1..=3i64 { + tx.send(v).unwrap(); + } + h.join().unwrap(); + }); + + assert_eq!(*captured.lock().unwrap(), vec![1, 2, 3]); +} + +#[test] +fn cloned_senders_both_deliver() { + let captured: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); + let cap2 = captured.clone(); + + run(move || { + let (tx, rx) = channel::(); + let tx2 = tx.clone(); + let h = spawn(move || { + for _ in 0..2 { + let v = rx.recv().unwrap(); + cap2.lock().unwrap().push(v); + } + }); + tx.send(10).unwrap(); + tx2.send(20).unwrap(); + h.join().unwrap(); + }); + + let mut got = captured.lock().unwrap().clone(); + got.sort(); + assert_eq!(got, vec![10, 20]); +} + +#[test] +fn recv_returns_err_when_all_senders_dropped() { + let saw_err: std::sync::Arc = + std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let saw_err2 = saw_err.clone(); + + run(move || { + let (tx, rx) = channel::(); + let h = spawn(move || { + // Receiver waits; no message will ever come. + if rx.recv().is_err() { + saw_err2.store(true, std::sync::atomic::Ordering::SeqCst); + } + }); + smarm::yield_now(); + drop(tx); // last sender gone; rx.recv must return Err. + h.join().unwrap(); + }); + + assert!(saw_err.load(std::sync::atomic::Ordering::SeqCst)); +} diff --git a/tests/context.rs b/tests/context.rs new file mode 100644 index 0000000..150bcb1 --- /dev/null +++ b/tests/context.rs @@ -0,0 +1,137 @@ +//! Low-level context-switch tests. These poke `init_actor_stack` and the +//! naked asm shims directly — no scheduler involved. + +use smarm::context::{ + get_actor_sp, init_actor_stack, set_actor_sp, switch_to_actor, switch_to_scheduler, +}; +use smarm::stack::Stack; +use std::cell::Cell; + +thread_local! { + static LOG: Cell = const { Cell::new(0) }; +} + +fn log(v: u64) { LOG.with(|c| c.set(c.get() | v)); } +fn get_log() -> u64 { LOG.with(|c| c.get()) } +fn reset_log() { LOG.with(|c| c.set(0)); } + +extern "C-unwind" fn actor_simple() { + log(0x1); + unsafe { switch_to_scheduler() }; +} + +#[test] +fn actor_runs_and_returns_to_scheduler() { + reset_log(); + let stack = Stack::new(64 * 1024).unwrap(); + let sp = init_actor_stack(stack.top(), actor_simple); + set_actor_sp(sp); + unsafe { switch_to_actor() }; + assert_eq!(get_log(), 0x1); +} + +extern "C-unwind" fn actor_two_steps() { + log(0x1); + unsafe { switch_to_scheduler() }; + log(0x2); + unsafe { switch_to_scheduler() }; +} + +#[test] +fn actor_yields_and_resumes() { + reset_log(); + let stack = Stack::new(64 * 1024).unwrap(); + let sp = init_actor_stack(stack.top(), actor_two_steps); + set_actor_sp(sp); + + unsafe { switch_to_actor() }; + assert_eq!(get_log(), 0x1, "after first resume"); + + unsafe { switch_to_actor() }; + assert_eq!(get_log(), 0x1 | 0x2, "after second resume"); +} + +// Callee-saved registers must survive a yield. + +use std::sync::OnceLock; + +static REG_BEFORE: OnceLock<[u64; 4]> = OnceLock::new(); +static REG_AFTER: OnceLock<[u64; 4]> = OnceLock::new(); + +extern "C-unwind" fn actor_reg_check() { + unsafe { + let s0: u64 = 0xAAAA_BBBB_0000_0001; + let s1: u64 = 0xCCCC_DDDD_0000_0002; + let s2: u64 = 0xEEEE_FFFF_0000_0003; + let s3: u64 = 0x1111_2222_0000_0004; + + core::arch::asm!( + "mov r12, {s0}", "mov r13, {s1}", "mov r14, {s2}", "mov r15, {s3}", + s0 = in(reg) s0, s1 = in(reg) s1, s2 = in(reg) s2, s3 = in(reg) s3, + out("r12") _, out("r13") _, out("r14") _, out("r15") _, + ); + REG_BEFORE.set([s0, s1, s2, s3]).ok(); + switch_to_scheduler(); + + let a0: u64; let a1: u64; let a2: u64; let a3: u64; + core::arch::asm!( + "mov {a0}, r12", "mov {a1}, r13", "mov {a2}, r14", "mov {a3}, r15", + a0 = out(reg) a0, a1 = out(reg) a1, a2 = out(reg) a2, a3 = out(reg) a3, + ); + REG_AFTER.set([a0, a1, a2, a3]).ok(); + switch_to_scheduler(); + } +} + +#[test] +fn callee_saved_registers_survive_yield() { + let stack = Stack::new(64 * 1024).unwrap(); + let sp = init_actor_stack(stack.top(), actor_reg_check); + set_actor_sp(sp); + unsafe { switch_to_actor(); switch_to_actor(); } + assert_eq!(REG_BEFORE.get().copied().unwrap(), REG_AFTER.get().copied().unwrap()); +} + +// Two actors, independent stacks. + +thread_local! { + static A_VAL: Cell = const { Cell::new(0) }; + static B_VAL: Cell = const { Cell::new(0) }; +} + +extern "C-unwind" fn actor_a() { + A_VAL.with(|c| c.set(0xAAAA)); + unsafe { switch_to_scheduler() }; + let v = A_VAL.with(|c| c.get()); + A_VAL.with(|c| c.set(if v == 0xAAAA { 0xA00D } else { 0xDEAD })); + unsafe { switch_to_scheduler() }; +} + +extern "C-unwind" fn actor_b() { + B_VAL.with(|c| c.set(0xBBBB)); + unsafe { switch_to_scheduler() }; + let v = B_VAL.with(|c| c.get()); + B_VAL.with(|c| c.set(if v == 0xBBBB { 0xB00D } else { 0xDEAD })); + unsafe { switch_to_scheduler() }; +} + +#[test] +fn two_actors_dont_corrupt_each_other() { + let stack_a = Stack::new(64 * 1024).unwrap(); + let stack_b = Stack::new(64 * 1024).unwrap(); + + let sp_a = init_actor_stack(stack_a.top(), actor_a); + let sp_b = init_actor_stack(stack_b.top(), actor_b); + + set_actor_sp(sp_a); unsafe { switch_to_actor() }; + let sp_a = get_actor_sp(); + + set_actor_sp(sp_b); unsafe { switch_to_actor() }; + let sp_b = get_actor_sp(); + + set_actor_sp(sp_a); unsafe { switch_to_actor() }; + set_actor_sp(sp_b); unsafe { switch_to_actor() }; + + assert_eq!(A_VAL.with(|c| c.get()), 0xA00D); + assert_eq!(B_VAL.with(|c| c.get()), 0xB00D); +} diff --git a/tests/pid.rs b/tests/pid.rs new file mode 100644 index 0000000..8ea410b --- /dev/null +++ b/tests/pid.rs @@ -0,0 +1,22 @@ +use smarm::pid::Pid; + +#[test] +fn pid_equality() { + assert_eq!(Pid::new(0, 0), Pid::new(0, 0)); + assert_ne!(Pid::new(0, 0), Pid::new(0, 1)); + assert_ne!(Pid::new(0, 0), Pid::new(1, 0)); +} + +#[test] +fn pid_accessors() { + let p = Pid::new(42, 7); + assert_eq!(p.index(), 42); + assert_eq!(p.generation(), 7); +} + +#[test] +fn pid_debug_is_useful() { + let p = Pid::new(3, 5); + let s = format!("{:?}", p); + assert!(s.contains('3') && s.contains('5'), "got: {}", s); +} diff --git a/tests/scheduler.rs b/tests/scheduler.rs new file mode 100644 index 0000000..ed7a70d --- /dev/null +++ b/tests/scheduler.rs @@ -0,0 +1,171 @@ +//! End-to-end scheduler tests: spawning, joining, panic delivery, +//! yield_now, self_pid. + +use smarm::{channel, run, self_pid, spawn, spawn_under, yield_now, Signal}; +use std::cell::Cell; +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::Arc; + +// --------------------------------------------------------------------------- +// Single root actor runs to completion +// --------------------------------------------------------------------------- + +#[test] +fn root_actor_runs() { + let captured = Arc::new(AtomicI64::new(0)); + let c = captured.clone(); + run(move || { c.store(99, Ordering::SeqCst); }); + assert_eq!(captured.load(Ordering::SeqCst), 99); +} + +// --------------------------------------------------------------------------- +// Spawn child, join it +// --------------------------------------------------------------------------- + +#[test] +fn spawn_and_join_returns_exit() { + let captured = Arc::new(AtomicI64::new(0)); + let c = captured.clone(); + run(move || { + let h = spawn(move || { c.store(7, Ordering::SeqCst); }); + let res = h.join(); + assert!(res.is_ok(), "join returned {:?}", res); + }); + assert_eq!(captured.load(Ordering::SeqCst), 7); +} + +// --------------------------------------------------------------------------- +// yield_now lets a sibling run +// --------------------------------------------------------------------------- + +#[test] +fn yield_now_interleaves_actors() { + let log: Arc>> = Arc::new(std::sync::Mutex::new(Vec::new())); + let l1 = log.clone(); + let l2 = log.clone(); + run(move || { + let h1 = spawn(move || { + l1.lock().unwrap().push(1); + yield_now(); + l1.lock().unwrap().push(3); + }); + let h2 = spawn(move || { + l2.lock().unwrap().push(2); + yield_now(); + l2.lock().unwrap().push(4); + }); + h1.join().unwrap(); + h2.join().unwrap(); + }); + // Both actors get their first step before either second step. Exact order + // is FIFO: 1, 2, then 3, 4. + assert_eq!(*log.lock().unwrap(), vec![1, 2, 3, 4]); +} + +// --------------------------------------------------------------------------- +// self_pid returns this actor's pid inside the actor +// --------------------------------------------------------------------------- + +#[test] +fn self_pid_is_stable_within_an_actor() { + let pid_cell: Arc>> = + Arc::new(std::sync::Mutex::new(None)); + let p2 = pid_cell.clone(); + run(move || { + let h = spawn(move || { + let me = self_pid(); + yield_now(); + assert_eq!(me, self_pid(), "self_pid changed across yield"); + *p2.lock().unwrap() = Some(me); + }); + h.join().unwrap(); + }); + assert!(pid_cell.lock().unwrap().is_some()); +} + +// --------------------------------------------------------------------------- +// Panic is captured; join returns Err; supervisor receives Signal::Panic +// --------------------------------------------------------------------------- + +#[test] +fn panicking_child_returns_join_error() { + let saw_err = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let s = saw_err.clone(); + run(move || { + let h = spawn(|| panic!("kaboom")); + if h.join().is_err() { + s.store(true, Ordering::SeqCst); + } + }); + + assert!(saw_err.load(Ordering::SeqCst)); +} + +#[test] +fn supervisor_receives_panic_signal() { + let saw_panic_signal = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let s = saw_panic_signal.clone(); + + run(move || { + // Build a supervisor actor with its own mailbox. + let (sig_tx, sig_rx) = channel::(); + let sup_handle = spawn(move || { + // Wait for exactly one signal. + let sig = sig_rx.recv().unwrap(); + if let Signal::Panic(_, _) = sig { + s.store(true, Ordering::SeqCst); + } + }); + // Tell the runtime: when I spawn the next child, route signals here. + let sup_pid = sup_handle.pid(); + smarm::scheduler::register_supervisor_channel(sup_pid, sig_tx); + + let child = spawn_under(sup_pid, || panic!("oops")); + let _ = child.join(); + sup_handle.join().unwrap(); + }); + + assert!(saw_panic_signal.load(Ordering::SeqCst)); +} + +// --------------------------------------------------------------------------- +// Multiple children, all complete, parent gets back control +// --------------------------------------------------------------------------- + +#[test] +fn many_children_all_complete() { + let counter = Arc::new(AtomicI64::new(0)); + let c = counter.clone(); + run(move || { + let mut handles = Vec::new(); + for _ in 0..10 { + let cc = c.clone(); + handles.push(spawn(move || { + cc.fetch_add(1, Ordering::SeqCst); + })); + } + for h in handles { + h.join().unwrap(); + } + }); + assert_eq!(counter.load(Ordering::SeqCst), 10); +} + +// --------------------------------------------------------------------------- +// Repeated yield_now inside an actor with no other actors completes +// --------------------------------------------------------------------------- + +#[test] +fn yield_alone_terminates() { + thread_local! { + static N: Cell = const { Cell::new(0) }; + } + N.with(|c| c.set(0)); + run(|| { + for _ in 0..5 { + N.with(|c| c.set(c.get() + 1)); + yield_now(); + } + }); + assert_eq!(N.with(|c| c.get()), 5); +} diff --git a/tests/stack.rs b/tests/stack.rs new file mode 100644 index 0000000..cec741a --- /dev/null +++ b/tests/stack.rs @@ -0,0 +1,123 @@ +//! Stack allocator tests. +//! +//! Covers allocation, alignment, read/write across the usable region, and +//! (via subprocess) that the guard page actually SIGSEGVs. + +use smarm::stack::Stack; + +#[test] +fn top_is_16_byte_aligned() { + let s = Stack::new(64 * 1024).unwrap(); + assert_eq!(s.top() as usize % 16, 0); +} + +#[test] +fn top_is_within_allocation() { + let s = Stack::new(64 * 1024).unwrap(); + let top = s.top() as usize; + let base = s.usable_base() as usize; + assert!(top > base); + assert!(top <= base + s.stack_size()); +} + +#[test] +fn write_and_read_top_of_stack() { + let s = Stack::new(64 * 1024).unwrap(); + let sentinel: u64 = 0xDEAD_BEEF_CAFE_1234; + unsafe { + let ptr = s.top().sub(8) as *mut u64; + ptr.write_volatile(sentinel); + assert_eq!(ptr.read_volatile(), sentinel); + } +} + +#[test] +fn write_and_read_bottom_of_usable_region() { + let s = Stack::new(64 * 1024).unwrap(); + let sentinel: u64 = 0x0102_0304_0506_0708; + unsafe { + let ptr = s.usable_base() as *mut u64; + ptr.write_volatile(sentinel); + assert_eq!(ptr.read_volatile(), sentinel); + } +} + +#[test] +fn small_stack_allocates() { + assert!(Stack::new(4096).is_ok()); +} + +#[test] +fn large_stack_allocates() { + assert!(Stack::new(8 * 1024 * 1024).is_ok()); +} + +#[test] +fn stack_size_at_least_requested() { + let s = Stack::new(64 * 1024).unwrap(); + assert!(s.stack_size() >= 64 * 1024); +} + +// --------------------------------------------------------------------------- +// Guard page SIGSEGV tests — subprocess-based. +// --------------------------------------------------------------------------- + +use std::env; +use std::process::Command; + +fn run_as_child_if_requested() { + match env::var("SMARM_SUBTEST").as_deref() { + Ok("guard_page_direct") => { + let s = Stack::new(64 * 1024).unwrap(); + unsafe { + let guard_ptr = s.usable_base().sub(1); + guard_ptr.write_volatile(0xAB); + } + std::process::exit(0); + } + Ok("stack_overflow") => { + let s = Stack::new(64 * 1024).unwrap(); + unsafe { + let mut ptr = s.top().sub(1); + let stop = s.usable_base().sub(1); + while ptr >= stop { + ptr.write_volatile(0xFF); + ptr = ptr.sub(1); + } + } + std::process::exit(0); + } + _ => {} + } +} + +fn spawn_subtest(name: &str) -> std::process::ExitStatus { + let exe = env::current_exe().unwrap(); + Command::new(exe) + .env("SMARM_SUBTEST", name) + .args(["--test-threads=1", "--quiet"]) + .status() + .expect("failed to spawn subprocess") +} + +#[test] +fn guard_page_causes_sigsegv() { + run_as_child_if_requested(); + let status = spawn_subtest("guard_page_direct"); + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + assert_eq!(status.signal(), Some(11), "expected SIGSEGV, got: {:?}", status); + } +} + +#[test] +fn stack_overflow_causes_sigsegv() { + run_as_child_if_requested(); + let status = spawn_subtest("stack_overflow"); + #[cfg(unix)] + { + use std::os::unix::process::ExitStatusExt; + assert_eq!(status.signal(), Some(11), "expected SIGSEGV, got: {:?}", status); + } +}