feat: full runtime redesign (v0.6)

Complete rewrite with improved architecture & correctness:
- src/runtime.rs: Simplified task scheduling with proper state transitions
- src/scheduler.rs: Decoupled from runtime, pure task queue logic
- src/io.rs, src/mutex.rs: Refactored for clarity & performance
- New actor model framework (src/actor.rs, src/context.rs)
- Channel primitives (src/channel.rs) & process IDs (src/pid.rs)
- Preemption framework (src/preempt.rs) for fair timeslicing
- Expanded benchmarks & tests (multi_scheduler, primes, runtime)
This commit is contained in:
Claude
2026-05-23 16:09:35 +00:00
parent 078447539c
commit 978678a46e
31 changed files with 5751 additions and 0 deletions

110
src/actor.rs Normal file
View File

@@ -0,0 +1,110 @@
//! Actor descriptor and trampoline.
//!
//! An `Actor` owns its stack and holds the closure it will run. The
//! `trampoline` is a fixed `extern "C-unwind" fn()` that every actor enters
//! through; it pulls the closure out of a thread-local set by the scheduler
//! immediately before resume, invokes it inside `catch_unwind`, records the
//! outcome, and switches back to the scheduler.
//!
//! Why a thread-local and not, say, passing the closure pointer via a
//! register? Because the first resume goes through `ret`, not `call`, and
//! we have no other channel for parameters. The scheduler sets the
//! thread-local, switches in, the trampoline reads it. After the first
//! resume the closure has been consumed, so subsequent resumes don't need it.
use crate::context::switch_to_scheduler;
use crate::pid::Pid;
use crate::stack::Stack;
use std::any::Any;
use std::cell::{Cell, RefCell};
use std::panic;
/// What an actor produced when it finished. Stored on the actor's slot,
/// drained by `JoinHandle::join` once the slot is marked done.
///
/// Built by `trampoline` from the result of running the actor's closure
/// inside `catch_unwind`.
pub enum Outcome {
/// The closure returned normally.
Exit,
/// The closure panicked; carries the payload captured by `catch_unwind`.
Panic(Box<dyn Any + Send>),
}
// Thread-locals that the scheduler writes immediately before `switch_to_actor`.
// Each one is per OS thread and is reached only through the small accessor
// functions defined below (plus `trampoline`).
thread_local! {
/// The closure for the actor we're about to resume *for the first time*.
/// Written by `set_current_actor_box`; consumed on first entry into the
/// trampoline and `None` thereafter.
static CURRENT_ACTOR_BOX: RefCell<Option<Box<dyn FnOnce() + Send>>> =
const { RefCell::new(None) };
/// The PID of the actor currently executing on this OS thread.
/// Set on every resume so that `self_pid()` works inside actor code;
/// `None` when no actor is running (see `clear_current_pid`).
static CURRENT_PID: Cell<Option<Pid>> = const { Cell::new(None) };
/// Filled by the trampoline when the actor returns (normally or via
/// panic). The scheduler reads this after `switch_to_actor` returns,
/// via `take_last_outcome`, which leaves `None` behind.
static LAST_OUTCOME: RefCell<Option<Outcome>> = const { RefCell::new(None) };
/// Set by the trampoline on completion; reset by the scheduler before
/// each resume so it never sees stale state.
static ACTOR_DONE: Cell<bool> = const { Cell::new(false) };
}
/// Stash the closure of the actor that is about to be resumed for the first
/// time. `trampoline` takes it back out on entry.
pub fn set_current_actor_box(b: Box<dyn FnOnce() + Send>) {
    CURRENT_ACTOR_BOX.with_borrow_mut(|slot| *slot = Some(b));
}
/// Record `p` as the actor currently running on this OS thread.
pub fn set_current_pid(p: Pid) {
    CURRENT_PID.set(Some(p));
}
/// Mark this OS thread as running no actor.
pub fn clear_current_pid() {
    CURRENT_PID.set(None);
}
/// PID of the actor currently running on this OS thread, or `None` when
/// called outside an actor.
pub fn current_pid() -> Option<Pid> {
    CURRENT_PID.get()
}
/// Clear the completion flag so the next resume starts from a clean state.
pub fn reset_actor_done() {
    ACTOR_DONE.set(false);
}
/// Whether the trampoline has flagged completion since the last
/// `reset_actor_done`.
pub fn is_actor_done() -> bool {
    ACTOR_DONE.get()
}
/// Remove and return the outcome recorded by the trampoline, leaving `None`
/// in its place.
pub fn take_last_outcome() -> Option<Outcome> {
    LAST_OUTCOME.with_borrow_mut(Option::take)
}
/// Entry point written as the `ret` target on every actor stack; the
/// compiler must not inline it away.
///
/// Takes the pending closure out of `CURRENT_ACTOR_BOX`, runs it under
/// `catch_unwind`, publishes the result through `LAST_OUTCOME` and
/// `ACTOR_DONE`, then switches back to the scheduler. `extern "C-unwind"`
/// would let a panic cross this boundary, but because of the `catch_unwind`
/// none ever does.
///
/// # Panics
/// Panics if no closure was set before entry, or if the scheduler resumes
/// the actor again after it has finished.
pub extern "C-unwind" fn trampoline() {
    let body = CURRENT_ACTOR_BOX
        .with_borrow_mut(Option::take)
        .expect("trampoline entered without a closure set");

    let finished = panic::catch_unwind(panic::AssertUnwindSafe(body));
    let outcome = finished.map_or_else(Outcome::Panic, |()| Outcome::Exit);

    LAST_OUTCOME.with_borrow_mut(|slot| *slot = Some(outcome));
    ACTOR_DONE.set(true);

    // Give control back for good: the scheduler tears our slot down and
    // must never switch into this stack again.
    unsafe { switch_to_scheduler() };

    // Only reachable through a scheduler bug.
    unreachable!("scheduler resumed a done actor");
}
/// One actor's worth of state. Owned by the scheduler's slot table.
///
/// The closure the actor runs is not stored here; it is handed to the
/// trampoline through a thread-local just before the first resume.
pub struct Actor {
/// The PID this actor was assigned at spawn time.
pub pid: Pid,
/// The stack the actor runs on. Dropped (munmap'd) when the actor dies.
pub stack: Stack,
/// The saved stack pointer, as a raw address. Updated on every yield.
pub sp: usize,
/// The PID of this actor's supervisor. Used to deliver `Signal` on death.
pub supervisor: Pid,
}

153
src/channel.rs Normal file
View File

@@ -0,0 +1,153 @@
//! Unbounded MPSC channels.
//!
//! Inner state is `Arc<Mutex<Inner<T>>>` so channels can be sent across OS
//! threads (required for the multi-scheduler runtime where a sender and
//! receiver may run on different scheduler threads simultaneously).
//!
//! Semantics:
//! - Senders are clonable; the last sender drop closes the channel.
//! - `Receiver::recv` on an empty open channel parks the receiver.
//! - `Receiver::recv` on an empty closed channel returns `Err(RecvError)`.
//! - `Sender::send` on an open channel always succeeds.
//! - `Sender::send` on a closed channel (receiver dropped) returns
//! `Err(SendError(value))`.
//! - When a send pushes to a previously empty queue and a receiver is
//! parked, the receiver is unparked.
use crate::pid::Pid;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
/// Create an unbounded channel and return its first sender together with
/// its receiver. The channel starts open, empty, with a sender count of 1
/// and no parked receiver.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let state = Inner {
        queue: VecDeque::new(),
        parked_receiver: None,
        senders: 1,
        receiver_alive: true,
    };
    let shared = Arc::new(Mutex::new(state));
    let tx = Sender { inner: Arc::clone(&shared) };
    let rx = Receiver { inner: shared };
    (tx, rx)
}
/// Shared channel state; every access goes through the surrounding `Mutex`.
struct Inner<T> {
/// Messages sent but not yet received, oldest first.
queue: VecDeque<T>,
/// PID of the receiver parked in `recv`, if any. Taken by whoever wakes it.
parked_receiver: Option<Pid>,
/// Number of live `Sender` handles; 0 means the sending side is closed.
senders: usize,
/// Cleared when the `Receiver` is dropped; `send` fails afterwards.
receiver_alive: bool,
}
/// Sending half of a channel. Clonable; dropping the last clone closes the
/// channel for the receiver.
pub struct Sender<T> {
// State shared with the receiver and all other sender clones.
inner: Arc<Mutex<Inner<T>>>,
}
/// Receiving half of a channel. Dropping it makes subsequent `send` calls
/// fail with `SendError`.
pub struct Receiver<T> {
// State shared with every sender.
inner: Arc<Mutex<Inner<T>>>,
}
/// Returned by `Sender::send` when the receiver has been dropped. Carries
/// the unsent value back to the caller.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);
/// Returned by `recv`/`try_recv` when the channel is empty and every sender
/// has been dropped.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct RecvError;

impl std::fmt::Display for RecvError {
    /// Renders as the fixed text `channel closed`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("channel closed")
    }
}

impl std::error::Error for RecvError {}
impl<T> Clone for Sender<T> {
    /// Hand out another sender for the same channel, bumping the live-sender
    /// count under the channel lock.
    fn clone(&self) -> Self {
        {
            let mut state = self.inner.lock().unwrap();
            state.senders += 1;
        }
        Sender { inner: Arc::clone(&self.inner) }
    }
}
impl<T> Drop for Sender<T> {
    /// Decrement the sender count. If this was the last sender and nothing
    /// is queued, wake a parked receiver so it can observe the closed
    /// channel.
    fn drop(&mut self) {
        let mut state = self.inner.lock().unwrap();
        state.senders -= 1;
        let closed_and_drained = state.senders == 0 && state.queue.is_empty();
        let to_wake = if closed_and_drained {
            state.parked_receiver.take()
        } else {
            None
        };
        // Release the channel lock before calling into the scheduler.
        drop(state);

        if let Some(pid) = to_wake {
            crate::scheduler::unpark(pid);
        }
    }
}
impl<T> Drop for Receiver<T> {
    /// Flag the receiving side as gone so later `send` calls return
    /// `SendError`. Already-queued messages are left in place.
    fn drop(&mut self) {
        let mut state = self.inner.lock().unwrap();
        state.receiver_alive = false;
    }
}
impl<T> Sender<T> {
    /// Queue `value` for the receiver.
    ///
    /// Returns `Err(SendError(value))` if the receiver has been dropped.
    /// Otherwise the value is appended and, if a receiver was parked on the
    /// channel, it is unparked after the channel lock has been released.
    /// A `Send` trace event is emitted either way; outside an actor the
    /// sender is recorded as `Pid::new(u32::MAX, u32::MAX)`.
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        let woken = {
            let mut state = self.inner.lock().unwrap();
            if !state.receiver_alive {
                return Err(SendError(value));
            }
            state.queue.push_back(value);
            state.parked_receiver.take()
        };

        let me = crate::actor::current_pid();
        crate::te!(crate::trace::Event::Send { sender: me.unwrap_or(crate::pid::Pid::new(u32::MAX, u32::MAX)), receiver: woken });

        if let Some(pid) = woken {
            crate::scheduler::unpark(pid);
        }
        Ok(())
    }
}
impl<T> Receiver<T> {
    /// Receive the next message, parking the calling actor while the channel
    /// is empty but still open.
    ///
    /// Returns `Err(RecvError)` once the queue is empty and no sender
    /// remains.
    ///
    /// # Panics
    /// Panics if it has to park while called outside an actor.
    pub fn recv(&self) -> Result<T, RecvError> {
        loop {
            {
                let mut state = self.inner.lock().unwrap();
                match state.queue.pop_front() {
                    Some(msg) => return Ok(msg),
                    None if state.senders == 0 => return Err(RecvError),
                    None => {}
                }

                let me = crate::actor::current_pid()
                    .expect("recv() called outside an actor");
                debug_assert!(
                    state.parked_receiver.is_none(),
                    "channel has more than one receiver"
                );
                state.parked_receiver = Some(me);
                crate::te!(crate::trace::Event::RecvPark(me));
            }
            // The lock is released here: whoever wakes us needs it.
            crate::scheduler::park_current();

            // Back from the park; trace it, then loop to re-check the queue.
            if let Some(me) = crate::actor::current_pid() {
                crate::te!(crate::trace::Event::RecvWake(me));
            }
        }
    }

    /// Non-blocking. `Ok(Some(v))` if a message was available, `Ok(None)` if
    /// the channel is empty but open, `Err(RecvError)` if closed and drained.
    pub fn try_recv(&self) -> Result<Option<T>, RecvError> {
        let mut state = self.inner.lock().unwrap();
        match state.queue.pop_front() {
            Some(msg) => Ok(Some(msg)),
            None if state.senders == 0 => Err(RecvError),
            None => Ok(None),
        }
    }
}

106
src/context.rs Normal file
View File

@@ -0,0 +1,106 @@
//! Cooperative context switching, x86-64.
//!
//! Two naked-asm functions move execution between a scheduler thread and an
//! actor running on its own mmap'd stack. The compiler cannot do this; the
//! whole point of `#[unsafe(naked)]` is that we control every instruction.
//!
//! `SCHEDULER_SP` and `ACTOR_SP` are thread-locals holding each side's saved
//! stack pointer. `init_actor_stack` builds the initial stack so that the
//! first `switch_to_actor` lands inside the entry function with `rsp % 16 == 8`
//! (the x86-64 ABI requirement at function entry).
use std::cell::Cell;
thread_local! {
    /// Saved stack pointer of this OS thread's scheduler side.
    static SCHEDULER_SP: Cell<usize> = const { Cell::new(0) };
    /// Saved stack pointer of the actor to resume / that last yielded.
    static ACTOR_SP: Cell<usize> = const { Cell::new(0) };
}

// These four helpers are `call`ed directly from the naked-asm shims, with the
// argument in `rdi` and the result expected in `rax`. That is the C calling
// convention, so they are declared `extern "C"`; the default Rust ABI makes
// no such guarantee. Ordinary Rust callers are unaffected.

/// Read the scheduler's saved stack pointer for this OS thread.
extern "C" fn get_scheduler_sp() -> usize { SCHEDULER_SP.with(|c| c.get()) }
/// Store the scheduler's saved stack pointer for this OS thread.
extern "C" fn set_scheduler_sp(v: usize) { SCHEDULER_SP.with(|c| c.set(v)) }
/// Read the actor-side saved stack pointer for this OS thread.
pub extern "C" fn get_actor_sp() -> usize { ACTOR_SP.with(|c| c.get()) }
/// Store the actor-side saved stack pointer for this OS thread.
pub extern "C" fn set_actor_sp(v: usize) { ACTOR_SP.with(|c| c.set(v)) }
// ---------------------------------------------------------------------------
// Initial stack layout
//
// After alignment, sp = (top & ~15) - 8. Then we push (downward) a return
// address and six callee-saved register slots. The first `switch_to_actor`
// pops r15..rbx and `ret`s — landing in `entry` with rsp % 16 == 8.
//
// Layout (high → low), relative to aligned_top = top & ~15:
// aligned_top - 8 : unused padding. rsp points here after the `ret`,
// which is what makes rsp % 16 == 8 at entry.
// aligned_top - 16 : entry ptr ← `ret` target
// aligned_top - 24 : rbx = 0
// aligned_top - 32 : rbp = 0
// aligned_top - 40 : r12 = 0
// aligned_top - 48 : r13 = 0
// aligned_top - 56 : r14 = 0
// aligned_top - 64 : r15 = 0 ← initial rsp
// ---------------------------------------------------------------------------
/// Lay out the initial frame of a fresh actor stack and return the stack
/// pointer to resume it with.
///
/// `top` is rounded down to 16 bytes; below an 8-byte pad the function
/// writes the `entry` address (the `ret` target) followed by six zeroed
/// slots for rbx, rbp, r12, r13, r14 and r15. The returned value is the
/// address of the lowest slot (r15), i.e. `(top & !15) - 64`.
///
/// The caller must pass a `top` with at least 64 writable bytes (plus
/// alignment slack) below it.
pub fn init_actor_stack(top: *mut u8, entry: extern "C-unwind" fn()) -> usize {
    const SLOT: usize = 8;
    // Highest address first: ret target, then rbx, rbp, r12, r13, r14, r15.
    let frame: [usize; 7] = [entry as usize, 0, 0, 0, 0, 0, 0];

    let mut sp = (top as usize & !15) - SLOT;
    for word in frame {
        sp -= SLOT;
        unsafe { (sp as *mut usize).write(word) };
    }
    sp
}
// ---------------------------------------------------------------------------
// Context switch shims
//
// Each shim:
// 1. Pushes the six callee-saved integer registers.
// 2. Snaps rsp into rdi and calls the Rust helper that stores it.
// 3. Calls the Rust helper that returns the *other* side's saved rsp.
// 4. Moves that into rsp.
// 5. Pops the six registers and rets.
//
// XMM registers are NOT saved here. We rely on every yield happening through
// a Rust call site, which means the compiler has spilled any live XMM state
// to the stack before we get here. (This is the same argument the compiler
// uses internally — callee-saved regs are what survive a `call`, and the
// SysV AMD64 ABI says XMM0–XMM15 are all caller-saved.) If we ever yield
// from a place that isn't a Rust call boundary, this assumption breaks.
// ---------------------------------------------------------------------------
/// Save the scheduler's callee-saved registers and stack pointer, then load
/// the actor's stack pointer, restore its registers and `ret` into it.
///
/// Stack alignment: on entry `rsp % 16 == 8`, and six 8-byte pushes leave it
/// at 8 again. The SysV AMD64 ABI requires `rsp % 16 == 0` at a `call`, so
/// after capturing the value to save in `rdi` we drop `rsp` by 8 before
/// calling the helpers. The pad is never undone explicitly because `rsp` is
/// overwritten with the other side's saved value right after.
#[unsafe(naked)]
unsafe extern "C" fn switch_to_actor_asm() {
    core::arch::naked_asm!(
        "push rbx", "push rbp", "push r12", "push r13", "push r14", "push r15",
        "mov rdi, rsp",
        "sub rsp, 8",
        "call {set_sched_sp}",
        "call {get_actor_sp}",
        "mov rsp, rax",
        "pop r15", "pop r14", "pop r13", "pop r12", "pop rbp", "pop rbx",
        "ret",
        set_sched_sp = sym set_scheduler_sp,
        get_actor_sp = sym get_actor_sp,
    );
}
/// Resume the actor whose sp is in `ACTOR_SP`. Returns when the actor yields.
///
/// Thin wrapper around the naked-asm shim `switch_to_actor_asm`.
///
/// # Safety
/// `ACTOR_SP` must hold a stack pointer that either came from
/// `init_actor_stack` or was saved by a previous `switch_to_scheduler`;
/// the shim loads it into `rsp`, pops six registers and `ret`s through it.
pub unsafe fn switch_to_actor() {
unsafe { switch_to_actor_asm() };
}
/// Yield from the running actor back to the scheduler: save the actor's
/// callee-saved registers and stack pointer, then load the scheduler's stack
/// pointer, restore its registers and `ret` into it.
///
/// Stack alignment: on entry `rsp % 16 == 8`, and six 8-byte pushes leave it
/// at 8 again. The SysV AMD64 ABI requires `rsp % 16 == 0` at a `call`, so
/// after capturing the value to save in `rdi` we drop `rsp` by 8 before
/// calling the helpers. The pad is never undone explicitly because `rsp` is
/// overwritten with the scheduler's saved value right after.
///
/// # Safety
/// Must be called on an actor stack while the scheduler's stack pointer
/// saved by `switch_to_actor` is still valid.
#[unsafe(naked)]
pub unsafe extern "C" fn switch_to_scheduler() {
    core::arch::naked_asm!(
        "push rbx", "push rbp", "push r12", "push r13", "push r14", "push r15",
        "mov rdi, rsp",
        "sub rsp, 8",
        "call {set_actor_sp}",
        "call {get_sched_sp}",
        "mov rsp, rax",
        "pop r15", "pop r14", "pop r13", "pop r12", "pop rbp", "pop rbx",
        "ret",
        set_actor_sp = sym set_actor_sp,
        get_sched_sp = sym get_scheduler_sp,
    );
}

520
src/io.rs Normal file
View File

@@ -0,0 +1,520 @@
//! Off-scheduler IO: blocking-work offload and epoll-based fd readiness.
//!
//! `block_on_io(closure)` runs `closure` on a dedicated worker OS thread,
//! parks the calling actor in the meantime, and returns the closure's
//! value when it completes. Lets actors call into blocking C libraries,
//! synchronous file IO, or anything else that doesn't fit the readiness
//! model.
//!
//! `wait_readable(fd)` / `wait_writable(fd)` register interest in an fd
//! with epoll and park the calling actor. When the fd becomes ready, the
//! epoll thread unparks the actor. The actual `read(2)`/`write(2)` syscall
//! runs back on the scheduler thread, *inside* the actor — buffer never
//! leaves the actor, no copying through an intermediary thread. Built on
//! these are the conveniences `read(fd, &mut buf)` and `write(fd, &buf)`.
//!
//! Architecture
//! ============
//! Per `run()`, two OS threads:
//! - **epoll thread**: owns the epollfd. Loops in `epoll_wait`. On a
//! ready fd, pushes `Completion::FdReady { fd, events }` to the
//! shared completion queue and writes the scheduler-wake pipe (the
//! scheduler maps the fd back to a pid via `waiters`). On the
//! shutdown pipe (also registered in epollfd), exits.
//! - **pool thread**: blocks on the request mpsc. Runs the closure
//! inside `catch_unwind`, pushes `Completion::Blocking { pid, result }`,
//! writes the scheduler-wake pipe.
//!
//! Both threads share a single `completions: Arc<Mutex<VecDeque<Completion>>>`
//! and the same scheduler-wake pipe.
//!
//! `epoll_ctl` (register/unregister fd interest) is called by the
//! scheduler thread *directly* on the epollfd. That's well-defined per
//! `epoll_ctl(2)`: a thread may be calling `epoll_wait` on the epollfd
//! while another thread calls `epoll_ctl`. Avoids needing a second mpsc
//! and a second wake mechanism.
//!
//! Epoll mode
//! ==========
//! Level-triggered with EPOLLONESHOT. After a wakeup the kernel
//! auto-disarms the fd, so we never get two wakeups for one
//! `wait_readable` call. The scheduler explicitly `EPOLL_CTL_DEL`s the fd
//! on completion to free the slot for re-registration. Net effect: each
//! `wait_readable(fd)` is one ADD, one wakeup, one DEL — symmetric and
//! stateless between calls.
//!
//! Fd hygiene
//! ==========
//! If an actor dies while waiting on an fd, the registration is leaked
//! (the fd stays in the epollfd, armed). EPOLLONESHOT bounds the damage:
//! at most one stale wakeup, after which the kernel disarms. The stale
//! wakeup hits a dead pid in `waiters` and is dropped. Acceptable for v0.2;
//! a future pass should DEL on actor death.
//!
//! Fds used with `read`/`write` should be opened with `O_NONBLOCK`. If
//! they aren't, the syscall may block the scheduler thread despite the
//! readiness notification (an fd reporting readable doesn't guarantee
//! the syscall completes without blocking — e.g. a signal could be
//! delivered). Documented; not enforced.
//!
//! Panic handling
//! ==============
//! The pool worker runs the closure inside `catch_unwind` and ships either
//! the return value or the panic payload back to the scheduler.
//! `block_on_io` resumes the panic on the calling actor's stack, so the
//! actor's supervisor sees a real `Signal::Panic` as if the work had run
//! inline. Fd-wait primitives don't run user code on the IO thread, so
//! they have no equivalent panic-propagation path.
use crate::pid::Pid;
use std::any::Any;
use std::collections::{HashMap, VecDeque};
use std::io;
use std::os::fd::RawFd;
use std::panic;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle as OsJoinHandle;
// ---------------------------------------------------------------------------
// Wire types
// ---------------------------------------------------------------------------
/// Wire form of a `block_on_io` result as it travels from the pool thread
/// back to the scheduler. `Ok` is the closure's return value (boxed as
/// `Any`); `Err` is the panic payload.
pub type IoResult = Result<Box<dyn Any + Send>, Box<dyn Any + Send>>;
/// One unit of blocking work submitted to the pool thread.
struct Request {
/// The actor that submitted the work; echoed back in the completion.
pid: Pid,
/// The work to perform. Returns the wire-form result directly.
work: Box<dyn FnOnce() -> IoResult + Send>,
}
/// Completion message from either IO thread back to the scheduler.
pub enum Completion {
/// A `block_on_io` closure has finished (Ok = return value, Err = panic
/// payload). Pushed by the pool thread.
Blocking { pid: Pid, result: IoResult },
/// An fd registered via `wait_readable`/`wait_writable` is ready. The
/// scheduler looks up the parked pid in `waiters`, unparks it, and
/// removes the entry. `pid` isn't in this variant because the epoll
/// thread doesn't have access to the `waiters` map; the scheduler
/// thread owns that. `events` is the event mask copied from the
/// `epoll_event` the kernel returned.
FdReady { fd: RawFd, events: u32 },
}
// ---------------------------------------------------------------------------
// IoThread — created per `run()`, owned by `SchedulerState`.
// ---------------------------------------------------------------------------
/// Handle to the two IO helper threads (blocking-work pool and epoll) plus
/// the fds and queues they share with the scheduler. Dropping it shuts both
/// threads down and closes every fd it holds.
pub struct IoThread {
// ----- Channels & queues -----
/// Submission queue into the blocking-work pool.
tx: mpsc::Sender<Request>,
/// Shared completion queue, fed by both the pool and the epoll thread.
completions: Arc<Mutex<VecDeque<Completion>>>,
/// Pipe the scheduler polls in its idle path. Both IO threads write to
/// `wake_write` after pushing a completion.
wake_read: RawFd,
wake_write: RawFd,
// ----- Epoll machinery -----
/// The epollfd, owned by `IoThread`. Callable cross-thread via
/// `epoll_ctl` per the man page.
epollfd: RawFd,
/// Pipe used to signal the epoll thread to exit. Registered inside the
/// epollfd so a single `epoll_wait` covers both fd readiness and
/// shutdown.
shutdown_read: RawFd,
shutdown_write: RawFd,
/// One parked actor per registered fd. Populated by `epoll_register`
/// (on behalf of `wait_readable` / `wait_writable`) and drained by the
/// scheduler when a `FdReady` completion is processed.
pub waiters: HashMap<RawFd, Pid>,
// ----- Threads -----
/// Join handles; `Some` until `Drop` takes and joins them.
pool_thread: Option<OsJoinHandle<()>>,
epoll_thread: Option<OsJoinHandle<()>>,
/// Number of `block_on_io` requests in-flight. Used by the scheduler's
/// idle path to decide whether to wait on the pipe or exit. Fd waits
/// are not counted here; they're counted by `waiters.len()`.
/// Incremented by `submit`; decrementing is left to the scheduler.
pub outstanding: u32,
}
impl IoThread {
pub fn start() -> io::Result<Self> {
// Scheduler-facing wake pipe.
let (wake_read, wake_write) = make_pipe()?;
// Pool submission channel + shared completion queue.
let (tx, rx) = mpsc::channel::<Request>();
let completions: Arc<Mutex<VecDeque<Completion>>> =
Arc::new(Mutex::new(VecDeque::new()));
// Epoll machinery.
let epollfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
if epollfd < 0 {
// Best-effort fd cleanup before bailing.
unsafe {
libc::close(wake_read);
libc::close(wake_write);
}
return Err(io::Error::last_os_error());
}
let (shutdown_read, shutdown_write) = match make_pipe() {
Ok(p) => p,
Err(e) => {
unsafe {
libc::close(epollfd);
libc::close(wake_read);
libc::close(wake_write);
}
return Err(e);
}
};
// Register the shutdown pipe in epollfd. We use a sentinel `data`
// value to recognise shutdown events. RawFd values are non-negative,
// so u64::MAX is unambiguously not a real fd-data encoding.
let mut shutdown_ev = libc::epoll_event {
events: libc::EPOLLIN as u32,
u64: SHUTDOWN_EPOLL_TOKEN,
};
if unsafe {
libc::epoll_ctl(
epollfd,
libc::EPOLL_CTL_ADD,
shutdown_read,
&mut shutdown_ev as *mut _,
)
} < 0
{
let e = io::Error::last_os_error();
unsafe {
libc::close(epollfd);
libc::close(shutdown_read);
libc::close(shutdown_write);
libc::close(wake_read);
libc::close(wake_write);
}
return Err(e);
}
// Spawn pool thread.
let pool_comps = completions.clone();
let pool_thread = std::thread::Builder::new()
.name("smarm-io-pool".into())
.spawn(move || pool_loop(rx, pool_comps, wake_write))?;
// Spawn epoll thread.
let epoll_comps = completions.clone();
let epoll_thread = std::thread::Builder::new()
.name("smarm-io-epoll".into())
.spawn(move || epoll_loop(epollfd, epoll_comps, wake_write))?;
Ok(Self {
tx,
completions,
wake_read,
wake_write,
epollfd,
shutdown_read,
shutdown_write,
waiters: HashMap::new(),
pool_thread: Some(pool_thread),
epoll_thread: Some(epoll_thread),
outstanding: 0,
})
}
/// Queue `work` for the pool thread on behalf of `pid` and count it in
/// `outstanding`.
///
/// # Panics
/// Panics if the pool thread has hung up. That only happens during
/// shutdown, and submitting during shutdown is a bug.
pub fn submit(&mut self, pid: Pid, work: Box<dyn FnOnce() -> IoResult + Send>) {
    self.outstanding += 1;
    let request = Request { pid, work };
    let sent = self.tx.send(request);
    sent.expect("io pool hung up unexpectedly");
}
/// Take every completion currently queued, oldest first, leaving the shared
/// queue empty. The caller (the scheduler) routes the results and updates
/// `outstanding` / `waiters` accordingly.
pub fn drain_completions(&mut self) -> Vec<Completion> {
    self.completions.lock().unwrap().drain(..).collect()
}
pub fn wake_fd(&self) -> RawFd {
self.wake_read
}
/// Arm a one-shot epoll registration for `fd` and remember `pid` as the
/// actor parked on it. The epoll thread pushes a `FdReady` completion when
/// the kernel signals.
///
/// EPOLLONESHOT gives one wakeup per registration; the scheduler must call
/// `epoll_deregister` on completion so the fd can be registered again.
///
/// # Errors
/// `AlreadyExists` if `fd` already has a parked waiter, otherwise the OS
/// error from `epoll_ctl(EPOLL_CTL_ADD)`.
pub fn epoll_register(
    &mut self,
    fd: RawFd,
    pid: Pid,
    readable: bool,
    writable: bool,
) -> io::Result<()> {
    // A second waiter on the same fd would never be woken (the kernel
    // delivers a single EPOLLONESHOT event), so refuse it outright.
    if self.waiters.contains_key(&fd) {
        return Err(io::Error::new(
            io::ErrorKind::AlreadyExists,
            "fd already has a parked waiter",
        ));
    }

    // An actor that died while waiting leaves its kernel-side registration
    // behind. Clear any such leftover first; if there is none the DEL just
    // fails with ENOENT, which we ignore.
    let _ = unsafe {
        libc::epoll_ctl(self.epollfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut())
    };

    let mut interest = libc::EPOLLONESHOT as u32;
    interest |= if readable { libc::EPOLLIN as u32 } else { 0 };
    interest |= if writable { libc::EPOLLOUT as u32 } else { 0 };

    let mut ev = libc::epoll_event {
        events: interest,
        u64: fd as u64,
    };
    let added = unsafe {
        libc::epoll_ctl(self.epollfd, libc::EPOLL_CTL_ADD, fd, &mut ev as *mut _)
    };
    if added < 0 {
        return Err(io::Error::last_os_error());
    }

    self.waiters.insert(fd, pid);
    Ok(())
}
/// Drop the kernel-side epoll registration for `fd`. The scheduler calls
/// this after a `FdReady` completion so the next `wait_readable(fd)` can
/// ADD again.
///
/// `waiters` is deliberately left alone: that map is the scheduler's
/// bookkeeping, and this method only does the kernel cleanup.
pub fn epoll_deregister(&mut self, fd: RawFd) {
    // Deleting an fd that is no longer registered yields ENOENT; the
    // result is intentionally discarded.
    let _ = unsafe {
        libc::epoll_ctl(self.epollfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut())
    };
}
}
impl Drop for IoThread {
fn drop(&mut self) {
// 1. Signal the epoll thread to exit by writing the shutdown pipe.
unsafe {
let buf: [u8; 1] = [0];
// Single byte; we don't care about EINTR retry here — worst
// case the epoll thread blocks until process exit, which is
// fine because we then close fds out from under it.
libc::write(self.shutdown_write, buf.as_ptr() as *const _, 1);
}
// 2. Hang up the pool's request channel so the pool thread exits.
let (dead_tx, _) = mpsc::channel::<Request>();
let real_tx = std::mem::replace(&mut self.tx, dead_tx);
drop(real_tx);
// 3. Join both threads.
if let Some(h) = self.epoll_thread.take() {
let _ = h.join();
}
if let Some(h) = self.pool_thread.take() {
let _ = h.join();
}
// 4. Close fds.
unsafe {
libc::close(self.epollfd);
libc::close(self.shutdown_read);
libc::close(self.shutdown_write);
libc::close(self.wake_read);
libc::close(self.wake_write);
}
}
}
/// Sentinel `epoll_event.u64` distinguishing the shutdown pipe from
/// registered actor fds. Actor registrations store the fd itself in that
/// field; RawFd values fit in i32, so the high bits are available for a
/// marker, and we use u64::MAX which can't be a valid fd.
const SHUTDOWN_EPOLL_TOKEN: u64 = u64::MAX;
// ---------------------------------------------------------------------------
// Pool loop
// ---------------------------------------------------------------------------
/// Body of the blocking-work pool thread.
///
/// Runs each submitted closure under `catch_unwind`, pushes a
/// `Completion::Blocking` carrying either its result or the panic payload,
/// and pokes the scheduler's wake pipe. Returns once every sender of `rx`
/// has been dropped.
fn pool_loop(
    rx: mpsc::Receiver<Request>,
    completions: Arc<Mutex<VecDeque<Completion>>>,
    wake_write: RawFd,
) {
    for Request { pid, work } in rx {
        // A panic inside `work` becomes the `Err` arm of the wire result.
        let result: IoResult =
            panic::catch_unwind(panic::AssertUnwindSafe(work)).unwrap_or_else(Err);

        let done = Completion::Blocking { pid, result };
        completions.lock().unwrap().push_back(done);
        wake_scheduler(wake_write);
    }
}
// ---------------------------------------------------------------------------
// Epoll loop
// ---------------------------------------------------------------------------
/// Body of the epoll thread.
///
/// Blocks in `epoll_wait` with no timeout. Every ready event other than the
/// shutdown token becomes a `Completion::FdReady`; the scheduler is woken
/// once per batch that produced at least one completion. Returns when the
/// shutdown token is seen or `epoll_wait` fails with anything but EINTR.
fn epoll_loop(
    epollfd: RawFd,
    completions: Arc<Mutex<VecDeque<Completion>>>,
    wake_write: RawFd,
) {
    // Batch size for epoll_wait. 64 is plenty for our scale; raising it is
    // a one-line change.
    const MAX_EVENTS: usize = 64;
    let mut events: [libc::epoll_event; MAX_EVENTS] = unsafe { std::mem::zeroed() };

    loop {
        let n = unsafe {
            libc::epoll_wait(
                epollfd,
                events.as_mut_ptr(),
                MAX_EVENTS as libc::c_int,
                -1,
            )
        };
        if n < 0 {
            if unsafe { *libc::__errno_location() } == libc::EINTR {
                continue;
            }
            // Any other failure is a programming error (e.g. EBADF because
            // Drop closed the epollfd underneath us). Treat it as shutdown.
            return;
        }

        let ready = &events[..n as usize];
        let mut shutdown_requested = false;
        let mut pushed_any = false;
        {
            let mut q = completions.lock().unwrap();
            for ev in ready {
                if ev.u64 == SHUTDOWN_EPOLL_TOKEN {
                    shutdown_requested = true;
                } else {
                    q.push_back(Completion::FdReady {
                        fd: ev.u64 as RawFd,
                        events: ev.events,
                    });
                    pushed_any = true;
                }
            }
        }

        if pushed_any {
            wake_scheduler(wake_write);
        }
        if shutdown_requested {
            return;
        }
    }
}
/// Poke the scheduler by writing a single byte to its wake pipe.
///
/// The write is retried while it fails with EINTR. Every other failure is
/// ignored, EAGAIN in particular: a full pipe means a wake is already
/// pending, which is sufficient.
fn wake_scheduler(wake_write: RawFd) {
    let byte: [u8; 1] = [0];
    loop {
        let written = unsafe { libc::write(wake_write, byte.as_ptr() as *const _, 1) };
        let interrupted =
            written < 0 && unsafe { *libc::__errno_location() } == libc::EINTR;
        if !interrupted {
            break;
        }
    }
}
// ---------------------------------------------------------------------------
// Pipe helpers (unchanged from v0.2)
// ---------------------------------------------------------------------------
/// Create a pipe with both ends `O_CLOEXEC | O_NONBLOCK`.
///
/// Returns `(read_end, write_end)`, or the OS error from `pipe2`.
fn make_pipe() -> io::Result<(RawFd, RawFd)> {
    let mut ends: [libc::c_int; 2] = [0; 2];
    let flags = libc::O_CLOEXEC | libc::O_NONBLOCK;
    match unsafe { libc::pipe2(ends.as_mut_ptr(), flags) } {
        0 => {
            let [read_end, write_end] = ends;
            Ok((read_end, write_end))
        }
        _ => Err(io::Error::last_os_error()),
    }
}
/// Discard whatever is buffered in the wake pipe. The scheduler calls this
/// after a `poll` wakeup so the next idle call sees an empty pipe.
///
/// Reads 64 bytes at a time and stops at the first read that returns zero
/// or an error.
pub fn drain_wake_pipe(fd: RawFd) {
    let mut scratch = [0u8; 64];
    while unsafe { libc::read(fd, scratch.as_mut_ptr() as *mut _, scratch.len()) } > 0 {}
}
/// Wait until `fd` has data to read or `timeout` elapses, whichever comes
/// first. `None` waits forever.
///
/// The timeout is passed to `poll` in whole milliseconds (truncated) and
/// capped at `i32::MAX`. A `poll` interrupted by a signal is restarted with
/// the same timeout; any other error ends the wait.
pub fn poll_wake(fd: RawFd, timeout: Option<std::time::Duration>) {
    let timeout_ms: libc::c_int =
        timeout.map_or(-1, |d| d.as_millis().min(i32::MAX as u128) as i32);

    let mut pfd = libc::pollfd {
        fd,
        events: libc::POLLIN,
        revents: 0,
    };
    loop {
        let rc = unsafe { libc::poll(&mut pfd as *mut _, 1, timeout_ms) };
        let interrupted = rc < 0 && unsafe { *libc::__errno_location() } == libc::EINTR;
        if !interrupted {
            break;
        }
    }
}

60
src/lib.rs Normal file
View File

@@ -0,0 +1,60 @@
//! # smarm — Silly Marks Abstract Rust Machine
//!
//! Erlang-style green-thread actor concurrency for Rust.
//!
//! Multi-threaded: N scheduler OS threads (default: one per CPU) share a
//! single global run queue behind a `Mutex`. Actors communicate by sending
//! `Send` messages over channels; every actor has a supervisor. Synchronisation
//! primitives — `Mutex<T>` with mandatory lock timeouts, channel `recv`,
//! `sleep`, and epoll-backed `wait_readable`/`wait_writable` — all park the
//! green thread, never the OS thread.
//!
//! See `LOOM.md` for the design intent and the deferred-for-later list.
pub mod stack;
pub mod context;
pub mod preempt;
pub mod pid;
pub mod actor;
pub mod channel;
pub mod scheduler;
pub mod supervisor;
pub mod timer;
pub mod io;
pub mod mutex;
pub mod runtime;
pub mod trace;
// ---------------------------------------------------------------------------
// Global allocator
// ---------------------------------------------------------------------------
// Installs `preempt::PreemptingAllocator` as the allocator for any program
// that links this crate.
// NOTE(review): presumably it uses allocation calls as preemption check
// points — confirm against src/preempt.rs.
#[global_allocator]
static ALLOCATOR: preempt::PreemptingAllocator = preempt::PreemptingAllocator;
// ---------------------------------------------------------------------------
// Public API re-exports
// ---------------------------------------------------------------------------
pub use channel::{channel, Receiver, RecvError, Sender};
pub use mutex::{LockTimeout, Mutex, MutexGuard};
pub use pid::Pid;
pub use runtime::{init, Config, Runtime};
pub use scheduler::{
block_on_io, run, self_pid, sleep, spawn, spawn_under, wait_readable, wait_writable,
yield_now, JoinError, JoinHandle,
};
pub use supervisor::Signal;
// ---------------------------------------------------------------------------
// check!()
// ---------------------------------------------------------------------------
/// Voluntarily check whether this actor's timeslice has expired, yielding
/// if so.
///
/// Takes no arguments and expands to a call to
/// `$crate::preempt::maybe_preempt()`.
#[macro_export]
macro_rules! check {
() => {
$crate::preempt::maybe_preempt()
};
}

248
src/mutex.rs Normal file
View File

@@ -0,0 +1,248 @@
//! Actor-aware mutex with mandatory timeout.
//!
//! `Mutex<T>` parks the calling *green* thread on contention rather than
//! blocking the OS thread. Every lock attempt is bounded by a timeout.
//!
//! Internals use `Arc<std::sync::Mutex<...>>` so the type is genuinely
//! `Send + Sync` and can be shared across scheduler threads.
//!
//! Fairness: FIFO. Poisoning: none. Reentrance: deadlock (caller bug).
use crate::pid::Pid;
use crate::scheduler;
use crate::timer::{self, TimerTarget};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
/// Lock timeout that `Mutex::new` installs as the mutex's default (30 s).
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Error returned when a lock attempt did not succeed within its timeout.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct LockTimeout;

impl std::fmt::Display for LockTimeout {
    /// Renders as the fixed text `mutex lock timed out`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("mutex lock timed out")
    }
}

impl std::error::Error for LockTimeout {}
// ---------------------------------------------------------------------------
// Internals
// ---------------------------------------------------------------------------
/// One queued lock attempt.
struct Wait {
/// The actor waiting for the lock.
pid: Pid,
/// Sequence number of this particular wait; lets a timeout callback
/// match the exact attempt it was armed for.
seq: u64,
}
/// Bookkeeping guarded by `MutexCore::state`.
struct MutexState {
/// Actor currently holding the lock, if any.
holder: Option<Pid>,
/// Actors waiting for the lock, in arrival order.
waiters: VecDeque<Wait>,
/// Sequence number handed to the next waiter (wraps on overflow).
next_seq: u64,
/// Timeout used by `Mutex::lock`.
default_timeout: Duration,
}
/// Type-independent part of a `Mutex`: holder/waiter bookkeeping behind a
/// std mutex. Also the `TimerTarget` that lock timeouts fire on.
struct MutexCore {
state: StdMutex<MutexState>,
}
impl MutexCore {
    /// Build the bookkeeping for an unlocked mutex: no holder, no waiters,
    /// sequence numbers starting at 0.
    fn new(default_timeout: Duration) -> Self {
        let initial = MutexState {
            holder: None,
            waiters: VecDeque::new(),
            next_seq: 0,
            default_timeout,
        };
        Self { state: StdMutex::new(initial) }
    }
}
impl TimerTarget for MutexCore {
    /// Timer callback for a lock attempt by `pid` that was armed with
    /// `wait_seq`.
    ///
    /// Does nothing if `pid` already holds the lock (the timer fired after
    /// the grant) or if no waiter entry matches both `pid` and `wait_seq`.
    /// Otherwise the matching entry is removed and `pid` is unparked, after
    /// the state lock has been released.
    fn on_timeout(&self, pid: Pid, wait_seq: u64) {
        let timed_out = {
            let mut st = self.state.lock().unwrap();
            if st.holder == Some(pid) {
                return;
            }
            let slot = st
                .waiters
                .iter()
                .position(|w| w.pid == pid && w.seq == wait_seq);
            match slot {
                Some(idx) => {
                    st.waiters.remove(idx);
                    true
                }
                None => false,
            }
        };
        if timed_out {
            scheduler::unpark(pid);
        }
    }
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
/// Actor-aware mutual-exclusion lock around a `T`.
///
/// Contended lock attempts park the calling actor instead of blocking the
/// OS thread, and every attempt made inside the runtime is bounded by a
/// timeout.
pub struct Mutex<T> {
    /// Holder / waiter bookkeeping; also handed to the timer as the
    /// `TimerTarget` for lock timeouts.
    core: Arc<MutexCore>,
    /// Protected value. `None` while a guard is live; `Some` while free.
    value: Arc<StdMutex<Option<T>>>,
}

impl<T> Mutex<T> {
    /// Create a free mutex holding `value`, with `DEFAULT_TIMEOUT` as its
    /// default lock timeout.
    pub fn new(value: T) -> Self {
        Self {
            core: Arc::new(MutexCore::new(DEFAULT_TIMEOUT)),
            value: Arc::new(StdMutex::new(Some(value))),
        }
    }

    /// Change the timeout used by subsequent calls to [`Mutex::lock`].
    pub fn set_default_timeout(&self, timeout: Duration) {
        self.core.state.lock().unwrap().default_timeout = timeout;
    }

    /// Acquire the lock using the mutex's current default timeout.
    ///
    /// # Errors
    /// Returns `LockTimeout` if the lock was not granted in time.
    pub fn lock(&self) -> Result<MutexGuard<'_, T>, LockTimeout> {
        let timeout = self.core.state.lock().unwrap().default_timeout;
        self.lock_timeout(timeout)
    }

    /// Acquire the lock, parking the calling actor for at most `timeout`.
    ///
    /// # Errors
    /// Returns `LockTimeout` if the actor is resumed without having been
    /// made the holder.
    pub fn lock_timeout(&self, timeout: Duration) -> Result<MutexGuard<'_, T>, LockTimeout> {
        // Outside the runtime (e.g. in tests, after run() returns) there is no
        // current actor PID. Fall back to a blocking std::sync::Mutex acquire.
        let Some(me) = crate::actor::current_pid() else {
            return self.lock_blocking();
        };
        // Fast path: nobody holds it.
        {
            let mut st = self.core.state.lock().unwrap();
            if st.holder.is_none() {
                st.holder = Some(me);
                drop(st);
                return Ok(self.claim_value("Mutex: value missing on free fast path"));
            }
        }
        // Slow path: register as a waiter, set timeout, park.
        let _np = scheduler::NoPreempt::enter();
        let registered = {
            let mut st = self.core.state.lock().unwrap();
            if st.holder.is_none() {
                // The holder released between the fast-path check and this
                // lock acquisition. Its release saw an empty waiter queue,
                // so nobody would ever wake us if we queued now: take the
                // lock instead of parking until the timeout.
                st.holder = Some(me);
                None
            } else {
                let seq = st.next_seq;
                st.next_seq = st.next_seq.wrapping_add(1);
                st.waiters.push_back(Wait { pid: me, seq });
                Some(seq)
            }
        };
        let Some(seq) = registered else {
            return Ok(self.claim_value("Mutex: value missing on free fast path"));
        };
        let target: Arc<dyn TimerTarget> = self.core.clone();
        let deadline = timer::deadline_from_now(timeout);
        scheduler::insert_wait_timer(deadline, me, target, seq);
        scheduler::park_current();
        // Resumed. Are we the holder?
        let mut st = self.core.state.lock().unwrap();
        if st.holder == Some(me) {
            drop(st);
            return Ok(self.claim_value("Mutex: value missing after grant"));
        }
        // Not granted. After a timeout our entry is already gone; if we were
        // resumed for any other reason it may still be queued, and leaving
        // it there would let a later release hand the lock to an actor that
        // has stopped waiting.
        let stale = st.waiters.iter().position(|w| w.pid == me && w.seq == seq);
        if let Some(idx) = stale {
            st.waiters.remove(idx);
        }
        Err(LockTimeout)
    }

    /// Take the lock only if it is free right now; never parks.
    ///
    /// Returns `None` when the lock is held. It also returns `None` whenever
    /// there is no current actor PID, i.e. outside the runtime.
    pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
        let me = crate::actor::current_pid()?;
        let mut st = self.core.state.lock().unwrap();
        if st.holder.is_some() {
            return None;
        }
        st.holder = Some(me);
        drop(st);
        Some(self.claim_value("Mutex: value missing on try_lock free path"))
    }

    /// Blocking fallback used when called outside the smarm runtime.
    /// Spins on the internal std mutex; no actor parking, no timeout.
    fn lock_blocking(&self) -> Result<MutexGuard<'_, T>, LockTimeout> {
        // We have no PID to register as holder, so we bypass the holder/waiter
        // tracking and just grab the value mutex directly. This is safe because
        // outside the runtime there are no green threads competing.
        let value = loop {
            let v = self.value.lock().unwrap().take();
            if let Some(v) = v {
                break v;
            }
            std::thread::yield_now();
        };
        Ok(MutexGuard { mutex: self, value: Some(value) })
    }

    /// Move the protected value out of its slot and wrap it in a guard.
    ///
    /// The caller must already have recorded itself as `holder`.
    /// `missing_msg` is the panic message used if the slot is empty.
    fn claim_value(&self, missing_msg: &str) -> MutexGuard<'_, T> {
        let value = self.value.lock().unwrap().take().expect(missing_msg);
        MutexGuard { mutex: self, value: Some(value) }
    }
}
impl<T> Clone for Mutex<T> {
fn clone(&self) -> Self {
Self { core: self.core.clone(), value: self.value.clone() }
}
}
// Genuinely Send + Sync now that internals are Arc<std::sync::Mutex<...>>.
//
// SAFETY: both fields of `Mutex<T>` are `Arc`s around `std::sync::Mutex`
// values, so all access to the bookkeeping and to the `Option<T>` goes
// through a std lock. `T: Send` is required because the value is moved out
// to whichever thread acquires the lock.
// NOTE(review): with these field types the auto traits look like they would
// already provide the same `Send`/`Sync` bounds, which would make these
// manual impls redundant. Confirm before removing.
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
// ---------------------------------------------------------------------------
// Guard
// ---------------------------------------------------------------------------
/// Proof of ownership of a [`Mutex`]. The protected value lives inside the
/// guard while it exists and is handed back when the guard is dropped.
pub struct MutexGuard<'a, T> {
    /// The mutex this guard was issued by.
    mutex: &'a Mutex<T>,
    /// The protected value; emptied only while the guard is being dropped.
    value: Option<T>,
}

impl<T> std::ops::Deref for MutexGuard<'_, T> {
    type Target = T;

    /// Shared access to the protected value.
    fn deref(&self) -> &T {
        let held = self.value.as_ref();
        held.expect("MutexGuard: value missing")
    }
}

impl<T> std::ops::DerefMut for MutexGuard<'_, T> {
    /// Exclusive access to the protected value.
    fn deref_mut(&mut self) -> &mut T {
        let held = self.value.as_mut();
        held.expect("MutexGuard: value missing")
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for MutexGuard<'_, T> {
    /// Formats as a `MutexGuard(..)` tuple wrapping the protected value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &T = self.value.as_ref().expect("MutexGuard: value missing");
        let mut tuple = f.debug_tuple("MutexGuard");
        tuple.field(inner);
        tuple.finish()
    }
}
impl<T> Drop for MutexGuard<'_, T> {
    /// Release the lock: put the value back, then hand ownership to the
    /// oldest waiter (waking it) or mark the mutex free if nobody is queued.
    fn drop(&mut self) {
        // Return the value first so the next holder always finds it.
        let returned = self.value.take().expect("MutexGuard: double drop");
        *self.mutex.value.lock().unwrap() = Some(returned);

        // Front of the queue becomes the holder; an empty queue frees the lock.
        let successor = {
            let mut st = self.mutex.core.state.lock().unwrap();
            let successor = st.waiters.pop_front().map(|w| w.pid);
            st.holder = successor;
            successor
        };

        // Wake the new holder only after the state lock has been released.
        if let Some(pid) = successor {
            scheduler::unpark(pid);
        }
    }
}

38
src/pid.rs Normal file
View File

@@ -0,0 +1,38 @@
//! Process identifiers.
//!
//! A `Pid` is `(index, generation)`. The index is a slot in the scheduler's
//! actor table; the generation increments every time that slot is reused.
//! A stale `Pid` (correct index, wrong generation) is a detectable error,
//! not a silent misdirection — solves the ABA problem without exhausting
//! the PID space.
/// Identifier of an actor: a table slot plus the generation of that slot.
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
pub struct Pid {
    index: u32,
    generation: u32,
}

impl Pid {
    /// Assemble a `Pid` from its slot index and generation.
    #[inline]
    pub const fn new(index: u32, generation: u32) -> Self {
        Pid { index, generation }
    }

    /// Slot index component.
    #[inline]
    pub const fn index(self) -> u32 {
        self.index
    }

    /// Generation component.
    #[inline]
    pub const fn generation(self) -> u32 {
        self.generation
    }
}

impl std::fmt::Debug for Pid {
    /// Renders as `Pid(index.generation)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Pid { index, generation } = *self;
        write!(f, "Pid({index}.{generation})")
    }
}

impl std::fmt::Display for Pid {
    /// Renders as `<index.generation>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Pid { index, generation } = *self;
        write!(f, "<{index}.{generation}>")
    }
}

129
src/preempt.rs Normal file
View File

@@ -0,0 +1,129 @@
//! Preemption hooks.
//!
//! Preemption is event-driven: every preemption event decrements a
//! thread-local counter (`ALLOC_COUNT`). When the counter hits zero, we
//! read RDTSC and, if the actor's timeslice has expired, call
//! `switch_to_scheduler` to yield. Resetting the counter to `ALLOC_INTERVAL`
//! amortises the RDTSC across many cheap events.
//!
//! Two event sources today:
//! - `PreemptingAllocator` — heap allocations.
//! - `smarm::check!()` — explicit preemption point for tight no-alloc
//! loops, since stable Rust gives us no transparent way to preempt
//! such loops (`__rust_probestack` is emitted inline by LLVM and not
//! called at runtime).
//!
//! Both sources share `ALLOC_COUNT`, so the timeslice check fires at the
//! same rate regardless of whether the actor is alloc-heavy, check-heavy,
//! or mixed.
//!
//! All state is thread-local. The scheduler enables preemption on resume
//! and disables it on the return path, so the scheduler can never preempt
//! itself.
//!
//! TSC frequency is machine-dependent; `TIMESLICE_CYCLES` is a constant
//! calibrated for ~100µs on a 3 GHz CPU. A real implementation would
//! measure it at startup. For v0.1 the constant suffices.
use std::alloc::{GlobalAlloc, Layout, System};
use std::cell::Cell;
/// Value the `ALLOC_COUNT` countdown is reset to. `maybe_preempt` only reads
/// the timeslice clock when the countdown has reached zero.
const ALLOC_INTERVAL: u32 = 128;
/// Timeslice length in TSC cycles; `maybe_preempt` yields once more than
/// this many cycles have elapsed since `TIMESLICE_START`.
const TIMESLICE_CYCLES: u64 = 300_000; // ≈ 100µs on a 3 GHz CPU
thread_local! {
/// While `false`, the allocator hook is a no-op.
pub static PREEMPTION_ENABLED: Cell<bool> = const { Cell::new(false) };
/// Countdown to next RDTSC check. Reset to `ALLOC_INTERVAL` on resume.
static ALLOC_COUNT: Cell<u32> = const { Cell::new(ALLOC_INTERVAL) };
/// RDTSC value written by the scheduler on every actor resume.
static TIMESLICE_START: Cell<u64> = const { Cell::new(0) };
}
/// Start a fresh timeslice on the current thread: stamp the current TSC
/// reading as the slice start and rewind the event countdown to
/// `ALLOC_INTERVAL`. The scheduler calls this right before resuming an actor.
pub fn reset_timeslice() {
    let now = rdtsc();
    TIMESLICE_START.with(|start| start.set(now));
    ALLOC_COUNT.with(|countdown| countdown.set(ALLOC_INTERVAL));
}
/// Read the CPU timestamp counter, preceded by an `lfence`.
///
/// Returns the raw counter value in TSC cycles; converting that to
/// wall-clock time depends on the machine's TSC frequency.
///
/// NOTE(review): this uses `core::arch::x86_64` unconditionally and no
/// `cfg(target_arch)` gate is visible here, so other targets presumably fail
/// to compile. Confirm that is intended.
#[inline(always)]
pub fn rdtsc() -> u64 {
unsafe {
// SAFETY: x86-64 only. `lfence` serialises the instruction stream so
// we don't measure time before prior instructions retire.
core::arch::asm!("lfence", options(nostack, nomem, preserves_flags));
core::arch::x86_64::_rdtsc()
}
}
/// Allocator wrapper that turns heap allocation into a preemption event.
///
/// `alloc`, `alloc_zeroed` and `realloc` call `maybe_preempt()` and then
/// forward to `System`; `dealloc` forwards directly without a preemption
/// check.
pub struct PreemptingAllocator;
unsafe impl GlobalAlloc for PreemptingAllocator {
// Preemption point, then delegate to the system allocator.
#[inline]
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
maybe_preempt();
// SAFETY: the caller's `layout` is forwarded unchanged.
unsafe { System.alloc(layout) }
}
// Freeing is never a preemption point.
#[inline]
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
// SAFETY: `ptr` and `layout` are forwarded unchanged from the caller.
unsafe { System.dealloc(ptr, layout) }
}
// Preemption point, then delegate to the system allocator.
#[inline]
unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
maybe_preempt();
// SAFETY: the caller's `layout` is forwarded unchanged.
unsafe { System.alloc_zeroed(layout) }
}
// Preemption point, then delegate to the system allocator.
#[inline]
unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
maybe_preempt();
// SAFETY: all arguments are forwarded unchanged from the caller.
unsafe { System.realloc(ptr, layout, new_size) }
}
}
/// Shared preemption check. Called by every preemption event source — the
/// heap allocator today, `smarm::check!()` for tight no-alloc loops.
/// Decrements `ALLOC_COUNT`; every `ALLOC_INTERVAL` calls reads the
/// timeslice clock and yields if expired.
///
/// The yield is a direct `switch_to_scheduler()` and only happens while
/// `PREEMPTION_ENABLED` is `true` on this thread. With preemption disabled
/// the countdown is still reset but no clock read or switch takes place.
///
/// **Invariant**: must not be called inside a "prep-to-park" region —
/// e.g. between registering as a channel's parked receiver and calling
/// `park_current()`. A preemption-driven yield in that window would
/// reach the scheduler with state=Runnable, the unparker would no-op,
/// the actor would then park, and the wakeup would be lost. Library
/// code that touches the parking primitives must keep its prep-to-park
/// regions allocation-free and check!()-free.
#[inline(always)]
pub fn maybe_preempt() {
ALLOC_COUNT.with(|c| {
let n = c.get();
if n == 0 {
// Countdown expired: rearm it before anything else so the next check
// is again `ALLOC_INTERVAL` events away whether or not we yield.
c.set(ALLOC_INTERVAL);
if PREEMPTION_ENABLED.with(|e| e.get()) {
let start = TIMESLICE_START.with(|s| s.get());
// `saturating_sub` keeps a reading below `start` from wrapping into
// a huge elapsed value.
if rdtsc().saturating_sub(start) > TIMESLICE_CYCLES {
// SAFETY: reachable only inside an actor (the scheduler
// sets PREEMPTION_ENABLED on resume and clears it on
// return). The scheduler stack is therefore valid.
unsafe { crate::context::switch_to_scheduler() };
}
}
} else {
// Cheap path: just count the event.
c.set(n - 1);
}
});
}
// ---------------------------------------------------------------------------
// Test helpers
// ---------------------------------------------------------------------------
/// Test helper: make the very next `maybe_preempt()` call on this thread
/// read the clock and find the timeslice expired, by zeroing both the event
/// countdown and the recorded slice start.
pub fn expire_timeslice_for_test() {
    ALLOC_COUNT.with(|countdown| countdown.set(0));
    TIMESLICE_START.with(|start| start.set(0));
}

718
src/runtime.rs Normal file
View File

@@ -0,0 +1,718 @@
//! Multi-scheduler runtime: configuration, initialisation, and the shared
//! state that all scheduler OS threads operate against.
//!
//! # Architecture
//!
//! ```text
//! init(Config) → Runtime (Arc<RuntimeInner>)
//!
//! RuntimeInner {
//! shared: Mutex<SharedState> ← slot table, run queue, timers, IO
//! stats: Vec<SchedulerStats> ← one per thread, lockless atomics (RFC 000)
//! io_parked: AtomicU32 ← actors parked on IO
//! sleeping: AtomicU32 ← actors parked on timer
//! }
//! ```
//!
//! `Runtime::run(f)` runs N scheduler loops (N = `Config::resolved_thread_count()`):
//! thread 0 is the calling thread and the other N-1 are freshly spawned OS
//! threads, each running `schedule_loop`. It blocks until all scheduler threads
//! exit, i.e. until the run queue is empty and nothing is pending.
//!
//! Each scheduler thread holds an `Arc<RuntimeInner>` clone. Per-thread
//! identity is a small integer index, stored in a thread-local, used to index
//! into `stats`.
//!
//! # Timer / IO drain (try-lock, one-winner)
//!
//! On each loop iteration every scheduler thread tries `try_lock()` on a
//! separate `drain_lock: Mutex<()>`. The winner drains due timers and IO
//! completions; losers skip and move straight to popping an actor from the
//! run queue. This is the simplest correct approach; revisit if the drain
//! becomes a measured bottleneck.
use crate::actor::{
clear_current_pid, current_pid, is_actor_done, reset_actor_done,
set_current_actor_box, set_current_pid, take_last_outcome, Actor, Outcome,
};
use crate::channel::Sender;
use crate::context::{get_actor_sp, set_actor_sp, switch_to_actor};
use crate::io::IoThread;
use crate::pid::Pid;
use crate::preempt::PREEMPTION_ENABLED;
use crate::supervisor::Signal;
use crate::timer::Timers;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
// ---------------------------------------------------------------------------
// Config
// ---------------------------------------------------------------------------
/// How many scheduler threads a runtime should use.
///
/// Either an exact count, or a `[min, max]` range that the machine's
/// available parallelism is clamped into.
///
/// ```
/// use smarm::runtime::Config;
///
/// // One scheduler thread per available CPU (the default):
/// let c = Config::default();
///
/// // Exactly 4 scheduler threads:
/// let c = Config::exact(4);
///
/// // Available parallelism, clamped into 2..=8:
/// let c = Config::new(2, 8, None);
/// ```
#[derive(Clone, Debug)]
pub struct Config {
    min: usize,
    max: usize,
    exact: Option<usize>,
}

impl Config {
    /// Pin the scheduler thread count to exactly `n`.
    ///
    /// # Panics
    /// Panics if `n` is zero.
    pub fn exact(n: usize) -> Self {
        assert!(n >= 1, "scheduler thread count must be ≥ 1");
        Self { exact: Some(n), min: n, max: n }
    }

    /// Build a config from a `[min, max]` range and an optional exact
    /// override; when `exact` is `Some`, it wins over the range.
    ///
    /// # Panics
    /// Panics if `min` is zero, if `max < min`, or if `exact` is `Some(0)`.
    pub fn new(min: usize, max: usize, exact: Option<usize>) -> Self {
        assert!(min >= 1, "min must be ≥ 1");
        assert!(max >= min, "max must be ≥ min");
        assert!(exact.map_or(true, |e| e >= 1), "exact must be ≥ 1");
        Self { min, max, exact }
    }

    /// The number of scheduler threads this config resolves to: the exact
    /// count if one was given, otherwise the available parallelism (1 if it
    /// cannot be determined) clamped into `[min, max]`.
    pub fn resolved_thread_count(&self) -> usize {
        match self.exact {
            Some(pinned) => pinned,
            None => {
                let avail = thread::available_parallelism().map_or(1, |n| n.get());
                avail.clamp(self.min, self.max)
            }
        }
    }
}

impl Default for Config {
    /// Range `1..=available_parallelism` with no exact override, which
    /// resolves to one scheduler thread per available CPU.
    fn default() -> Self {
        let avail = thread::available_parallelism().map_or(1, |n| n.get());
        Self { exact: None, min: 1, max: avail }
    }
}
// ---------------------------------------------------------------------------
// Per-thread stats (RFC 000 Layer 1 primitives)
// ---------------------------------------------------------------------------
/// Per-scheduler-thread counters kept in atomics so they can be read from
/// any thread without taking the shared lock.
pub struct SchedulerStats {
    /// PID index of the actor this thread is currently running, or
    /// `u32::MAX` while it is not running one.
    pub current_pid_index: AtomicU32,
    /// Run-queue length this thread observed the last time it went to pop
    /// an actor.
    pub run_queue_len: AtomicU64,
}

impl SchedulerStats {
    /// Counters for an idle thread: no current actor, queue length zero.
    fn new() -> Self {
        let idle_marker = AtomicU32::new(u32::MAX);
        let empty_queue = AtomicU64::new(0);
        Self {
            current_pid_index: idle_marker,
            run_queue_len: empty_queue,
        }
    }
}
// ---------------------------------------------------------------------------
// Runtime stats snapshot (for tests / introspection)
// ---------------------------------------------------------------------------
/// Read-only view over a runtime's counters, for tests and introspection.
/// Every getter is a relaxed atomic load, so values are instantaneous and
/// not mutually consistent.
pub struct RuntimeStats {
    pub(crate) inner: Arc<RuntimeInner>,
}

impl RuntimeStats {
    /// Sum of the `run_queue_len` counters of all scheduler threads.
    ///
    /// NOTE(review): each scheduler thread appears to record the length of
    /// the one shared run queue, so this sum may count the same queue once
    /// per thread. Confirm the intended meaning.
    pub fn total_run_queue_len(&self) -> u64 {
        let mut total = 0u64;
        for per_thread in &self.inner.stats {
            total += per_thread.run_queue_len.load(Ordering::Relaxed);
        }
        total
    }

    /// Number of scheduler threads the runtime was created with.
    pub fn scheduler_count(&self) -> usize {
        let per_thread = &self.inner.stats;
        per_thread.len()
    }

    /// Current value of the runtime's IO-parked actor counter.
    pub fn io_parked_count(&self) -> u32 {
        let counter = &self.inner.io_parked;
        counter.load(Ordering::Relaxed)
    }

    /// Current value of the runtime's timer-sleeping actor counter.
    pub fn sleeping_count(&self) -> u32 {
        let counter = &self.inner.sleeping;
        counter.load(Ordering::Relaxed)
    }
}
// ---------------------------------------------------------------------------
// Shared state (behind Mutex<>)
// ---------------------------------------------------------------------------
/// 64 KiB. Presumably the size of each actor's stack; the use site is
/// outside this part of the file, so confirm against the caller.
pub(crate) const ACTOR_STACK_SIZE: usize = 64 * 1024;
/// Scheduling state recorded on a `Slot`: `Runnable` when the actor is put
/// on the run queue, `Parked` when it yielded with a park intent and no
/// unpark was pending, `Done` for finished actors and vacant slots.
#[derive(Debug)]
pub(crate) enum State { Runnable, Parked, Done }
/// One entry of the actor table.
pub(crate) struct Slot {
    /// Generation a `Pid` must carry to address this slot; bumped each time
    /// the slot is reclaimed.
    pub(crate) generation: u32,
    /// The actor occupying the slot, if any.
    pub(crate) actor: Option<Actor>,
    /// Current scheduling state.
    pub(crate) state: State,
    /// Actors parked in `join` on this one; unparked when it finishes.
    pub(crate) waiters: Vec<Pid>,
    /// How the actor finished, held until a joiner takes it.
    pub(crate) outcome: Option<Outcome>,
    /// Channel on which this actor receives supervision signals.
    pub(crate) supervisor_channel: Option<Sender<Signal>>,
    /// A finished actor's slot is reclaimed immediately only when this is 0.
    pub(crate) outstanding_handles: u32,
    /// Result of a completed blocking IO request, stored by the drain step.
    pub(crate) pending_io_result: Option<crate::io::IoResult>,
    /// Set by `unpark()` when the actor is still running (not yet Parked).
    /// After a Park yield the scheduler consumes the flag and re-queues the
    /// actor instead of parking it, closing the lost-wakeup window.
    pub(crate) pending_unpark: bool,
}

impl Slot {
    /// A fresh, unoccupied slot: generation 0, no actor, state `Done`.
    fn vacant() -> Self {
        Self {
            state: State::Done,
            generation: 0,
            outstanding_handles: 0,
            pending_unpark: false,
            actor: None,
            outcome: None,
            supervisor_channel: None,
            pending_io_result: None,
            waiters: Vec::new(),
        }
    }
}
/// Entry closure of an actor, boxed so it can be queued until first resume.
pub(crate) type Closure = Box<dyn FnOnce() + Send>;

/// Everything the scheduler threads share behind `RuntimeInner::shared`.
pub(crate) struct SharedState {
    /// Actor table, indexed by `Pid::index()`.
    pub(crate) slots: Vec<Slot>,
    /// Indices of reclaimed slots available for reuse.
    pub(crate) free_list: Vec<u32>,
    /// Actors ready to run, in FIFO order.
    pub(crate) run_queue: VecDeque<Pid>,
    /// Set to `ROOT_PID` at the start of each run.
    pub(crate) root_pid: Option<Pid>,
    /// Pending timers.
    pub(crate) timers: Timers,
    /// IO thread handle; `Some` only while a run is in progress.
    pub(crate) io: Option<IoThread>,
    /// Closures awaiting their first resume, keyed by Pid.
    pub(crate) pending_closures: Vec<(Pid, Closure)>,
}

impl SharedState {
    /// Empty state: no slots, nothing queued, no IO thread.
    fn new() -> Self {
        Self {
            io: None,
            root_pid: None,
            timers: Timers::new(),
            run_queue: VecDeque::new(),
            pending_closures: Vec::new(),
            free_list: Vec::new(),
            slots: Vec::new(),
        }
    }

    /// Pick a slot for a new actor and return `(index, generation)`.
    /// A reclaimed slot is reused with its current generation; otherwise a
    /// vacant slot is appended and reported with generation 0.
    pub(crate) fn allocate_slot(&mut self) -> (u32, u32) {
        match self.free_list.pop() {
            Some(idx) => (idx, self.slots[idx as usize].generation),
            None => {
                let idx = self.slots.len() as u32;
                self.slots.push(Slot::vacant());
                (idx, 0)
            }
        }
    }

    /// Look up the slot `pid` refers to; `None` if the index is out of
    /// range or the slot's generation differs (stale pid).
    pub(crate) fn slot(&self, pid: Pid) -> Option<&Slot> {
        self.slots
            .get(pid.index() as usize)
            .filter(|s| s.generation == pid.generation())
    }

    /// Mutable counterpart of [`SharedState::slot`], with the same checks.
    pub(crate) fn slot_mut(&mut self, pid: Pid) -> Option<&mut Slot> {
        self.slots
            .get_mut(pid.index() as usize)
            .filter(|s| s.generation == pid.generation())
    }

    /// Remove and return the not-yet-started closure registered for `pid`,
    /// if there is one. The order of the remaining entries is not kept.
    pub(crate) fn pop_pending_closure(&mut self, pid: Pid) -> Option<Closure> {
        let pos = self.pending_closures.iter().position(|(owner, _)| *owner == pid)?;
        let (_, closure) = self.pending_closures.swap_remove(pos);
        Some(closure)
    }
}
// ---------------------------------------------------------------------------
// RuntimeInner — the shared core behind an Arc
// ---------------------------------------------------------------------------
/// The state every scheduler thread shares, handed around as an `Arc`.
pub(crate) struct RuntimeInner {
    /// Slot table, run queue, timers and IO handle.
    pub(crate) shared: Mutex<SharedState>,
    /// Try-lock: exactly one scheduler thread drains timers/IO per iteration.
    drain_lock: Mutex<()>,
    /// Per-thread stats, indexed by scheduler thread slot (0..N).
    pub(crate) stats: Vec<SchedulerStats>,
    /// Global counters for RFC 000 primitives.
    pub(crate) io_parked: AtomicU32,
    pub(crate) sleeping: AtomicU32,
}

impl RuntimeInner {
    /// Build the shared core for `thread_count` scheduler threads, with
    /// empty shared state and all counters zeroed.
    fn new(thread_count: usize) -> Arc<Self> {
        let stats = (0..thread_count).map(|_| SchedulerStats::new()).collect();
        Arc::new(Self {
            shared: Mutex::new(SharedState::new()),
            drain_lock: Mutex::new(()),
            stats,
            io_parked: AtomicU32::new(0),
            sleeping: AtomicU32::new(0),
        })
    }

    /// Run `f` with exclusive access to the shared state.
    ///
    /// # Panics
    /// Panics if the shared mutex is poisoned.
    pub(crate) fn with_shared<R>(&self, f: impl FnOnce(&mut SharedState) -> R) -> R {
        // Preemption must be off while we hold the shared mutex. If an actor
        // called with_shared (e.g. from spawn, join, sleep) and the allocator
        // fired maybe_preempt() while the lock was held, switch_to_scheduler()
        // would context-switch to the scheduler loop, which would immediately
        // deadlock trying to acquire the same mutex.
        let prev = crate::preempt::PREEMPTION_ENABLED.with(|c| c.replace(false));
        // The guard is a temporary of this statement, so the lock is released
        // before preemption is restored below.
        let result = f(&mut self.shared.lock().unwrap());
        crate::preempt::PREEMPTION_ENABLED.with(|c| c.set(prev));
        result
    }

    /// Like [`RuntimeInner::with_shared`], but tolerant of a poisoned mutex:
    /// the poisoned guard is recovered and `f` runs anyway, so this never
    /// panics because of poisoning.
    ///
    /// Always returns `Some`; the `Option` return type is kept for callers.
    /// Used in `unpark` / channel Drop which can fire after teardown.
    pub(crate) fn try_with_shared<R>(&self, f: impl FnOnce(&mut SharedState) -> R) -> Option<R> {
        let prev = crate::preempt::PREEMPTION_ENABLED.with(|c| c.replace(false));
        let result = {
            let mut guard = self
                .shared
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            f(&mut guard)
            // guard dropped here, before preemption is restored
        };
        crate::preempt::PREEMPTION_ENABLED.with(|c| c.set(prev));
        Some(result)
    }
}
// ---------------------------------------------------------------------------
// Runtime — the public handle
// ---------------------------------------------------------------------------
/// Public handle to a runtime: the shared core plus the number of scheduler
/// threads it was sized for.
pub struct Runtime {
    inner: Arc<RuntimeInner>,
    thread_count: usize,
}

/// Create a runtime sized according to `config`. Nothing runs until
/// `Runtime::run` is called on the returned handle.
pub fn init(config: Config) -> Runtime {
    let thread_count = config.resolved_thread_count();
    let inner = RuntimeInner::new(thread_count);
    Runtime { inner, thread_count }
}
impl Runtime {
/// Run `f` as the initial actor, block until all actors finish.
/// Can be called multiple times sequentially on the same `Runtime`.
///
/// The calling thread acts as scheduler thread 0; `thread_count - 1`
/// additional OS threads are spawned for the remaining scheduler slots
/// and joined before this function returns.
///
/// # Panics
/// Panics if the shared mutex is poisoned, if the run queue is not empty
/// on entry, or if the IO thread cannot be started.
pub fn run(&self, f: impl FnOnce() + Send + 'static) {
// Open the trace store for this run (no-op without smarm-trace).
#[cfg(feature = "smarm-trace")]
crate::trace::open();
// Re-initialise shared state for this run.
// Only `root_pid` and `io` are (re)set here; slots, free list and timers
// carry over from any previous run.
{
let mut s = self.inner.shared.lock().unwrap();
assert!(s.run_queue.is_empty(), "run() called while previous run still active");
s.root_pid = Some(ROOT_PID);
s.io = Some(IoThread::start().expect("failed to start IO thread"));
}
// Spawn the initial actor through the public spawn path (which
// requires a running runtime in the thread-local).
RUNTIME.with(|r| *r.borrow_mut() = Some(self.inner.clone()));
let initial_handle = crate::scheduler::spawn(f);
// Launch N-1 extra scheduler threads. The calling thread is thread 0.
let mut os_threads = Vec::new();
for slot in 1..self.thread_count {
let inner = self.inner.clone();
let t = thread::spawn(move || {
// Each scheduler thread installs its own RUNTIME / SCHED_SLOT
// thread-locals and clears RUNTIME again once its loop returns.
RUNTIME.with(|r| *r.borrow_mut() = Some(inner.clone()));
SCHED_SLOT.with(|s| s.set(slot));
schedule_loop(&inner, slot);
RUNTIME.with(|r| *r.borrow_mut() = None);
});
os_threads.push(t);
}
// Thread 0 runs the loop on the calling thread.
SCHED_SLOT.with(|s| s.set(0));
schedule_loop(&self.inner, 0);
// Wait for all other scheduler threads.
// A panic on a scheduler thread is deliberately ignored here.
for t in os_threads {
let _ = t.join();
}
// Drop initial handle (decrements outstanding_handles count).
drop(initial_handle);
// Tear down IO and clean up shared state for the next run() call.
// NOTE(review): this guard stays alive until the end of the function, so
// the IO teardown and everything below run with the shared mutex held.
// Confirm nothing on those paths needs to take the same lock.
let mut s = self.inner.shared.lock().unwrap();
drop(s.io.take()); // joins IO threads
s.pending_closures.clear();
// Reset per-thread stats.
for stat in &self.inner.stats {
stat.current_pid_index.store(u32::MAX, Ordering::Relaxed);
stat.run_queue_len.store(0, Ordering::Relaxed);
}
self.inner.io_parked.store(0, Ordering::Relaxed);
self.inner.sleeping.store(0, Ordering::Relaxed);
RUNTIME.with(|r| *r.borrow_mut() = None);
// Flush trace to disk (no-op without smarm-trace).
#[cfg(feature = "smarm-trace")]
crate::trace::flush();
}
/// Snapshot of runtime statistics for introspection / tests.
/// The returned handle shares the runtime's counters; it is not a copy.
pub fn stats(&self) -> RuntimeStats {
RuntimeStats { inner: self.inner.clone() }
}
}
// ---------------------------------------------------------------------------
// Thread-locals
// ---------------------------------------------------------------------------
use std::cell::{Cell, RefCell};
thread_local! {
/// The RuntimeInner for the current run(). Set by run() on the calling
/// thread and by each spawned scheduler thread.
pub(crate) static RUNTIME: RefCell<Option<Arc<RuntimeInner>>> =
const { RefCell::new(None) };
/// This scheduler thread's index into RuntimeInner::stats.
static SCHED_SLOT: Cell<usize> = const { Cell::new(0) };
/// What the actor wants when it yields back to the scheduler.
/// The scheduler resets this to `Yield` before every resume.
static YIELD_INTENT: Cell<YieldIntent> = const { Cell::new(YieldIntent::Yield) };
}
/// Why an actor switched back to the scheduler.
///
/// `Yield`: the scheduler puts the actor straight back on the run queue.
/// `Park`: the actor is parked, unless an unpark was already pending, in
/// which case it is re-queued immediately.
#[derive(Copy, Clone)]
pub(crate) enum YieldIntent { Yield, Park }
/// Record, on the current thread, the intent the scheduler should act on
/// the next time the running actor switches back to it.
pub(crate) fn set_yield_intent(i: YieldIntent) {
YIELD_INTENT.with(|c| c.set(i));
}
// ---------------------------------------------------------------------------
// Sentinel root PID
// ---------------------------------------------------------------------------
/// Sentinel PID with index and generation both `u32::MAX`. `Runtime::run`
/// stores it as `SharedState::root_pid` at the start of every run.
pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX);
// ---------------------------------------------------------------------------
// Slot reclamation
// ---------------------------------------------------------------------------
/// Return the slot addressed by `pid.index()` to the free list.
///
/// The slot's generation is bumped (wrapping), so every existing `Pid` for
/// it becomes stale, and all per-actor state is cleared. The generation of
/// `pid` itself is not checked; callers pass a pid they have already
/// validated.
///
/// # Panics
/// Panics if `pid.index()` is outside the slot table.
pub(crate) fn reclaim_slot(s: &mut SharedState, pid: Pid) {
    let index = pid.index();
    {
        let slot = &mut s.slots[index as usize];
        // Plain bookkeeping first.
        slot.generation = slot.generation.wrapping_add(1);
        slot.state = State::Done;
        slot.outstanding_handles = 0;
        slot.pending_unpark = false;
        // Then release owned resources, in the original order.
        slot.actor = None;
        slot.outcome = None;
        slot.waiters.clear();
        slot.supervisor_channel = None;
        slot.pending_io_result = None;
    }
    s.free_list.push(index);
}
// ---------------------------------------------------------------------------
// finalize_actor
// ---------------------------------------------------------------------------
/// Finish bookkeeping for an actor that has run to completion.
///
/// In order: record `outcome` on the actor's slot and mark it `Done`,
/// notify the actor's supervisor (if it has a channel), unpark every
/// joiner, and reclaim the slot if no handles to it are outstanding.
///
/// # Panics
/// Panics if `pid` no longer addresses a live slot.
fn finalize_actor(inner: &Arc<RuntimeInner>, pid: Pid, outcome: Outcome) {
// The joiner receives the real outcome. The supervisor signal for a panic
// carries a unit placeholder payload rather than the panic payload, which
// is moved into the joiner outcome.
let (joiner_outcome, sup_signal) = match outcome {
Outcome::Exit => (Outcome::Exit, Signal::Exit(pid)),
Outcome::Panic(payload) => (
Outcome::Panic(payload),
Signal::Panic(pid, Box::new(()) as Box<dyn std::any::Any + Send>),
),
};
// Read the supervisor pid before dropping the actor, then take the
// waiter list so joiners can be unparked without holding the lock.
let (waiters, supervisor_pid) = inner.with_shared(|s| {
let slot = s.slot_mut(pid).expect("finalize_actor: slot vanished");
let sup = slot.actor.as_ref().map(|a| a.supervisor);
slot.outcome = Some(joiner_outcome);
slot.state = State::Done;
slot.actor = None;
(std::mem::take(&mut slot.waiters), sup)
});
// Deliver to supervisor.
// The sender is cloned under the lock and used outside it; a failed send
// is ignored.
if let Some(sup) = supervisor_pid {
let sender = inner.with_shared(|s| {
s.slot(sup).and_then(|slot| slot.supervisor_channel.clone())
});
if let Some(sender) = sender {
let _ = sender.send(sup_signal);
}
}
// Unpark joiners.
for joiner in waiters {
crate::scheduler::unpark(joiner);
}
// Reclaim if no outstanding handles.
// Otherwise the slot stays `Done` with its outcome in place.
inner.with_shared(|s| {
let reclaim = s.slot(pid).map(|slot| slot.outstanding_handles == 0).unwrap_or(false);
if reclaim { reclaim_slot(s, pid); }
});
}
// ---------------------------------------------------------------------------
// schedule_loop — runs on each scheduler OS thread
// ---------------------------------------------------------------------------
/// Main loop of one scheduler thread.
///
/// `slot` is this thread's index into `inner.stats`. Each iteration
/// (1) tries to become the single drainer of due timers and IO completions,
/// (2) pops one runnable actor from the shared run queue, or waits / exits
/// if there is none, and (3) resumes that actor and acts on the intent it
/// left behind when it switched back.
///
/// Returns when this thread finds the run queue empty with no timers, no
/// outstanding IO and no slot still holding an actor.
fn schedule_loop(inner: &Arc<RuntimeInner>, slot: usize) {
let stats = &inner.stats[slot];
loop {
// ----------------------------------------------------------------
// 1. Try to win the drain lock (timers + IO). One winner per round;
// losers skip immediately and proceed to step 2.
// ----------------------------------------------------------------
if let Ok(_drain_guard) = inner.drain_lock.try_lock() {
let now = std::time::Instant::now();
// Drain due timers.
let due = inner.with_shared(|s| s.timers.pop_due(now));
for entry in due {
match entry.reason {
crate::timer::Reason::Sleep => {
// A sleep timer only wakes an actor that is actually parked;
// stale pids and non-parked actors are left alone.
inner.with_shared(|s| {
if let Some(slot) = s.slot_mut(entry.pid) {
if matches!(slot.state, State::Parked) {
slot.state = State::Runnable;
s.run_queue.push_back(entry.pid);
crate::te!(crate::trace::Event::Enqueue(entry.pid));
}
}
});
}
crate::timer::Reason::WaitTimeout { target, wait_seq } => {
// Runs outside with_shared — the callback may call unpark.
target.on_timeout(entry.pid, wait_seq);
}
}
}
// Drain IO completions.
let completions = inner.with_shared(|s| {
s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default()
});
for completion in completions {
match completion {
crate::io::Completion::Blocking { pid, result } => {
// The result is stored on the slot even if the actor is not
// parked yet; only a parked actor is re-queued here.
inner.with_shared(|s| {
if let Some(io) = s.io.as_mut() {
io.outstanding = io.outstanding.saturating_sub(1);
}
if let Some(slot) = s.slot_mut(pid) {
slot.pending_io_result = Some(result);
if matches!(slot.state, State::Parked) {
slot.state = State::Runnable;
s.run_queue.push_back(pid);
crate::te!(crate::trace::Event::Enqueue(pid));
}
}
});
}
crate::io::Completion::FdReady { fd, events: _ } => {
// Readiness is one-shot: the waiter entry is removed and the fd
// deregistered before the parked actor is woken.
inner.with_shared(|s| {
let parked_pid = s.io.as_mut().and_then(|io| {
let pid = io.waiters.remove(&fd);
io.epoll_deregister(fd);
pid
});
if let Some(pid) = parked_pid {
if let Some(slot) = s.slot_mut(pid) {
if matches!(slot.state, State::Parked) {
slot.state = State::Runnable;
s.run_queue.push_back(pid);
crate::te!(crate::trace::Event::Enqueue(pid));
}
}
}
});
}
}
}
} // drain_guard drops here
// ----------------------------------------------------------------
// 2. Pop a runnable actor from the shared queue.
// ----------------------------------------------------------------
// The recorded length is the shared queue's length as seen just before
// this pop.
let pid = match inner.with_shared(|s| {
let len = s.run_queue.len() as u64;
stats.run_queue_len.store(len, Ordering::Relaxed);
s.run_queue.pop_front()
}) {
Some(p) => {
crate::te!(crate::trace::Event::Dequeue(p));
p
}
None => {
// Nothing runnable. Check whether we should wait or exit.
let (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) =
inner.with_shared(|s| {
let next = s.timers.peek_deadline();
let (out, fd) = match s.io.as_ref() {
Some(io) => (
io.outstanding + io.waiters.len() as u32,
Some(io.wake_fd()),
),
None => (0, None),
};
// Count actors that are not Done (Runnable or Parked).
// This includes actors currently running on another scheduler
// thread, since their slot still holds the `Actor`.
let live = s.slots.iter().filter(|slot| {
slot.actor.is_some()
}).count();
(next, out, fd, s.run_queue.is_empty(), live)
});
match (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) {
// Queue is now non-empty (another thread added work): retry.
(_, _, _, false, _) => continue,
// Truly idle — no timers, no IO, no live actors.
(None, 0, _, true, 0) => return,
// Live actors but queue empty: they must be parked on IO or
// timers. Wait on the appropriate source.
(Some(deadline), _, fd_opt, true, _) => {
let now = std::time::Instant::now();
if deadline > now {
let timeout = deadline - now;
match fd_opt {
Some(fd) => {
crate::io::poll_wake(fd, Some(timeout));
crate::io::drain_wake_pipe(fd);
}
None => thread::sleep(timeout),
}
}
continue;
}
// NOTE(review): this arm also matches when there is no outstanding
// IO but live actors remain, and it waits on the wake fd with no
// timeout. That appears to rely on something writing to the wake
// fd when work is enqueued or the last actor finishes. Confirm in
// the IO / unpark code.
(None, _, Some(fd), true, _) => {
crate::io::poll_wake(fd, None);
crate::io::drain_wake_pipe(fd);
continue;
}
// Live actors, queue empty, no IO/timers: they're parked
// waiting for each other (potential deadlock in user code),
// or another thread is about to add work. Sleep briefly to
// avoid hammering the shared mutex.
// Given the arms above, this is only reached when there is no wake fd.
_ => {
thread::sleep(std::time::Duration::from_micros(100));
continue;
}
}
}
};
// ----------------------------------------------------------------
// 3. Resume the actor.
// ----------------------------------------------------------------
let sp = match inner.with_shared(|s| {
s.slot(pid).and_then(|slot| slot.actor.as_ref().map(|a| a.sp))
}) {
Some(sp) => sp,
None => continue, // stale pid
};
// First resume: move the closure into the trampoline's thread-local.
if let Some(b) = inner.with_shared(|s| s.pop_pending_closure(pid)) {
set_current_actor_box(b);
}
// Update per-thread stats: record who's on-CPU.
stats.current_pid_index.store(pid.index(), Ordering::Relaxed);
// Arm the per-thread context for this actor. Preemption is enabled last,
// immediately before the switch, and disabled first on the way back.
set_actor_sp(sp);
set_current_pid(pid);
reset_actor_done();
YIELD_INTENT.with(|c| c.set(YieldIntent::Yield));
crate::preempt::reset_timeslice();
PREEMPTION_ENABLED.with(|c| c.set(true));
crate::te!(crate::trace::Event::Resume(pid));
unsafe { switch_to_actor() };
PREEMPTION_ENABLED.with(|c| c.set(false));
stats.current_pid_index.store(u32::MAX, Ordering::Relaxed);
clear_current_pid();
// Capture what the actor left behind before anything can overwrite it.
let intent = YIELD_INTENT.with(|c| c.get());
let new_sp = get_actor_sp();
if is_actor_done() {
crate::te!(crate::trace::Event::Done(pid));
// A finished actor with no recorded outcome is treated as a clean exit.
let outcome = take_last_outcome().unwrap_or(Outcome::Exit);
finalize_actor(inner, pid, outcome);
} else {
inner.with_shared(|s| {
if let Some(slot) = s.slot_mut(pid) {
// Save the stack pointer the actor switched out at.
if let Some(actor) = slot.actor.as_mut() {
actor.sp = new_sp;
}
match intent {
YieldIntent::Yield => {
crate::te!(crate::trace::Event::Yield(pid));
slot.state = State::Runnable;
s.run_queue.push_back(pid);
crate::te!(crate::trace::Event::Enqueue(pid));
}
YieldIntent::Park => {
// Check if unpark() fired while the actor was
// still running (between registering in the
// channel and calling park_current). If so,
// re-queue immediately instead of parking.
if slot.pending_unpark {
slot.pending_unpark = false;
slot.state = State::Runnable;
s.run_queue.push_back(pid);
crate::te!(crate::trace::Event::UnparkFlagConsumed(pid));
crate::te!(crate::trace::Event::Enqueue(pid));
} else {
crate::te!(crate::trace::Event::Park(pid));
slot.state = State::Parked;
}
}
}
}
});
}
}
}

349
src/scheduler.rs Normal file
View File

@@ -0,0 +1,349 @@
//! Scheduler public API — thin façade over the multi-scheduler runtime.
//!
//! All heavy lifting lives in `runtime.rs`. This module exposes the same
//! surface that the rest of the codebase (channel, mutex, io, timer, actor)
//! calls into, plus the public API re-exported from `lib.rs`.
//!
//! The single-threaded `run()` entry point is kept as a convenience wrapper
//! around `runtime::init(Config::exact(1)).run(f)`.
use crate::actor::current_pid;
use crate::channel::Sender;
use crate::pid::Pid;
use crate::runtime::{
self, RuntimeInner, YieldIntent, ROOT_PID, RUNTIME,
};
use crate::supervisor::Signal;
use std::sync::Arc;
// ---------------------------------------------------------------------------
// with_runtime / try_with_runtime
// ---------------------------------------------------------------------------
/// Run `f` with a reference to the runtime stored in this thread's `RUNTIME`
/// slot and return whatever `f` returns.
///
/// The `RefCell` borrow of the slot is held for the duration of `f`.
///
/// # Panics
/// Panics with "smarm: not inside Runtime::run()" when the slot is empty.
pub(crate) fn with_runtime<R>(f: impl FnOnce(&Arc<RuntimeInner>) -> R) -> R {
    RUNTIME.with(|cell| {
        f(cell
            .borrow()
            .as_ref()
            .expect("smarm: not inside Runtime::run()"))
    })
}
/// Non-panicking variant of `with_runtime`: runs `f` and returns `Some(result)`
/// when this thread's `RUNTIME` slot is populated, `None` when it is empty.
///
/// Used on cleanup paths (channel Drop during teardown).
pub(crate) fn try_with_runtime<R>(f: impl FnOnce(&Arc<RuntimeInner>) -> R) -> Option<R> {
    RUNTIME.with(|cell| {
        let guard = cell.borrow();
        guard.as_ref().map(f)
    })
}
// ---------------------------------------------------------------------------
// JoinHandle / JoinError
// ---------------------------------------------------------------------------
/// Error returned by `JoinHandle::join` when the joined actor's recorded
/// outcome was `Outcome::Panic`.
#[derive(Debug)]
pub struct JoinError {
/// The boxed value carried by `Outcome::Panic`, handed over unchanged.
pub payload: Box<dyn std::any::Any + Send>,
}
/// Handle to a spawned actor, as returned by `spawn` / `spawn_under`.
///
/// A handle accounts for one unit of the target slot's `outstanding_handles`
/// (set to 1 at spawn). That unit is released either by `join()` or, if the
/// handle is never joined, by `Drop`.
pub struct JoinHandle {
/// Pid of the actor this handle refers to.
pid: Pid,
/// Set by `join()` just before it releases this handle's share of
/// `outstanding_handles`, so that `Drop` does not release it a second time.
consumed: bool,
}
impl JoinHandle {
    /// Pid of the actor this handle refers to.
    pub fn pid(&self) -> Pid {
        self.pid
    }

    /// Wait until the target actor's slot is `Done` and return its outcome:
    /// `Ok(())` for `Outcome::Exit`, `Err(JoinError)` carrying the payload for
    /// `Outcome::Panic`.
    ///
    /// While the target is not done, the caller registers itself in the
    /// slot's `waiters` list and parks; every wake-up re-checks the slot.
    ///
    /// # Panics
    /// Panics when called outside an actor, when the target slot can no
    /// longer be resolved for this pid, or when a `Done` slot has no outcome.
    pub fn join(mut self) -> Result<(), JoinError> {
        use crate::actor::Outcome;
        use crate::runtime::State;

        let me = current_pid().expect("join() called outside an actor");
        let finished = loop {
            let polled = with_runtime(|inner| {
                inner.with_shared(|s| {
                    let slot = s
                        .slot_mut(self.pid)
                        .expect("join: target slot has been reused");
                    if !matches!(slot.state, State::Done) {
                        // Not finished yet: ask to be woken, then park below.
                        slot.waiters.push(me);
                        return None;
                    }
                    Some(slot.outcome.take().expect("Done slot must have outcome"))
                })
            });
            if let Some(outcome) = polled {
                break outcome;
            }
            // The guard lives until the end of this iteration, i.e. across
            // the park and the resume that follows it.
            let _np = NoPreempt::enter();
            park_current();
        };

        // Mark consumed first so the `Drop` that runs when `self` goes out of
        // scope does not decrement the handle count again.
        self.consumed = true;
        self.decrement_handle_count();
        match finished {
            Outcome::Exit => Ok(()),
            Outcome::Panic(payload) => Err(JoinError { payload }),
        }
    }

    /// Release this handle's share of the slot's `outstanding_handles`
    /// (saturating at zero) and reclaim the slot once it is both `Done` and
    /// unreferenced. Does nothing if the pid no longer resolves to a slot.
    fn decrement_handle_count(&mut self) {
        use crate::runtime::State;

        let pid = self.pid;
        with_runtime(|inner| {
            inner.with_shared(|s| {
                let reclaim = s.slot_mut(pid).map_or(false, |slot| {
                    slot.outstanding_handles = slot.outstanding_handles.saturating_sub(1);
                    slot.outstanding_handles == 0 && matches!(slot.state, State::Done)
                });
                if reclaim {
                    crate::runtime::reclaim_slot(s, pid);
                }
            })
        });
    }
}
impl Drop for JoinHandle {
    /// Release the handle's share of `outstanding_handles` unless `join()`
    /// already did so.
    fn drop(&mut self) {
        if self.consumed {
            return;
        }
        // The handle may be dropped after teardown, outside run(); in that
        // case there is no runtime left to update.
        let runtime_alive = try_with_runtime(|_| ()).is_some();
        if runtime_alive {
            self.decrement_handle_count();
        }
    }
}
// ---------------------------------------------------------------------------
// spawn / spawn_under / self_pid
// ---------------------------------------------------------------------------
/// Spawn `f` as a new actor supervised by the caller.
///
/// The supervisor is the current actor's pid when called from inside an
/// actor, otherwise the runtime's `root_pid`.
///
/// # Panics
/// Panics when no runtime is installed on this thread, or with
/// "spawn() before run()" when there is neither a current actor nor a root pid.
pub fn spawn(f: impl FnOnce() + Send + 'static) -> JoinHandle {
    let parent = match current_pid() {
        Some(pid) => Some(pid),
        None => with_runtime(|inner| inner.with_shared(|s| s.root_pid)),
    };
    spawn_under(parent.expect("spawn() before run()"), f)
}
/// Spawn `f` as a new actor whose supervisor is `supervisor`.
///
/// Allocates a stack of `ACTOR_STACK_SIZE`, prepares its initial frame so the
/// first resume enters `actor::trampoline`, fills a freshly allocated slot,
/// queues the actor as `Runnable`, and stashes the boxed closure in
/// `pending_closures` for the first resume.
///
/// Returns the single `JoinHandle` accounted for in `outstanding_handles`.
///
/// # Panics
/// Panics with "stack allocation failed" if the stack cannot be mapped, and
/// with "smarm: not inside Runtime::run()" when no runtime is installed.
pub fn spawn_under(supervisor: Pid, f: impl FnOnce() + Send + 'static) -> JoinHandle {
    // Do the slow and fallible work before taking the shared lock: the stack
    // allocation performs mmap + mprotect and may panic, and boxing the
    // closure allocates. Done under the lock (as before) a failure would
    // unwind with the lock held and a slot already allocated but only
    // half-initialised; every other scheduler thread would also be stalled
    // for the duration of the syscalls.
    let stack = crate::stack::Stack::new(crate::runtime::ACTOR_STACK_SIZE)
        .expect("stack allocation failed");
    let sp = init_actor_stack(stack.top(), crate::actor::trampoline);
    let closure = Box::new(f) as crate::runtime::Closure;

    let pid = with_runtime(|inner| {
        inner.with_shared(|s| {
            let (idx, generation) = s.allocate_slot();
            let pid = Pid::new(idx, generation);

            // Reset every per-actor field: the slot may be a recycled one.
            let slot = &mut s.slots[idx as usize];
            slot.actor = Some(crate::actor::Actor { pid, stack, sp, supervisor });
            slot.state = crate::runtime::State::Runnable;
            slot.outstanding_handles = 1;
            slot.outcome = None;
            slot.waiters.clear();
            slot.supervisor_channel = None;
            slot.pending_unpark = false;
            slot.pending_io_result = None;

            s.run_queue.push_back(pid);
            s.pending_closures.push((pid, closure));
            crate::te!(crate::trace::Event::Spawn { parent: supervisor, child: pid });
            crate::te!(crate::trace::Event::Enqueue(pid));
            pid
        })
    });
    JoinHandle { pid, consumed: false }
}
use crate::context::init_actor_stack;
/// Pid of the actor currently running on this thread.
///
/// # Panics
/// Panics with "self_pid() called outside an actor" when there is none.
pub fn self_pid() -> Pid {
    let me = current_pid();
    me.expect("self_pid() called outside an actor")
}
// ---------------------------------------------------------------------------
// yield_now / park_current / unpark
// ---------------------------------------------------------------------------
/// Voluntarily hand the CPU back to the scheduler.
///
/// Records `YieldIntent::Yield` and switches to the scheduler context. The
/// scheduler loop handles that intent by marking the actor `Runnable` and
/// pushing it onto the back of the run queue, so this call returns once the
/// actor is resumed again.
pub fn yield_now() {
// The intent must be written before the switch: the scheduler reads it
// right after the switch returns on its side.
runtime::set_yield_intent(YieldIntent::Yield);
// NOTE(review): presumably only sound while running on an actor stack that
// the scheduler switched into — confirm against `context::switch_to_scheduler`.
unsafe { crate::context::switch_to_scheduler() };
}
/// Suspend the current actor until something calls `unpark` for it.
///
/// Records `YieldIntent::Park` and switches to the scheduler context. The
/// scheduler loop then either marks the actor `Parked` or, if the slot's
/// `pending_unpark` flag was set in the meantime, clears the flag and
/// re-queues the actor immediately.
///
/// Callers in this module hold a `NoPreempt` guard around this call.
pub fn park_current() {
// The intent must be written before the switch: the scheduler reads it
// right after the switch returns on its side.
runtime::set_yield_intent(YieldIntent::Park);
// NOTE(review): presumably only sound while running on an actor stack that
// the scheduler switched into — confirm against `context::switch_to_scheduler`.
unsafe { crate::context::switch_to_scheduler() };
}
pub fn unpark(pid: Pid) {
let result = try_with_runtime(|inner| {
inner.with_shared(|s| {
if let Some(slot) = s.slot_mut(pid) {
match slot.state {
crate::runtime::State::Parked => {
// Actor is suspended — safe to re-queue immediately.
slot.state = crate::runtime::State::Runnable;
s.run_queue.push_back(pid);
crate::te!(crate::trace::Event::UnparkDirect(pid));
crate::te!(crate::trace::Event::Enqueue(pid));
}
crate::runtime::State::Runnable => {
// Actor is still running (between registering its
// parked_receiver and calling park_current). Set the
// flag; the scheduler will re-queue after the Park
// yield instead of sleeping.
slot.pending_unpark = true;
crate::te!(crate::trace::Event::UnparkDeferred(pid));
}
crate::runtime::State::Done => {}
}
}
})
});
let _ = result;
}
// ---------------------------------------------------------------------------
// NoPreempt
// ---------------------------------------------------------------------------
/// Scope guard that turns preemption off for the current thread.
///
/// Holds the value `PREEMPTION_ENABLED` had when the guard was created; the
/// `Drop` impl writes that value back, so guards nest correctly.
pub struct NoPreempt(bool);

impl NoPreempt {
    /// Disable preemption and remember the previous setting.
    pub fn enter() -> Self {
        let was_enabled = crate::preempt::PREEMPTION_ENABLED.with(|flag| {
            let before = flag.get();
            flag.set(false);
            before
        });
        Self(was_enabled)
    }
}
impl Drop for NoPreempt {
    /// Restore `PREEMPTION_ENABLED` to the value captured by `enter()`.
    fn drop(&mut self) {
        let restore = self.0;
        crate::preempt::PREEMPTION_ENABLED.with(|flag| flag.set(restore));
    }
}
// ---------------------------------------------------------------------------
// sleep / insert_wait_timer
// ---------------------------------------------------------------------------
/// Park the current actor for at least `duration`.
///
/// Registers a `Sleep` timer for the caller and parks. Preemption is disabled
/// from before the timer is registered until the actor has resumed.
///
/// # Panics
/// Panics with "sleep() called outside an actor" when there is no current actor.
pub fn sleep(duration: std::time::Duration) {
    let me = current_pid().expect("sleep() called outside an actor");
    let _np = NoPreempt::enter();
    let wake_at = crate::timer::deadline_from_now(duration);
    with_runtime(|inner| {
        inner.with_shared(|s| {
            s.timers.insert_sleep(wake_at, me);
        })
    });
    park_current();
}
/// Register a `WaitTimeout` timer for `pid` that expires at `deadline`.
///
/// `target` and `wait_seq` are stored in the timer entry's
/// `Reason::WaitTimeout`. This function only inserts the entry; it does not
/// park the caller.
pub fn insert_wait_timer(
    deadline: std::time::Instant,
    pid: Pid,
    target: std::sync::Arc<dyn crate::timer::TimerTarget>,
    wait_seq: u64,
) {
    let reason = crate::timer::Reason::WaitTimeout { target, wait_seq };
    with_runtime(|inner| {
        inner.with_shared(|s| {
            s.timers.insert(deadline, pid, reason);
        })
    });
}
// ---------------------------------------------------------------------------
// block_on_io / wait_readable / wait_writable / read / write
// ---------------------------------------------------------------------------
/// Run the blocking closure `f` via the runtime's I/O facility and return its
/// result, parking the calling actor in the meantime.
///
/// The closure is wrapped so that its result travels as a type-erased
/// `Box<dyn Any + Send>`. After the actor is resumed the result is taken from
/// the caller's own slot (`pending_io_result`) and downcast back to `T`. An
/// `Err` result is re-raised in the caller with `resume_unwind`.
///
/// # Panics
/// Panics when called outside an actor, when the I/O facility is not
/// running, when the caller's slot is missing, when the actor was resumed
/// without a stored result, or when the stored result is not a `T`.
pub fn block_on_io<F, T>(f: F) -> T
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let me = current_pid().expect("block_on_io() called outside an actor");
    let work: Box<dyn FnOnce() -> crate::io::IoResult + Send> =
        Box::new(move || Ok(Box::new(f()) as Box<dyn std::any::Any + Send>));

    {
        // No preemption between submitting the work and parking.
        let _np = NoPreempt::enter();
        with_runtime(|inner| {
            inner.with_shared(|s| {
                s.io
                    .as_mut()
                    .expect("io thread not started")
                    .submit(me, work);
            })
        });
        park_current();
    }

    let outcome = with_runtime(|inner| {
        inner.with_shared(|s| {
            let slot = s.slot_mut(me).expect("block_on_io: own slot vanished");
            slot.pending_io_result
                .take()
                .expect("block_on_io: resumed without a result")
        })
    });
    match outcome {
        Err(payload) => std::panic::resume_unwind(payload),
        Ok(any) => *any.downcast::<T>().expect("block_on_io: type mismatch"),
    }
}
/// Park the current actor until it is woken after registering interest in
/// `fd` becoming readable. Thin wrapper over `wait_fd(fd, true, false)`.
///
/// Returns the registration error, if any; panics outside an actor.
pub fn wait_readable(fd: std::os::fd::RawFd) -> std::io::Result<()> {
wait_fd(fd, true, false)
}
/// Park the current actor until it is woken after registering interest in
/// `fd` becoming writable. Thin wrapper over `wait_fd(fd, false, true)`.
///
/// Returns the registration error, if any; panics outside an actor.
pub fn wait_writable(fd: std::os::fd::RawFd) -> std::io::Result<()> {
wait_fd(fd, false, true)
}
/// Register `fd` with the I/O facility for the requested readiness kinds on
/// behalf of the current actor, then park.
///
/// A registration failure is returned to the caller without parking.
/// Preemption is disabled from before registration until after the resume.
///
/// # Panics
/// Panics when called outside an actor or when the I/O facility is not running.
fn wait_fd(fd: std::os::fd::RawFd, readable: bool, writable: bool) -> std::io::Result<()> {
    let me = current_pid().expect("wait_*() called outside an actor");
    let _np = NoPreempt::enter();
    let registered = with_runtime(|inner| {
        inner.with_shared(|s| {
            s.io
                .as_mut()
                .expect("io thread not started")
                .epoll_register(fd, me, readable, writable)
        })
    });
    registered?;
    park_current();
    Ok(())
}
/// Wait for `fd` to become readable, then issue a single `libc::read` into
/// `buf`.
///
/// Returns the number of bytes read, or the OS error when the wait or the
/// read fails.
pub fn read(fd: std::os::fd::RawFd, buf: &mut [u8]) -> std::io::Result<usize> {
    wait_readable(fd)?;
    // SAFETY: the pointer and length both come from `buf`, an exclusively
    // borrowed slice that stays alive for the duration of the call.
    let n = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), buf.len()) };
    // A negative return value signals failure; errno is still intact here.
    usize::try_from(n).map_err(|_| std::io::Error::last_os_error())
}
/// Wait for `fd` to become writable, then issue a single `libc::write` from
/// `buf`.
///
/// Returns the number of bytes written (which may be fewer than
/// `buf.len()`), or the OS error when the wait or the write fails.
pub fn write(fd: std::os::fd::RawFd, buf: &[u8]) -> std::io::Result<usize> {
    wait_writable(fd)?;
    // SAFETY: the pointer and length both come from `buf`, a borrowed slice
    // that stays alive for the duration of the call.
    let n = unsafe { libc::write(fd, buf.as_ptr().cast(), buf.len()) };
    // A negative return value signals failure; errno is still intact here.
    usize::try_from(n).map_err(|_| std::io::Error::last_os_error())
}
// ---------------------------------------------------------------------------
// register_supervisor_channel
// ---------------------------------------------------------------------------
/// Store `sender` as the supervisor channel of the actor `pid`, replacing any
/// channel registered earlier.
///
/// # Panics
/// Panics when `pid` does not resolve to a slot, or when no runtime is
/// installed on this thread.
pub fn register_supervisor_channel(pid: Pid, sender: Sender<Signal>) {
    with_runtime(|inner| {
        inner.with_shared(|s| match s.slot_mut(pid) {
            Some(slot) => slot.supervisor_channel = Some(sender),
            None => panic!("register_supervisor_channel: pid {:?} not found", pid),
        })
    });
}
// ---------------------------------------------------------------------------
// Legacy run() — convenience wrapper
// ---------------------------------------------------------------------------
/// Single-threaded runtime entry point (backwards-compatible wrapper).
/// Equivalent to `runtime::init(Config::exact(1)).run(f)`.
pub fn run<F: FnOnce() + Send + 'static>(f: F) {
crate::runtime::init(crate::runtime::Config::exact(1)).run(f);
}

89
src/stack.rs Normal file
View File

@@ -0,0 +1,89 @@
//! mmap-based fixed-size stack with a guard page below.
//!
//! Layout (low → high address):
//! [ guard page (PROT_NONE) | stack region ]
//! `top()`, the initial stack pointer, is the high-address end of the stack region.
//!
//! Stacks grow downward. Overflow lands in the guard page → SIGSEGV.
use std::io;
/// An anonymous private memory mapping used as an actor stack: one guard
/// page at the low end followed by the usable stack region. The mapping is
/// released when the value is dropped.
pub struct Stack {
/// Bottom of the entire mmap'd region (start of guard page).
base: *mut u8,
/// Total mmap'd size in bytes: guard_size + stack_size.
total_size: usize,
/// Usable stack size in bytes (excluding guard page), page-rounded.
stack_size: usize,
}
// The raw `base` pointer makes `Stack` `!Send` by default. The pointer is
// only ever derived from the mapping created in `Stack::new`.
// NOTE(review): soundness relies on no other alias to that mapping being
// used concurrently from another thread — confirm at the call sites.
unsafe impl Send for Stack {}
impl Stack {
    /// Allocate a new stack.
    ///
    /// `stack_size` is the requested usable size in bytes. It is rounded up
    /// to a whole number of pages (a request of 0 is treated as one page),
    /// and one additional page is mapped below it and made inaccessible so
    /// that an overflow faults instead of corrupting adjacent memory.
    ///
    /// # Errors
    /// - `InvalidInput` when the rounded size plus the guard page does not
    ///   fit in `usize`.
    /// - The OS error when `mmap` or `mprotect` fails; in the latter case the
    ///   mapping is released before returning.
    pub fn new(stack_size: usize) -> io::Result<Self> {
        let page = page_size();
        let guard_size = page;
        let too_large = || {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "requested stack size is too large",
            )
        };

        // With zero usable bytes `top()` would sit at the end of the guard
        // page and the very first push would fault.
        let requested = stack_size.max(1);
        // `round_up` adds `page` before masking. In release builds that
        // addition wraps silently, which would turn a huge request into a
        // tiny mapping that is reported as success. Reject it up front.
        if requested.checked_add(page).is_none() {
            return Err(too_large());
        }
        let stack_size = round_up(requested, page);
        let total_size = guard_size.checked_add(stack_size).ok_or_else(too_large)?;

        // SAFETY: anonymous private mapping at a kernel-chosen address; no
        // existing memory is touched.
        let base = unsafe {
            libc::mmap(
                std::ptr::null_mut(),
                total_size,
                libc::PROT_READ | libc::PROT_WRITE,
                libc::MAP_PRIVATE | libc::MAP_ANONYMOUS,
                -1,
                0,
            )
        };
        if base == libc::MAP_FAILED {
            return Err(io::Error::last_os_error());
        }
        let base = base as *mut u8;

        // Turn the lowest page into the guard page.
        // SAFETY: `base..base + guard_size` lies inside the mapping created above.
        let ret = unsafe {
            libc::mprotect(base as *mut libc::c_void, guard_size, libc::PROT_NONE)
        };
        if ret != 0 {
            // Capture errno before munmap can overwrite it.
            let err = io::Error::last_os_error();
            // SAFETY: unmaps exactly the region mapped above, which nothing
            // else references yet.
            unsafe { libc::munmap(base as *mut libc::c_void, total_size) };
            return Err(err);
        }
        Ok(Self { base, total_size, stack_size })
    }

    /// 16-byte-aligned top of the usable region (one past its highest byte).
    pub fn top(&self) -> *mut u8 {
        let raw_top = self.base as usize + self.total_size;
        (raw_top & !15) as *mut u8
    }

    /// Pointer to the bottom of the usable region (just above the guard page).
    pub fn usable_base(&self) -> *mut u8 {
        // The guard size is whatever the mapping holds beyond the usable
        // region; deriving it from the stored sizes avoids a sysconf call.
        let guard_size = self.total_size - self.stack_size;
        // SAFETY: `guard_size < total_size`, so the result stays inside the mapping.
        unsafe { self.base.add(guard_size) }
    }

    /// Usable stack size in bytes, after page rounding.
    pub fn stack_size(&self) -> usize {
        self.stack_size
    }
}
impl Drop for Stack {
    /// Unmap the whole region (guard page and stack). The return value of
    /// `munmap` is ignored.
    fn drop(&mut self) {
        let region = self.base as *mut libc::c_void;
        let len = self.total_size;
        // SAFETY: `region`/`len` describe exactly the mapping created in
        // `Stack::new`, which this value owns.
        unsafe {
            libc::munmap(region, len);
        }
    }
}
/// Page size as reported by `sysconf(_SC_PAGESIZE)`, cast to `usize`.
fn page_size() -> usize {
    // SAFETY: `sysconf` only reads a system configuration value.
    let raw = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    raw as usize
}
/// Round `n` up to the next multiple of `align`.
///
/// Uses a bit mask, so the result is only a multiple of `align` when `align`
/// is a power of two. Values already on a boundary are returned unchanged.
fn round_up(n: usize, align: usize) -> usize {
    let bumped = n + align - 1;
    let keep_high_bits = !(align - 1);
    bumped & keep_high_bits
}

37
src/supervisor.rs Normal file
View File

@@ -0,0 +1,37 @@
//! Supervision signals.
//!
//! Every actor has a supervisor, which is itself just an actor with a
//! `Receiver<Signal>`. When a child actor terminates, the scheduler sends
//! a `Signal` on the supervisor's channel. The supervisor decides what to
//! do — restart, escalate, ignore.
//!
//! For v0.1 there is no built-in restart-intensity cap. That's policy and
//! lives in user code; library is mechanism only.
use crate::pid::Pid;
use std::any::Any;
/// Termination notice about a child actor, delivered on a supervisor's
/// `Receiver<Signal>`. Both variants carry the child's pid first.
pub enum Signal {
/// The child exited normally.
Exit(Pid),
/// The child panicked. Payload is whatever `panic!` was called with.
Panic(Pid, Box<dyn Any + Send>),
}
impl std::fmt::Debug for Signal {
    /// Formats as `Signal::Exit(<pid>)` or `Signal::Panic(<pid>, ..)`. The
    /// panic payload is elided because `dyn Any` has no useful `Debug` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (variant, pid, elided) = match self {
            Signal::Exit(pid) => ("Exit", pid, ""),
            Signal::Panic(pid, _) => ("Panic", pid, ", .."),
        };
        write!(f, "Signal::{}({:?}{})", variant, pid, elided)
    }
}
impl Signal {
    /// Pid of the child this signal is about, whichever the variant.
    pub fn pid(&self) -> Pid {
        match self {
            Signal::Exit(child) | Signal::Panic(child, _) => *child,
        }
    }
}

147
src/timer.rs Normal file
View File

@@ -0,0 +1,147 @@
//! Sleep + wait-with-timeout timers.
//!
//! A min-heap of `(deadline, seq, reason)` entries lives on `SchedulerState`.
//! When an actor sleeps or starts a bounded wait (e.g. `mutex.lock()` with a
//! timeout), the runtime inserts an entry, marks the actor parked, and yields.
//! On every scheduler loop iteration the runtime pops all entries whose
//! deadline has passed and dispatches each according to its `Reason`:
//!
//! - `Sleep`: unpark the actor.
//! - `WaitTimeout`: call `on_timeout` on the registered target. The target
//! (e.g. a `Mutex`) decides whether the actor was actually still waiting
//! (timer fires first → unpark with error) or had already been granted
//! what it was waiting for (lock granted first → no-op).
//!
//! `BinaryHeap` is a max-heap; entries are wrapped in `Reverse` to get
//! min-heap behaviour.
//!
//! No cancellation. When a non-timer wakeup happens (e.g. lock granted
//! before timeout), the timer entry is left in the heap. It will be popped
//! eventually and the dispatch will observe "actor is no longer parked /
//! wait_seq is stale" and no-op. Cost is ~32 bytes per stale entry plus a
//! few cycles on pop; acceptable given the upper bound is "one entry per
//! parked actor".
//!
//! Stale pids (slot reused since the timer was inserted) are filtered on
//! pop by the scheduler — same convention as the run queue.
use crate::pid::Pid;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// What to do when a timer entry's deadline arrives.
///
/// Held inside `Entry`; expired entries are returned by `Timers::pop_due`
/// and dispatched by the scheduler according to this value.
pub enum Reason {
/// Inserted by `sleep(d)` via `Timers::insert_sleep`. Unpark `pid`
/// unconditionally (modulo the usual "still parked?" check the scheduler
/// applies).
Sleep,
/// A bounded wait — currently only `Mutex::lock_timeout`. On expiry the
/// scheduler calls `target.on_timeout(pid, wait_seq)`. The target then
/// decides whether `pid` was actually still waiting, and if so unparks
/// it with whatever error the wait was bounded for. `wait_seq` lets the
/// target tell apart "this wait" from "a later wait by the same actor
/// on the same target".
WaitTimeout {
/// Object to notify when the deadline passes.
target: Arc<dyn TimerTarget>,
/// Identifies the specific wait this timer belongs to.
wait_seq: u64,
},
}
/// Callback the scheduler invokes when a `WaitTimeout` entry pops.
///
/// Implementors: do not touch `SchedulerState` other than via the public
/// `unpark` / channel APIs. The scheduler is mid-iteration when this fires.
///
/// The `Send + Sync` bound lets targets be shared as `Arc<dyn TimerTarget>`.
pub trait TimerTarget: Send + Sync {
/// Called with the `pid` and `wait_seq` stored in the expired entry.
fn on_timeout(&self, pid: Pid, wait_seq: u64);
}
/// One pending timer: when it fires, for which actor, and what to do then.
pub struct Entry {
/// Instant at or after which the entry is due.
pub deadline: Instant,
/// Insertion order, used purely as a tiebreaker so `Entry: Ord` works
/// without having to compare the `Reason` payload (which contains an
/// `Arc<dyn TimerTarget>` and isn't `Ord`).
seq: u64,
/// Actor the timer was registered for.
pub pid: Pid,
/// Action to take when the entry is popped.
pub reason: Reason,
}
impl PartialEq for Entry {
    /// Two entries are equal when both `deadline` and `seq` match; `pid` and
    /// `reason` are ignored, mirroring the `Ord` impl.
    fn eq(&self, other: &Self) -> bool {
        (self.deadline, self.seq) == (other.deadline, other.seq)
    }
}

impl Eq for Entry {}
impl Ord for Entry {
    /// Lexicographic order on `(deadline, seq)`: the earlier deadline sorts
    /// first and insertion order breaks ties, which makes the order total.
    /// `Reason` and `Pid` deliberately take no part.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let mine = (self.deadline, self.seq);
        let theirs = (other.deadline, other.seq);
        mine.cmp(&theirs)
    }
}

impl PartialOrd for Entry {
    /// Always `Some`, delegating to the total order above.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
/// The set of pending timers, ordered by deadline.
#[derive(Default)]
pub struct Timers {
/// Reverse-wrapped so the smallest deadline is at the top.
heap: BinaryHeap<Reverse<Entry>>,
/// Monotonic counter for the tiebreaker `seq` field; advanced with
/// `wrapping_add` on every insert.
next_seq: u64,
}
impl Timers {
    /// Create an empty timer set whose sequence counter starts at 0.
    pub fn new() -> Self {
        Self {
            next_seq: 0,
            heap: BinaryHeap::new(),
        }
    }

    /// Shorthand for `insert(deadline, pid, Reason::Sleep)`.
    pub fn insert_sleep(&mut self, deadline: Instant, pid: Pid) {
        self.insert(deadline, pid, Reason::Sleep);
    }

    /// Add a timer entry, stamping it with the next sequence number.
    pub fn insert(&mut self, deadline: Instant, pid: Pid, reason: Reason) {
        let seq = self.next_seq;
        self.next_seq = seq.wrapping_add(1);
        let entry = Entry { deadline, seq, pid, reason };
        self.heap.push(Reverse(entry));
    }

    /// `true` when no timers are pending.
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }

    /// Earliest pending deadline, or `None` when there are no timers.
    pub fn peek_deadline(&self) -> Option<Instant> {
        let Reverse(soonest) = self.heap.peek()?;
        Some(soonest.deadline)
    }

    /// Remove and return every entry whose deadline is ≤ `now`, earliest
    /// first. The caller decides what to do with each entry by inspecting
    /// its `reason`.
    pub fn pop_due(&mut self, now: Instant) -> Vec<Entry> {
        let mut due = Vec::new();
        while matches!(self.heap.peek(), Some(Reverse(head)) if head.deadline <= now) {
            if let Some(Reverse(entry)) = self.heap.pop() {
                due.push(entry);
            }
        }
        due
    }
}
/// Convert a relative `duration` into an absolute deadline on the monotonic
/// `Instant` clock. Exposed for `sleep` and `lock_timeout`.
///
/// When `now + duration` is not representable (e.g. `Duration::MAX`), the
/// deadline is clamped to roughly 30 years from now rather than collapsing
/// to "now": a caller asking for an effectively infinite wait must not be
/// woken immediately.
pub fn deadline_from_now(duration: Duration) -> Instant {
    /// Stand-in for "never" that is still representable as an `Instant`.
    const FAR_FUTURE: Duration = Duration::from_secs(60 * 60 * 24 * 365 * 30);

    let now = Instant::now();
    now.checked_add(duration)
        .or_else(|| now.checked_add(FAR_FUTURE))
        // Last resort if even the clamp overflows on some exotic clock.
        .unwrap_or(now)
}

246
src/trace.rs Normal file
View File

@@ -0,0 +1,246 @@
//! Structured per-event tracing for smarm.
//!
//! Enabled by `--features smarm-trace`. Zero cost without the feature.
//!
//! Architecture: MPSC. Every scheduler thread holds a thread-local Sender
//! clone (one mutex acquire per thread, on first use). A dedicated drain
//! thread owns the Receiver, batches records, and writes to a BufWriter.
//! The hot path (record()) is a single channel send — no mutex, no disk I/O.
//!
//! Usage:
//! cargo test --test runtime <test_name> --features smarm-trace
//!
//! Output: smarm_trace.json in cwd, or $SMARM_TRACE_FILE.
//! View: https://ui.perfetto.dev or chrome://tracing
/// Record a trace event. With the `smarm-trace` feature enabled the argument
/// expression is passed to `trace::record`.
#[cfg(feature = "smarm-trace")]
#[macro_export]
macro_rules! te {
($kind:expr) => { $crate::trace::record($kind) };
}
/// No-op variant used when `smarm-trace` is disabled. The expansion is `()`
/// and does not include the argument tokens, so the event expression is
/// never evaluated.
#[cfg(not(feature = "smarm-trace"))]
#[macro_export]
macro_rules! te {
($kind:expr) => { () };
}
#[cfg(feature = "smarm-trace")]
pub use inner::*;
#[cfg(feature = "smarm-trace")]
mod inner {
use crate::pid::Pid;
use std::io::Write;
use std::sync::{mpsc, Mutex};
use std::time::Instant;
// -----------------------------------------------------------------------
// Event kinds
// -----------------------------------------------------------------------
/// Everything the runtime can report to the trace. Each variant names the
/// actor(s) involved; `chrome_fields` maps a variant to its output name and
/// the actor index it is attributed to.
#[derive(Clone, Debug)]
pub enum Event {
// Actor lifecycle
/// `parent` spawned `child`.
Spawn { parent: Pid, child: Pid },
/// The scheduler is about to switch into the actor.
Resume(Pid),
/// The actor came back with a Yield intent.
Yield(Pid),
/// The actor came back with a Park intent and was parked.
Park(Pid),
/// The actor finished.
Done(Pid),
// Wakeup paths
UnparkDirect(Pid), // unpark() saw Parked -> re-queued immediately
UnparkDeferred(Pid), // unpark() saw Runnable -> set pending_unpark flag
UnparkFlagConsumed(Pid), // scheduler saw flag on Park -> re-queued instead
// Channel
/// A channel send by `sender`; `receiver` is optional.
Send { sender: Pid, receiver: Option<Pid> },
RecvPark(Pid),
RecvWake(Pid),
// Queue
/// The actor was pushed onto the run queue.
Enqueue(Pid),
Dequeue(Pid),
}
// -----------------------------------------------------------------------
// Wire format sent through the channel
// -----------------------------------------------------------------------
/// One timestamped event as it travels from `record()` to the drain thread.
struct Record {
nanos: u64, // ns elapsed since the start Instant taken in open()
tid: u64, // OS thread id of the thread that called record()
event: Event,
}
// Messages accepted by the drain thread.
enum Msg {
// An event to append to the output.
Event(Record),
// Sentinel: drain thread flushes and exits when it receives this.
Flush,
}
// -----------------------------------------------------------------------
// Global sender + start time
// -----------------------------------------------------------------------
/// Process-wide tracing state installed by `open()` and removed by `flush()`.
struct Global {
// Sender that per-thread senders are cloned from.
sender: mpsc::Sender<Msg>,
// Reference point for event timestamps.
start: Instant,
}
// `None` until `open()` runs and again after `flush()`.
static GLOBAL: Mutex<Option<Global>> = Mutex::new(None);
// Per-thread state: cached Sender clone + cached copy of start Instant.
// Both are taken from GLOBAL in one lock acquisition, the first time
// record() runs on a thread while tracing is open.
// record() does not touch GLOBAL again once this is populated.
struct LocalState {
tx: mpsc::Sender<Msg>,
start: Instant,
}
thread_local! {
// `None` until the first successful record() on this thread; flush()
// resets it on the thread that calls flush().
static LOCAL_STATE: std::cell::RefCell<Option<LocalState>> =
std::cell::RefCell::new(None);
}
// -----------------------------------------------------------------------
// Lifecycle
// -----------------------------------------------------------------------
pub fn open() {
let path = std::env::var("SMARM_TRACE_FILE")
.unwrap_or_else(|_| "smarm_trace.json".to_owned());
let (tx, rx) = mpsc::channel::<Msg>();
let start = Instant::now();
*GLOBAL.lock().unwrap() = Some(Global { sender: tx, start });
// Drain thread: owns the Receiver, writes to disk.
let path_for_thread = path.clone();
std::thread::Builder::new()
.name("smarm-trace-drain".into())
.spawn(move || drain_thread(rx, &path_for_thread))
.expect("failed to spawn trace drain thread");
eprintln!("[smarm-trace] writing to {}", path);
}
/// Send a Flush sentinel and block until the drain thread finishes writing.
/// Called by Runtime::run after all scheduler threads have exited.
pub fn flush() {
// Drop the global sender so the drain thread's recv() returns Err
// after the Flush sentinel, signalling clean shutdown.
let sender = {
let mut g = GLOBAL.lock().unwrap();
g.take().map(|g| g.sender)
};
if let Some(tx) = sender {
let _ = tx.send(Msg::Flush);
// tx drops here — drain thread will see disconnected after Flush.
}
// Clear thread-local state.
LOCAL_STATE.with(|c| *c.borrow_mut() = None);
}
// -----------------------------------------------------------------------
// Hot path
// -----------------------------------------------------------------------
/// Timestamp `event` and hand it to the drain thread.
///
/// Silently does nothing when tracing is not open. Send errors are ignored.
pub fn record(event: Event) {
    // Preemption stays off for the whole call. Anything below that can
    // allocate (lazy init, mutex internals, the channel send) could otherwise
    // reach PreemptingAllocator -> maybe_preempt -> switch_to_scheduler while
    // the caller holds inner.shared, as many te!() call sites do, and
    // deadlock on re-acquiring it. The flag is therefore cleared first.
    let was_enabled = crate::preempt::PREEMPTION_ENABLED.with(|flag| flag.replace(false));

    LOCAL_STATE.with(|cell| {
        let mut cached = cell.borrow_mut();
        if cached.is_none() {
            // First event on this thread: the only time GLOBAL is locked here.
            let global = GLOBAL.lock().unwrap();
            if let Some(g) = global.as_ref() {
                *cached = Some(LocalState { tx: g.sender.clone(), start: g.start });
            }
        }
        let Some(local) = cached.as_ref() else {
            return;
        };
        let nanos = local.start.elapsed().as_nanos() as u64;
        let tid = os_tid();
        let _ = local.tx.send(Msg::Event(Record { nanos, tid, event }));
    });

    crate::preempt::PREEMPTION_ENABLED.with(|flag| flag.set(was_enabled));
}
// -----------------------------------------------------------------------
// Drain thread
// -----------------------------------------------------------------------
/// Body of the drain thread: write every received event to `path` as one
/// JSON object inside a `traceEvents` array.
///
/// Stops at the first `Msg::Flush` or when the channel disconnects, then
/// writes the closing brackets, flushes the buffer and reports the event
/// count on stderr. If the file cannot be created the thread reports that
/// and exits at once. Individual write errors are ignored.
fn drain_thread(rx: mpsc::Receiver<Msg>, path: &str) {
    let file = match std::fs::File::create(path) {
        Ok(file) => file,
        Err(e) => {
            eprintln!("[smarm-trace] create failed: {}", e);
            return;
        }
    };
    let mut out = std::io::BufWriter::new(file);
    let _ = writeln!(out, "{{\"traceEvents\":[");

    let mut written: u64 = 0;
    // Anything other than an event (Flush, or a disconnected channel) ends the loop.
    while let Ok(Msg::Event(rec)) = rx.recv() {
        let (name, actor_idx) = chrome_fields(&rec.event);
        let ts_us = rec.nanos as f64 / 1000.0;
        // Separator goes before every object except the first.
        if written > 0 {
            let _ = out.write_all(b",\n");
        }
        let _ = write!(out,
            "{{\"ph\":\"i\",\"ts\":{:.3},\"pid\":{},\"tid\":{},\"name\":{:?},\"s\":\"g\"}}",
            ts_us, actor_idx, rec.tid, name);
        written += 1;
    }

    // Clean close.
    let _ = writeln!(out, "\n]}}");
    let _ = out.flush();
    eprintln!("[smarm-trace] {} events written", written);
}
// -----------------------------------------------------------------------
// Chrome trace helpers
// -----------------------------------------------------------------------
fn chrome_fields(ev: &Event) -> (String, u32) {
match ev {
Event::Spawn { parent, child } =>
(format!("spawn c={}", child.index()), parent.index()),
Event::Resume(p) => ("resume".into(), p.index()),
Event::Yield(p) => ("yield".into(), p.index()),
Event::Park(p) => ("park".into(), p.index()),
Event::Done(p) => ("done".into(), p.index()),
Event::UnparkDirect(p) => ("unpark_direct".into(), p.index()),
Event::UnparkDeferred(p) => ("unpark_deferred".into(), p.index()),
Event::UnparkFlagConsumed(p) => ("unpark_flag_consumed".into(), p.index()),
Event::Send { sender, receiver } => (
format!("send rx={}", receiver
.map(|p| p.index().to_string())
.unwrap_or_else(|| "none".into())),
sender.index(),
),
Event::RecvPark(p) => ("recv_park".into(), p.index()),
Event::RecvWake(p) => ("recv_wake".into(), p.index()),
Event::Enqueue(p) => ("enqueue".into(), p.index()),
Event::Dequeue(p) => ("dequeue".into(), p.index()),
}
}
/// Kernel thread id of the calling thread, obtained with a `gettid` syscall
/// on every call.
fn os_tid() -> u64 {
    // SAFETY: `SYS_gettid` takes no arguments and touches no memory.
    let tid = unsafe { libc::syscall(libc::SYS_gettid) };
    tid as u64
}
}