From 8cbef1dfc1d4145270653e7b560d1e51ee951b1b Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 16:09:29 +0000 Subject: [PATCH] feat: I/O and mutex support (v0.3) Add epoll-based non-blocking I/O and kernel-like mutexes: - src/io.rs: Complete epoll backend with timeout & error handling - src/mutex.rs: Fair mutex with waiter queues & parking integration - Enhanced scheduler to support synchronous I/O blocking - Comprehensive test suites for I/O (epoll) and mutex behavior - Documentation: LOOM.md concurrency model & README --- Cargo.toml | 3 +- LOOM.md | 210 ++++++++++++++++++++++ README.md | 84 +++++++++ src/io.rs | 445 +++++++++++++++++++++++++++++++++++++--------- src/lib.rs | 16 +- src/mutex.rs | 318 +++++++++++++++++++++++++++++++++ src/scheduler.rs | 262 +++++++++++++++++++++++---- src/timer.rs | 112 +++++++++--- tests/io_epoll.rs | 324 +++++++++++++++++++++++++++++++++ tests/mutex.rs | 311 ++++++++++++++++++++++++++++++++ tests/timer.rs | 93 ++++++++++ 11 files changed, 2032 insertions(+), 146 deletions(-) create mode 100644 LOOM.md create mode 100644 README.md create mode 100644 src/mutex.rs create mode 100644 tests/io_epoll.rs create mode 100644 tests/mutex.rs diff --git a/Cargo.toml b/Cargo.toml index 2028c22..50f0ef1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "smarm" -version = "0.1.0" +version = "0.3.0" edition = "2021" rust-version = "1.95" @@ -8,6 +8,7 @@ rust-version = "1.95" libc = "0.2" [dev-dependencies] +libc = "0.2" tokio = { version = "1", features = ["rt", "macros", "sync"] } [profile.dev] diff --git a/LOOM.md b/LOOM.md new file mode 100644 index 0000000..179143c --- /dev/null +++ b/LOOM.md @@ -0,0 +1,210 @@ +# Loom + +> Erlang-style actor concurrency for Rust, without the copies, the colors, or the GC pauses. + +--- + +## Vision + +Rust gives you the right ownership discipline for safe actor concurrency almost for free — `Send` already +draws the boundary, the borrow checker already enforces it. 
What it lacks is an execution model to match: +async/await is IO-centric, colors your functions, and trades stack simplicity for state-machine complexity; +OS threads are too heavy to spawn per actor. + +Loom adds a third option: **green-thread actors on a shared heap**, scheduled cooperatively, with +message-passing as the only cross-actor communication primitive. You get Erlang's isolation model without +Erlang's copying GC, and you get Rust's zero-copy ownership transfers without async's cognitive overhead. +No function coloring. No `Box<dyn Future>`. Just actors, messages, and the borrow checker doing what it +already does. + +--- + +## Do: Core Runtime + +### Actors and scheduling + +Each actor is a lightweight green thread with its own heap-allocated, growable stack. Stacks are +allocated via `mmap` with a guard page below the region; overflow is detected by the OS without Loom +polling for it. Initial stacks are small and grow by remapping on demand. + +The scheduler runs one OS thread per CPU. Each scheduler thread loops against a single global +`Mutex` queue shared across all schedulers. If queue contention becomes a measured bottleneck +this can be revisited; the interface will not change. + +Loom requires `panic = unwind`. Users who set `panic = abort` accept that supervision and actor +isolation are silently degraded to process death. + +### Process descriptor + +Each actor has a descriptor that is hot while the actor runs and will typically live in L1 cache. 
+It holds: + +- `stack_base: *mut u8` — bottom of the allocated stack region +- `stack_cap: usize` — total allocated size +- `stack_ptr: *mut u8` — current stack pointer (`rsp`), saved on yield +- `pid: (u32, u32)` — index and generation counter (see PIDs below) +- `alloc_count: u32` — countdown for preemption sampling +- `timeslice_start: u64` — `RDTSC` value written on every resume +- `resize_count: u16` — diagnostic counter for stack growth events +- `context: *mut ContextSaveArea` — pointer to the register save area (cold, touched only on switch) + +### Context switching + +Context switching is implemented in a `#[naked]` assembly shim, one per supported architecture. +The compiler cannot be asked to switch stacks. + +**Suspend** (yield, preemption, or blocking): +1. Save callee-saved integer registers and SIMD registers into `ContextSaveArea`. +2. Save `rsp`/`sp` into the process descriptor. +3. Load the scheduler's stack pointer from a thread-local and jump back into the scheduler loop. + +**Resume**: +1. Load `rsp`/`sp` from the process descriptor. +2. Restore registers from `ContextSaveArea`. +3. `ret` — the return address is already on the restored stack, execution resumes exactly where the + actor yielded. + +**x86-64**: saves `rbx`, `rbp`, `r12`–`r15` (6 × 8 = 48 bytes) and `xmm0`–`xmm15` (16 × 16 = 256 +bytes) = 304 bytes total. Full SSE baseline is required; the compiler may autovectorise freely. +AVX-512 is deferred. + +**ARM64**: saves `x19`–`x30` (12 × 8 = 96 bytes, including the link register `x30` which must be +saved explicitly — it holds the return address, unlike x86 where `call` pushes it to the stack) and +`d8`–`d15` (8 × 8 = 64 bytes) = 160 bytes total. + +`ContextSaveArea` is a `Box` per actor. Lifetime equals the actor's lifetime; +no churn, no bulk deallocation, `Box` is correct. + +Initial platform target is x86-64 Linux. ARM64 and macOS are natural follow-ons. 
+ +### Allocator-driven preemption + +Every Nth allocation, the allocator reads `RDTSC` and compares it against `timeslice_start`. If the +threshold is exceeded the actor yields. The workloads that starve a scheduler — sustained compute, +data transformation — are precisely the ones doing frequent allocations, so this approximation is +correct by construction. + +`RDTSC` is not monotonic across core migration; a slightly wrong timeslice is acceptable. Loom is +not a real-time scheduler. + +Known failure mode: tight no-alloc loops are invisible to this mechanism. Actors doing sustained +allocation-free compute must call `loom::yield_now()` explicitly, or offload to a thread pool +outside the actor scheduler (e.g. rayon). This is documented and acceptable — such loops are rare +in message-passing workloads. + +### Yield points + +An actor yields at: + +- **Channel send/recv** — the primary communication primitive +- **Mutex contention** — attempting to lock a held `Arc<Mutex<T>>` parks the actor +- **IO** — blocking on a socket or file descriptor parks the actor until the IO thread signals readiness +- **`loom::sleep(duration)`** — parks the actor; the timer wheel re-queues it on expiry +- **`loom::yield_now()`** — explicit cooperative yield +- **Allocator preemption** — as above +- **Spawn** — does not yield by default; the new actor is queued and the spawner continues + +`std::thread::sleep` inside an actor blocks the entire OS thread and should never be used. Loom +may emit a warning if it can detect this. + +### IO thread + +A single dedicated IO thread runs an `epoll`/`kqueue` loop. Actors blocking on IO register their +file descriptor and PID; the IO thread moves them back into the global queue when the fd is ready. +A `HashMap<RawFd, Pid>` maps fds to parked actors. Cancellation (actor dies while waiting on IO) +deregisters the fd. This is intentionally simple and not pluggable; Loom is not a general async +executor. + +### Communication + +Messages must be `Send` or `Copy`. 
Non-`Send` types cannot cross an actor boundary; this is +enforced by the type system with no runtime overhead. + +Two primitives only: + +- **Move** — transfer owned data across a channel. Zero copy. The sender relinquishes ownership + at the type level. This is the default. +- **`Arc<Mutex<T>>`** — for genuinely shared long-lived state. Explicit and visible. + +Cross-actor `Rc` or bare pointers are banned. There is no cycle detector. Cross-actor cycles are +banned by construction: either transfer ownership or use `Arc`. + +### PIDs + +A PID is a `(index, generation)` pair. The index may be reused after an actor dies; the generation +counter increments on every death. A stale handle holding the wrong generation is a detectable +error, not a silent misdirection. This avoids the ABA problem without reserving PID space forever. + +### Supervision + +Every actor has a supervisor, assigned at spawn. This is not optional. The root supervisor is +provided by the runtime; its death is a process exit. + +A supervisor receives one of three signals when a child actor terminates: + +- `Signal::Exit(pid)` — normal completion +- `Signal::Panic(pid, payload)` — caught via `catch_unwind` at the actor entry point boundary, + before unwinding can reach the assembly shim +- `Signal::Timeout(pid)` — actor exceeded a budget (see below) + +The supervisor decides: restart the actor, escalate to its own supervisor, or ignore. Restart +intensity is capped: if an actor panics more than N times within a time window, the supervisor +stops restarting and escalates. This prevents a bad prelude or corrupted input from spinning the +supervisor in a restart loop indefinitely. N and the window are configurable per supervisor with a +sensible global default. + +### Mutex timeout + +Every `loom::mutex` lock attempt is mediated by the scheduler. If the lock is not acquired within +a configurable timeout, the actor receives a `LockTimeout` error rather than parking forever. 
This +is a hard runtime guarantee, not a convention. Default timeout is global and configurable; +individual locks and individual call sites can override it. + +### Task joining + +Actors can spawn children and wait on a group of handles: + +```rust +let h1 = loom::spawn(|| compute_a()); +let h2 = loom::spawn(|| compute_b()); +let (a, b) = loom::join!(h1, h2); +``` + +`join!` parks the calling actor until all handles complete. The last child to finish re-queues the +parent. This is a countdown in the parent's descriptor; no polling, no waker registration. A +`join_timeout!` variant is a natural extension. + +### Timer wheel + +`loom::sleep` and supervision timeouts are driven by a timer wheel in the scheduler. Sleeping +actors are parked and re-queued by the timer thread on expiry. The timer wheel is internal +infrastructure; its design is an implementation detail. + +--- + +## Defer: Later Work + +- **Stack sizing policy** — initial size, growth factor, and whether stacks ever shrink are + implementation decisions to be made with profiling data, not up front. +- **Queue contention** — if `Mutex` proves to be a bottleneck under profiling, evaluate + `DashMap` or a lock-free work-stealing deque (e.g. `crossbeam-deque`). Not before. +- **AVX-512 context save** — extend `ContextSaveArea` when there is a concrete use case. +- **`loom::sleep` vs raw sleep semantics** — further control knobs deferred until the basic sleep + is working and real use cases are understood. +- **Supervision tree API** — the contract is defined; the recursive hierarchy, restart strategies, + and introspection API are implementation work. +- **no_std support** — the assembly shim is no_std friendly but the IO thread and allocator require + OS primitives. Target is no_std + `alloc` on hosted platforms; bare metal is out of scope. +- **Distribution** — Loom is a single-process runtime. No distribution protocol, no BEAM-style + clustering. 
+ +--- + +## What Loom is Not + +- Not a drop-in replacement for Tokio. Loom does not implement `Future` or the async executor interface. +- Not a general allocator. Loom manages actor stacks; heap allocation for actor data goes through + the system allocator. +- Not Erlang. No hot code reloading, no distribution protocol, no BEAM bytecode. Loom is a + concurrency runtime, not a platform. +- Not a real-time scheduler. Timeslice accuracy is best-effort. diff --git a/README.md b/README.md new file mode 100644 index 0000000..0eb7fd1 --- /dev/null +++ b/README.md @@ -0,0 +1,84 @@ +# smarm + +> Silly Marks Abstract Rust Machine. A prototype green-thread actor runtime for Rust. + +Implements the core ideas in [`LOOM.md`](./LOOM.md): green-thread actors on a +shared heap, scheduled cooperatively, communicating only by `Send` messages. +Erlang's isolation model without Erlang's copying GC, Rust's zero-copy +ownership transfers without async's function colouring. + +This is single-threaded for now — one scheduler, one OS thread. Multi-threaded +scheduling is on the deferred list. The public API is shaped to match what the +multi-threaded version will expose, so the migration shouldn't require source +changes for downstream code. 
+ +## What's here + +| Module | What it does | +|--------------|------------------------------------------------------------------------| +| `stack` | `mmap`'d growable stack with guard page; SIGSEGV on overflow | +| `context` | `#[naked]` x86-64 context-switch shims, callee-saved regs only | +| `preempt` | Allocator-driven preemption; `check!()` macro for no-alloc loops | +| `pid` | `(index, generation)` PIDs; stale handles are detectable, not silent | +| `actor` | Trampoline + `catch_unwind` boundary at the actor entry point | +| `scheduler` | Run queue, slot table, spawn/join, parking, idle path | +| `channel` | Unbounded MPSC channel; `recv` parks the actor | +| `mutex` | `Mutex<T>` with mandatory timeout; FIFO waiters; parks the green thread | +| `timer` | Min-heap of `(deadline, reason)`; `Sleep` and `WaitTimeout` reasons | +| `io` | `block_on_io` for blocking work; `wait_readable`/`wait_writable` + `read`/`write` via epoll | +| `supervisor` | `Signal::Exit` / `Signal::Panic` delivered to a parent actor's mailbox | + +## Quick taste + +```rust +use smarm::{run, spawn, channel}; + +run(|| { + let (tx, rx) = channel::<i64>(); + let h = spawn(move || { + for _ in 0..3 { + let v = rx.recv().unwrap(); + println!("got {v}"); + } + }); + for v in 1..=3i64 { + tx.send(v).unwrap(); + } + h.join().unwrap(); +}); +``` + +## Layout + +``` +src/ + stack.rs context.rs preempt.rs pid.rs actor.rs + scheduler.rs channel.rs mutex.rs timer.rs io.rs supervisor.rs + lib.rs +tests/ + per-module integration tests +benches/ + primes.rs fan-out/fan-in compute, vs tokio current_thread +LOOM.md design intent +``` + +## Building and running + +Standard Cargo. Requires Rust 1.95 or newer (the `#[naked]` attribute went stable +in 1.88; we use a few unrelated post-1.88 features). x86-64 Linux only — +ARM64 and macOS are on the deferred list because of the assembly shim and the +epoll dependency. 
+ +```sh +cargo test # all tests +cargo test --test mutex # one module +cargo bench # primes benchmark vs tokio +``` + +## What's not here + +See the **Defer** section of `LOOM.md`. Notable absences: multi-threaded +scheduler, supervisor restart-intensity caps, `join!` for handle groups, +stack growth via remap, hierarchical timer wheel, fd-wait timeouts, +`Signal::Timeout`. Each is mechanism we know how to add; none belongs in +this iteration. diff --git a/src/io.rs b/src/io.rs index 4286486..71a96d3 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,35 +1,73 @@ -//! Off-scheduler blocking work. +//! Off-scheduler IO: blocking-work offload and epoll-based fd readiness. //! //! `block_on_io(closure)` runs `closure` on a dedicated worker OS thread, //! parks the calling actor in the meantime, and returns the closure's //! value when it completes. Lets actors call into blocking C libraries, -//! synchronous file IO, or anything else that would otherwise stall the -//! scheduler thread. +//! synchronous file IO, or anything else that doesn't fit the readiness +//! model. +//! +//! `wait_readable(fd)` / `wait_writable(fd)` register interest in an fd +//! with epoll and park the calling actor. When the fd becomes ready, the +//! epoll thread unparks the actor. The actual `read(2)`/`write(2)` syscall +//! runs back on the scheduler thread, *inside* the actor — buffer never +//! leaves the actor, no copying through an intermediary thread. Built on +//! these are the conveniences `read(fd, &mut buf)` and `write(fd, &buf)`. //! //! Architecture //! ============ -//! Per `run()`: -//! - one worker OS thread, started by `run()` and joined at shutdown; -//! - a request channel (`mpsc::Sender`) from scheduler → worker; -//! - a completion queue (`Mutex>`) worker → scheduler; -//! - a wake pipe: when the worker pushes a completion it writes one byte -//! to the pipe; the scheduler polls the pipe (with timeout) when it -//! would otherwise be idle. +//! 
Per `run()`, two OS threads: +//! - **epoll thread**: owns the epollfd. Loops in `epoll_wait`. On a +//! ready fd, pushes `Completion::FdReady { pid, fd, events }` to the +//! shared completion queue and writes the scheduler-wake pipe. On the +//! shutdown pipe (also registered in epollfd), exits. +//! - **pool thread**: blocks on the request mpsc. Runs the closure +//! inside `catch_unwind`, pushes `Completion::Blocking { pid, result }`, +//! writes the scheduler-wake pipe. //! -//! For v0.2 the worker is a single thread, so concurrent `block_on_io` -//! calls are serialised. v0.3 can replace it with a thread pool behind -//! the same request channel. +//! Both threads share a single `completions: Arc>>` +//! and the same scheduler-wake pipe. +//! +//! `epoll_ctl` (register/unregister fd interest) is called by the +//! scheduler thread *directly* on the epollfd. That's well-defined per +//! `epoll_ctl(2)`: a thread may be calling `epoll_wait` on the epollfd +//! while another thread calls `epoll_ctl`. Avoids needing a second mpsc +//! and a second wake mechanism. +//! +//! Epoll mode +//! ========== +//! Level-triggered with EPOLLONESHOT. After a wakeup the kernel +//! auto-disarms the fd, so we never get two wakeups for one +//! `wait_readable` call. The scheduler explicitly `EPOLL_CTL_DEL`s the fd +//! on completion to free the slot for re-registration. Net effect: each +//! `wait_readable(fd)` is one ADD, one wakeup, one DEL — symmetric and +//! stateless between calls. +//! +//! Fd hygiene +//! ========== +//! If an actor dies while waiting on an fd, the registration is leaked +//! (the fd stays in the epollfd, armed). EPOLLONESHOT bounds the damage: +//! at most one stale wakeup, after which the kernel disarms. The stale +//! wakeup hits a dead pid in `waiters` and is dropped. Acceptable for v0.2; +//! a future pass should DEL on actor death. +//! +//! Buffers used with `read`/`write` should be on fds opened with +//! `O_NONBLOCK`. 
If they aren't, the syscall may block the scheduler +//! thread despite the readiness notification (the fd reporting readable +//! doesn't guarantee the syscall completes without blocking — e.g. a +//! signal could be delivered). Documented; not enforced. //! //! Panic handling //! ============== -//! The worker runs the closure inside `catch_unwind` and ships either the -//! return value or the panic payload back to the scheduler. `block_on_io` -//! resumes the panic on the calling actor's stack, so the actor's -//! supervisor sees a real `Signal::Panic` as if the work had run inline. +//! The pool worker runs the closure inside `catch_unwind` and ships either +//! the return value or the panic payload back to the scheduler. +//! `block_on_io` resumes the panic on the calling actor's stack, so the +//! actor's supervisor sees a real `Signal::Panic` as if the work had run +//! inline. Fd-wait primitives don't run user code on the IO thread, so +//! they have no equivalent panic-propagation path. use crate::pid::Pid; use std::any::Any; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::io; use std::os::fd::RawFd; use std::panic; @@ -41,7 +79,7 @@ use std::thread::JoinHandle as OsJoinHandle; // Wire types // --------------------------------------------------------------------------- -/// What the worker stores while computing a result. `Ok` is the closure's +/// What the pool stores while computing a result. `Ok` is the closure's /// return value (boxed as `Any`); `Err` is the panic payload. pub type IoResult = Result, Box>; @@ -51,9 +89,17 @@ struct Request { work: Box IoResult + Send>, } -struct Completion { - pid: Pid, - result: IoResult, +/// Completion message from either IO thread back to the scheduler. +pub enum Completion { + /// A `block_on_io` closure has finished (Ok = return value, Err = panic + /// payload). + Blocking { pid: Pid, result: IoResult }, + /// An fd registered via `wait_readable`/`wait_writable` is ready. 
The + /// scheduler looks up the parked pid in `waiters`, unparks it, and + /// removes the entry. `pid` isn't in this variant because the epoll + /// thread doesn't have access to the `waiters` map; the scheduler + /// thread owns that. + FdReady { fd: RawFd, events: u32 }, } // --------------------------------------------------------------------------- @@ -61,61 +107,146 @@ struct Completion { // --------------------------------------------------------------------------- pub struct IoThread { - /// Channel into the worker. + // ----- Channels & queues ----- + + /// Submission queue into the blocking-work pool. tx: mpsc::Sender, - /// Shared completion queue. The worker pushes; the scheduler drains. + /// Shared completion queue, fed by both the pool and the epoll thread. completions: Arc>>, - /// Pipe used as a one-bit wakeup. `wake_read` is what the scheduler - /// polls; `wake_write` is what the worker writes to. + /// Pipe the scheduler polls in its idle path. Both IO threads write to + /// `wake_write` after pushing a completion. wake_read: RawFd, wake_write: RawFd, - /// Worker thread handle, joined on shutdown. - worker: Option>, - /// Number of requests in-flight (sent but not yet drained as a - /// completion). Used by the scheduler's idle path to decide whether - /// to wait on the pipe or exit. + + // ----- Epoll machinery ----- + + /// The epollfd, owned by `IoThread`. Callable cross-thread via + /// `epoll_ctl` per the man page. + epollfd: RawFd, + /// Pipe used to signal the epoll thread to exit. Registered inside the + /// epollfd so a single `epoll_wait` covers both fd readiness and + /// shutdown. + shutdown_read: RawFd, + shutdown_write: RawFd, + /// One parked actor per registered fd. Populated by `wait_readable` / + /// `wait_writable` and drained by the scheduler when a `FdReady` + /// completion is processed. 
+ pub waiters: HashMap, + + // ----- Threads ----- + + pool_thread: Option>, + epoll_thread: Option>, + + /// Number of `block_on_io` requests in-flight. Used by the scheduler's + /// idle path to decide whether to wait on the pipe or exit. Fd waits + /// are not counted here; they're counted by `waiters.len()`. pub outstanding: u32, } impl IoThread { pub fn start() -> io::Result { + // Scheduler-facing wake pipe. let (wake_read, wake_write) = make_pipe()?; + // Pool submission channel + shared completion queue. let (tx, rx) = mpsc::channel::(); let completions: Arc>> = Arc::new(Mutex::new(VecDeque::new())); - let comps_worker = completions.clone(); - let worker = std::thread::Builder::new() - .name("smarm-io".into()) - .spawn(move || worker_loop(rx, comps_worker, wake_write))?; + // Epoll machinery. + let epollfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) }; + if epollfd < 0 { + // Best-effort fd cleanup before bailing. + unsafe { + libc::close(wake_read); + libc::close(wake_write); + } + return Err(io::Error::last_os_error()); + } + + let (shutdown_read, shutdown_write) = match make_pipe() { + Ok(p) => p, + Err(e) => { + unsafe { + libc::close(epollfd); + libc::close(wake_read); + libc::close(wake_write); + } + return Err(e); + } + }; + + // Register the shutdown pipe in epollfd. We use a sentinel `data` + // value to recognise shutdown events. RawFd values are non-negative, + // so u64::MAX is unambiguously not a real fd-data encoding. + let mut shutdown_ev = libc::epoll_event { + events: libc::EPOLLIN as u32, + u64: SHUTDOWN_EPOLL_TOKEN, + }; + if unsafe { + libc::epoll_ctl( + epollfd, + libc::EPOLL_CTL_ADD, + shutdown_read, + &mut shutdown_ev as *mut _, + ) + } < 0 + { + let e = io::Error::last_os_error(); + unsafe { + libc::close(epollfd); + libc::close(shutdown_read); + libc::close(shutdown_write); + libc::close(wake_read); + libc::close(wake_write); + } + return Err(e); + } + + // Spawn pool thread. 
+ let pool_comps = completions.clone(); + let pool_thread = std::thread::Builder::new() + .name("smarm-io-pool".into()) + .spawn(move || pool_loop(rx, pool_comps, wake_write))?; + + // Spawn epoll thread. + let epoll_comps = completions.clone(); + let epoll_thread = std::thread::Builder::new() + .name("smarm-io-epoll".into()) + .spawn(move || epoll_loop(epollfd, epoll_comps, wake_write))?; Ok(Self { tx, completions, wake_read, wake_write, - worker: Some(worker), + epollfd, + shutdown_read, + shutdown_write, + waiters: HashMap::new(), + pool_thread: Some(pool_thread), + epoll_thread: Some(epoll_thread), outstanding: 0, }) } - /// Hand a request to the worker. Increments `outstanding`. + /// Hand a request to the pool. Increments `outstanding`. pub fn submit(&mut self, pid: Pid, work: Box IoResult + Send>) { self.outstanding += 1; - // Send can only fail if the worker has hung up, which only happens + // Send can only fail if the pool has hung up, which only happens // on shutdown. submit during shutdown is a bug. self.tx .send(Request { pid, work }) - .expect("io worker hung up unexpectedly"); + .expect("io pool hung up unexpectedly"); } - /// Drain every available completion. Caller is responsible for - /// decrementing `outstanding` and routing the results. - pub fn drain_completions(&mut self) -> Vec<(Pid, IoResult)> { + /// Drain every available completion. Caller (the scheduler) routes the + /// results and updates `outstanding` / `waiters` accordingly. + pub fn drain_completions(&mut self) -> Vec { let mut q = self.completions.lock().unwrap(); let mut out = Vec::with_capacity(q.len()); while let Some(c) = q.pop_front() { - out.push((c.pid, c.result)); + out.push(c); } out } @@ -123,78 +254,222 @@ impl IoThread { pub fn wake_fd(&self) -> RawFd { self.wake_read } + + /// Register interest in `fd` becoming readable/writable; record `pid` + /// as the parked waiter. The epoll thread will push a `FdReady` + /// completion when the kernel signals. 
+ /// + /// EPOLLONESHOT: one wakeup per registration. The scheduler must + /// `epoll_del` on completion to free the slot for re-registration. + pub fn epoll_register( + &mut self, + fd: RawFd, + pid: Pid, + readable: bool, + writable: bool, + ) -> io::Result<()> { + // Two actors waiting on the same fd would be a misuse: the kernel + // delivers exactly one EPOLLONESHOT wakeup, so the second waiter + // would hang. Reject up front. + if self.waiters.contains_key(&fd) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "fd already has a parked waiter", + )); + } + + // Defensive cleanup: if a previous actor died while waiting on this + // fd, the kernel-side registration was leaked (we don't walk all + // waiters on actor death). A bare DEL is harmless if the fd isn't + // registered (ENOENT), and removes any leak. + unsafe { + libc::epoll_ctl(self.epollfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut()); + } + + let mut events: u32 = libc::EPOLLONESHOT as u32; + if readable { + events |= libc::EPOLLIN as u32; + } + if writable { + events |= libc::EPOLLOUT as u32; + } + let mut ev = libc::epoll_event { + events, + u64: fd as u64, + }; + let r = unsafe { + libc::epoll_ctl(self.epollfd, libc::EPOLL_CTL_ADD, fd, &mut ev as *mut _) + }; + if r < 0 { + return Err(io::Error::last_os_error()); + } + self.waiters.insert(fd, pid); + Ok(()) + } + + /// Remove `fd` from the epollfd. Called by the scheduler after a + /// `FdReady` completion, so the next `wait_readable(fd)` can ADD again. + /// + /// Does NOT touch `waiters` — that's the scheduler's bookkeeping; this + /// is purely the kernel-side cleanup. + pub fn epoll_deregister(&mut self, fd: RawFd) { + // EPOLL_CTL_DEL of an already-removed fd returns ENOENT; ignore. + unsafe { + libc::epoll_ctl(self.epollfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut()); + } + } } impl Drop for IoThread { fn drop(&mut self) { - // Hang up the request channel; the worker will exit its loop. 
- // We must drop `tx` before joining. Take it out by moving. - // mpsc::Sender doesn't have explicit `disconnect`; dropping it - // (after this scope) causes the receiver to return Err. - // - // Trick: replace self.tx with a fresh dead one so we can drop it. + // 1. Signal the epoll thread to exit by writing the shutdown pipe. + unsafe { + let buf: [u8; 1] = [0]; + // Single byte; we don't care about EINTR retry here — worst + // case the epoll thread blocks until process exit, which is + // fine because we then close fds out from under it. + libc::write(self.shutdown_write, buf.as_ptr() as *const _, 1); + } + + // 2. Hang up the pool's request channel so the pool thread exits. let (dead_tx, _) = mpsc::channel::(); let real_tx = std::mem::replace(&mut self.tx, dead_tx); drop(real_tx); - if let Some(h) = self.worker.take() { - // Best-effort join. If the worker panicked, ignore. + // 3. Join both threads. + if let Some(h) = self.epoll_thread.take() { + let _ = h.join(); + } + if let Some(h) = self.pool_thread.take() { let _ = h.join(); } - // Close the pipe. + // 4. Close fds. unsafe { + libc::close(self.epollfd); + libc::close(self.shutdown_read); + libc::close(self.shutdown_write); libc::close(self.wake_read); libc::close(self.wake_write); } } } +/// Sentinel `epoll_event.u64` distinguishing the shutdown pipe from +/// registered actor fds. RawFd values fit in i32, so the high bits are +/// available for a marker; we use u64::MAX which can't be a valid fd. 
+const SHUTDOWN_EPOLL_TOKEN: u64 = u64::MAX; + // --------------------------------------------------------------------------- -// Worker loop +// Pool loop // --------------------------------------------------------------------------- -fn worker_loop( +fn pool_loop( rx: mpsc::Receiver, completions: Arc>>, wake_write: RawFd, ) { while let Ok(Request { pid, work }) = rx.recv() { let result: IoResult = match panic::catch_unwind(panic::AssertUnwindSafe(work)) { - Ok(r) => r, + Ok(r) => r, Err(payload) => Err(payload), }; - completions.lock().unwrap().push_back(Completion { pid, result }); - // Write one byte to the pipe to wake the scheduler. If the pipe - // buffer is full (scheduler isn't draining), the write may return - // EAGAIN — we'll ignore it because there's already an outstanding - // wakeup that hasn't been consumed yet. - let buf: [u8; 1] = [0]; - unsafe { - // EINTR is the only retryable case worth handling. - loop { - let n = libc::write(wake_write, buf.as_ptr() as *const _, 1); - if n < 0 { - let e = *libc::__errno_location(); - if e == libc::EINTR { continue; } - } - break; + completions + .lock() + .unwrap() + .push_back(Completion::Blocking { pid, result }); + wake_scheduler(wake_write); + } +} + +// --------------------------------------------------------------------------- +// Epoll loop +// --------------------------------------------------------------------------- + +fn epoll_loop( + epollfd: RawFd, + completions: Arc>>, + wake_write: RawFd, +) { + // Buffer for epoll_wait. 64 is plenty for our scale; if a real load + // appears that needs more, this is a one-line change. 
+ const MAX_EVENTS: usize = 64; + let mut events: [libc::epoll_event; MAX_EVENTS] = unsafe { std::mem::zeroed() }; + + loop { + let n = unsafe { + libc::epoll_wait( + epollfd, + events.as_mut_ptr(), + MAX_EVENTS as libc::c_int, + -1, + ) + }; + + if n < 0 { + let e = unsafe { *libc::__errno_location() }; + if e == libc::EINTR { + continue; } + // Anything else here is a programming error (EBADF on epollfd + // after we've closed it from Drop — the close races with us). + // Treat as shutdown. + return; + } + + let mut shutdown_requested = false; + let mut pushed_any = false; + { + let mut q = completions.lock().unwrap(); + for ev in events.iter().take(n as usize) { + if ev.u64 == SHUTDOWN_EPOLL_TOKEN { + shutdown_requested = true; + continue; + } + let fd = ev.u64 as RawFd; + q.push_back(Completion::FdReady { + fd, + events: ev.events, + }); + pushed_any = true; + } + } + + if pushed_any { + wake_scheduler(wake_write); + } + if shutdown_requested { + return; + } + } +} + +/// Write one byte to the scheduler's wake pipe. Retries on EINTR; ignores +/// EAGAIN (pipe full means there's already an outstanding wake we haven't +/// consumed yet, which is sufficient). +fn wake_scheduler(wake_write: RawFd) { + let buf: [u8; 1] = [0]; + unsafe { + loop { + let n = libc::write(wake_write, buf.as_ptr() as *const _, 1); + if n < 0 { + let e = *libc::__errno_location(); + if e == libc::EINTR { + continue; + } + } + break; } } } // --------------------------------------------------------------------------- -// Pipe helpers +// Pipe helpers (unchanged from v0.2) // --------------------------------------------------------------------------- fn make_pipe() -> io::Result<(RawFd, RawFd)> { let mut fds: [libc::c_int; 2] = [0; 2]; - // O_CLOEXEC so children don't inherit, O_NONBLOCK on the read side - // so the scheduler's drain can `read` without blocking. 
- let r = unsafe { - libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) - }; + let r = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) }; if r != 0 { return Err(io::Error::last_os_error()); } @@ -208,7 +483,6 @@ pub fn drain_wake_pipe(fd: RawFd) { loop { let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) }; if n <= 0 { - // EAGAIN (would block) or EOF — done. break; } } @@ -220,17 +494,26 @@ pub fn poll_wake(fd: RawFd, timeout: Option) { let timeout_ms: libc::c_int = match timeout { None => -1, Some(d) => { - // Cap at i32::MAX milliseconds; poll's argument is c_int. let ms = d.as_millis(); - if ms > i32::MAX as u128 { i32::MAX } else { ms as i32 } + if ms > i32::MAX as u128 { + i32::MAX + } else { + ms as i32 + } } }; - let mut pfd = libc::pollfd { fd, events: libc::POLLIN, revents: 0 }; + let mut pfd = libc::pollfd { + fd, + events: libc::POLLIN, + revents: 0, + }; loop { let r = unsafe { libc::poll(&mut pfd as *mut _, 1, timeout_ms) }; if r < 0 { let e = unsafe { *libc::__errno_location() }; - if e == libc::EINTR { continue; } + if e == libc::EINTR { + continue; + } } break; } diff --git a/src/lib.rs b/src/lib.rs index d7a93ba..3e6e348 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,11 +2,14 @@ //! //! Erlang-style green-thread actor concurrency for Rust. //! -//! v0.1 is single-threaded. One scheduler, one OS thread. The scheduler +//! Single-threaded for now: one scheduler, one OS thread. The scheduler //! cooperatively interleaves green-thread actors with hand-rolled context //! switches. Actors communicate by sending `Send` messages over channels; //! every actor has a supervisor, which is itself just an actor with a -//! `Receiver`. +//! `Receiver`. Synchronisation primitives — `Mutex` with +//! mandatory lock timeouts, channel `recv`, `sleep`, and epoll-backed +//! `wait_readable`/`wait_writable` — all park the green thread, never +//! the OS thread. //! //! 
See `LOOM.md` for the design intent and the deferred-for-later list. @@ -20,6 +23,7 @@ pub mod scheduler; pub mod supervisor; pub mod timer; pub mod io; +pub mod mutex; // --------------------------------------------------------------------------- // Global allocator @@ -37,10 +41,16 @@ static ALLOCATOR: preempt::PreemptingAllocator = preempt::PreemptingAllocator; // --------------------------------------------------------------------------- pub use channel::{channel, Receiver, RecvError, Sender}; +pub use mutex::{LockTimeout, Mutex, MutexGuard}; pub use pid::Pid; pub use scheduler::{ - block_on_io, run, self_pid, sleep, spawn, spawn_under, yield_now, JoinError, JoinHandle, + block_on_io, run, self_pid, sleep, spawn, spawn_under, wait_readable, wait_writable, + yield_now, JoinError, JoinHandle, }; +// `read` and `write` would shadow heavily-used names if re-exported at the +// crate root; users reach for them as `smarm::scheduler::read` / +// `smarm::scheduler::write` instead. May reshuffle into a `smarm::io` +// surface in a future pass. pub use supervisor::Signal; // --------------------------------------------------------------------------- diff --git a/src/mutex.rs b/src/mutex.rs new file mode 100644 index 0000000..d9ac739 --- /dev/null +++ b/src/mutex.rs @@ -0,0 +1,318 @@ +//! Actor-aware mutex with mandatory timeout. +//! +//! `loom::Mutex` looks like `std::sync::Mutex` but its `lock()` parks +//! the calling *green* thread on contention rather than blocking the OS +//! thread — and every lock attempt is bounded by a timeout. If the lock is +//! not acquired within the timeout, `lock()` returns `Err(LockTimeout)`. +//! This is a hard runtime guarantee (the spec calls it out): no actor can +//! be parked on a mutex forever. +//! +//! ```ignore +//! let m = loom::Mutex::new(42); +//! let guard = m.lock()?; // default timeout +//! let guard = m.lock_timeout(Duration::from_millis(50))?; +//! ``` +//! +//! Fairness +//! ======== +//! 
Waiters are granted the lock in FIFO order. The spec prizes fairness: +//! starvation under contention is precisely the kind of failure mode +//! supervision can't recover from cleanly. LIFO would be faster on cache +//! locality and is not offered. +//! +//! Poisoning +//! ========= +//! Unlike `std::sync::Mutex`, `loom::Mutex` does not poison on panic. If a +//! holder panics while holding the lock, the next waiter receives the +//! (now-untouched) value. Rationale: supervision handles the panic at the +//! actor level; a separate poisoning channel is redundant and adds an +//! error case to every `lock()`. Users who care about "the value may be in +//! an inconsistent state after a panic" should encode that in `T` itself +//! (e.g. `Mutex<Option<T>>` and `take()` the value at the start of +//! each critical section). +//! +//! Reentrance +//! ========== +//! Not reentrant. An actor that already holds the lock and calls `lock()` +//! again on the same mutex will wait on its own grant — and time out. This +//! is a bug in the caller, not a feature. +//! +//! Multi-threading note +//! ==================== +//! The current implementation uses `Rc<RefCell<_>>` internals because the +//! v0.2 scheduler is single-threaded. The public API is identical to what +//! the eventual multi-threaded version will expose; the migration replaces +//! the `Rc<RefCell<_>>` with `Arc<Mutex<_>>` around bookkeeping (waiters +//! queue, holder pid) — the *value* itself never goes through a blocking +//! OS-level lock, because contention always parks the green thread first. +//! No `unsafe impl Send` games today: `loom::Mutex` is `!Send` on v0.2, +//! which is correct given there is only one OS thread. + +use crate::pid::Pid; +use crate::scheduler; +use crate::timer::{self, TimerTarget}; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::rc::Rc; +use std::time::Duration; + +/// 30 seconds. Override per-call with `lock_timeout`, or per-mutex (TODO) +/// once the supervisor-level policy hook lands.
+pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub struct LockTimeout; + +impl std::fmt::Display for LockTimeout { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "mutex lock timed out") + } +} +impl std::error::Error for LockTimeout {} + +// --------------------------------------------------------------------------- +// Internals +// --------------------------------------------------------------------------- + +/// A pending lock attempt. Sits in `MutexCore::state.waiters` from the +/// moment an actor parks until it is either granted the lock (popped by +/// `MutexGuard::drop`) or times out (popped by `on_timeout`). +struct Wait { + pid: Pid, + /// Per-mutex monotonic sequence. Lets `on_timeout` recognise "this + /// specific wait" vs. "a later wait by the same pid on the same + /// mutex" — important because a single actor can re-acquire and then + /// re-wait, and we don't want a stale timer firing to disturb the new + /// wait. + seq: u64, +} + +/// The non-generic part of the mutex. Lives inside `Rc<>` so it can also +/// be stashed (as `Rc`) inside a timer entry. +struct MutexCore { + state: RefCell, + default_timeout: Cell, +} + +struct MutexState { + holder: Option, + waiters: VecDeque, + next_seq: u64, +} + +impl MutexCore { + fn new(default_timeout: Duration) -> Self { + Self { + state: RefCell::new(MutexState { + holder: None, + waiters: VecDeque::new(), + next_seq: 0, + }), + default_timeout: Cell::new(default_timeout), + } + } +} + +impl TimerTarget for MutexCore { + fn on_timeout(&self, pid: Pid, wait_seq: u64) { + // Remove the waiter with this seq, if it's still queued. If it's + // gone the lock was already granted to this actor before the timer + // popped — the actor will return normally; do nothing. 
+ let removed = { + let mut st = self.state.borrow_mut(); + if let Some(pos) = st.waiters.iter().position(|w| w.seq == wait_seq) { + st.waiters.remove(pos); + true + } else { + false + } + }; + if removed { + // The actor is parked, waiting on us. Wake it up; `lock_timeout` + // will resume, observe `holder != Some(self)`, and return + // LockTimeout. + scheduler::unpark(pid); + } + } +} + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +pub struct Mutex { + core: Rc, + /// `None` while the lock is held; `Some(T)` while free or while a + /// grantee is in the gap between unpark and resumption. + value: Rc>>, +} + +impl Mutex { + pub fn new(value: T) -> Self { + Self { + core: Rc::new(MutexCore::new(DEFAULT_TIMEOUT)), + value: Rc::new(RefCell::new(Some(value))), + } + } + + /// Set the default timeout used by `lock()`. Does not affect in-flight + /// `lock_timeout` calls. + pub fn set_default_timeout(&self, timeout: Duration) { + self.core.default_timeout.set(timeout); + } + + /// Acquire the lock, blocking the calling actor until it's granted or + /// the default timeout expires. + pub fn lock(&self) -> Result, LockTimeout> { + self.lock_timeout(self.core.default_timeout.get()) + } + + /// Acquire the lock with an explicit timeout. + pub fn lock_timeout(&self, timeout: Duration) -> Result, LockTimeout> { + let me = scheduler::self_pid(); + + // Fast path: nobody holds it. Mark ourselves as holder, take the + // value out, return a guard. + { + let mut st = self.core.state.borrow_mut(); + if st.holder.is_none() { + st.holder = Some(me); + drop(st); + let value = self + .value + .borrow_mut() + .take() + .expect("Mutex: value missing on free fast path"); + return Ok(MutexGuard { + mutex: self, + value: Some(value), + }); + } + } + + // Slow path: register as a waiter, schedule a timeout, park. 
+ // No preemption during prep-to-park — see scheduler::NoPreempt. + let _np = scheduler::NoPreempt::enter(); + let seq = { + let mut st = self.core.state.borrow_mut(); + let seq = st.next_seq; + st.next_seq = st.next_seq.wrapping_add(1); + st.waiters.push_back(Wait { pid: me, seq }); + seq + }; + + let target: Rc = self.core.clone(); + let deadline = timer::deadline_from_now(timeout); + scheduler::insert_wait_timer(deadline, me, target, seq); + scheduler::park_current(); + + // Resumed. Two possibilities: + // (a) MutexGuard::drop on the previous holder popped us off the + // waiters queue, set core.holder = me, and unparked us. + // => self.value is Some, we take it and return Ok. + // (b) on_timeout fired: it removed us from waiters and unparked + // us, but did NOT set holder. core.holder is whatever it was + // (Some(other) or None). => return Err. + let is_holder = self.core.state.borrow().holder == Some(me); + if is_holder { + let value = self + .value + .borrow_mut() + .take() + .expect("Mutex: value missing after grant"); + Ok(MutexGuard { + mutex: self, + value: Some(value), + }) + } else { + Err(LockTimeout) + } + } + + /// Non-blocking attempt. Returns `Some` if the lock was free, `None` + /// otherwise. Useful as a fast path before a long-running computation, + /// or for tests. + pub fn try_lock(&self) -> Option> { + let mut st = self.core.state.borrow_mut(); + if st.holder.is_some() { + return None; + } + let me = scheduler::self_pid(); + st.holder = Some(me); + drop(st); + let value = self + .value + .borrow_mut() + .take() + .expect("Mutex: value missing on try_lock free path"); + Some(MutexGuard { + mutex: self, + value: Some(value), + }) + } +} + +impl Clone for Mutex { + /// Cloning a `Mutex` clones the handle, not the protected value — + /// both clones refer to the same lock state and the same `T`. 
+ fn clone(&self) -> Self { + Self { + core: self.core.clone(), + value: self.value.clone(), + } + } +} + +// --------------------------------------------------------------------------- +// Guard +// --------------------------------------------------------------------------- + +pub struct MutexGuard<'a, T> { + mutex: &'a Mutex, + /// The protected value, taken out of `mutex.value` while the guard is + /// alive. `Option` only so `Drop` can put it back; in normal use this + /// is always `Some` while the guard is observable. + value: Option, +} + +impl std::ops::Deref for MutexGuard<'_, T> { + type Target = T; + fn deref(&self) -> &T { + self.value.as_ref().expect("MutexGuard: value missing") + } +} + +impl std::ops::DerefMut for MutexGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + self.value.as_mut().expect("MutexGuard: value missing") + } +} + +impl Drop for MutexGuard<'_, T> { + fn drop(&mut self) { + // Put the value back into the mutex. + let v = self.value.take().expect("MutexGuard: double drop"); + *self.mutex.value.borrow_mut() = Some(v); + + // Pick the next waiter (if any) and grant it the lock by writing + // its pid into `holder` *before* unparking. The grantee, on + // resumption, will see `holder == self_pid` and take the value. + let next_pid = { + let mut st = self.mutex.core.state.borrow_mut(); + let next = st.waiters.pop_front(); + match next { + Some(w) => { + st.holder = Some(w.pid); + Some(w.pid) + } + None => { + st.holder = None; + None + } + } + }; + if let Some(pid) = next_pid { + scheduler::unpark(pid); + } + } +} diff --git a/src/scheduler.rs b/src/scheduler.rs index 3bd0213..1ab238b 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -344,16 +344,69 @@ pub fn park_current() { unsafe { crate::context::switch_to_scheduler() }; } +/// RAII guard that disables allocator-driven preemption for its lifetime. 
+/// +/// The "prep-to-park" hazard described in `preempt.rs`: a primitive that +/// (a) registers an unparker (channel waiter slot, fd waiter map, mutex +/// waiter queue, …) and then (b) calls `park_current()` must not yield +/// between (a) and (b). If it does, an early unpark fires while the actor +/// is still Runnable, the unpark no-ops, and then the actor parks with no +/// one to wake it. +/// +/// Library code wraps the prep + park in `let _g = NoPreempt::enter();` +/// and the guard is held until just after `park_current` returns (or +/// dropped earlier, immediately before `park_current`, since `park_current` +/// itself returns control to the scheduler which disables preemption on +/// its own path). +pub struct NoPreempt(bool); + +impl NoPreempt { + pub fn enter() -> Self { + let prev = crate::preempt::PREEMPTION_ENABLED.with(|c| c.replace(false)); + NoPreempt(prev) + } +} + +impl Drop for NoPreempt { + fn drop(&mut self) { + crate::preempt::PREEMPTION_ENABLED.with(|c| c.set(self.0)); + } +} + /// Park the current actor for at least `duration`. A zero duration behaves /// like `yield_now` (the deadline is immediately in the past, so the timer /// pops on the next scheduler iteration). pub fn sleep(duration: std::time::Duration) { let me = current_pid().expect("sleep() called outside an actor"); + let _np = NoPreempt::enter(); let deadline = crate::timer::deadline_from_now(duration); - with_sched(|s| s.timers.insert(deadline, me)); + with_sched(|s| s.timers.insert_sleep(deadline, me)); park_current(); } +/// Insert a `WaitTimeout` timer entry. Library code (`Mutex::lock_timeout` +/// today, future bounded-wait primitives) calls this just before +/// `park_current()` so that if the wait isn't satisfied by `deadline`, +/// `target.on_timeout(pid, wait_seq)` will fire. +/// +/// Cancellation: not needed. 
If the wait is satisfied early, the entry is +/// still in the heap and will pop in due course; `on_timeout` is expected +/// to be idempotent on stale-seq. +pub fn insert_wait_timer( + deadline: std::time::Instant, + pid: Pid, + target: std::rc::Rc, + wait_seq: u64, +) { + with_sched(|s| { + s.timers.insert( + deadline, + pid, + crate::timer::Reason::WaitTimeout { target, wait_seq }, + ); + }); +} + /// Run `f` on the IO worker thread, park the current actor while it runs, /// and return `f`'s value when it completes. Panics inside `f` propagate /// to the calling actor. @@ -377,12 +430,14 @@ where Ok(Box::new(v) as Box) }); - with_sched(|s| { - let io = s.io.as_mut().expect("io thread not started"); - io.submit(me, work); - }); - - park_current(); + { + let _np = NoPreempt::enter(); + with_sched(|s| { + let io = s.io.as_mut().expect("io thread not started"); + io.submit(me, work); + }); + park_current(); + } // On resume, our slot has a result waiting. let result = with_sched(|s| { @@ -401,6 +456,83 @@ where } } +// --------------------------------------------------------------------------- +// Fd-readiness primitives. +// +// `wait_readable(fd)` / `wait_writable(fd)` register interest with the +// epoll thread, park the calling actor, and return when the kernel +// signals readiness. The subsequent syscall (`read`/`write`) is done on +// the actor's own thread by the caller — no buffer crosses an actor +// boundary. +// +// Fds passed in should be O_NONBLOCK; see io.rs module docs. +// --------------------------------------------------------------------------- + +/// Park the calling actor until `fd` is readable. +pub fn wait_readable(fd: std::os::fd::RawFd) -> std::io::Result<()> { + wait_fd(fd, /*readable=*/ true, /*writable=*/ false) +} + +/// Park the calling actor until `fd` is writable. 
+pub fn wait_writable(fd: std::os::fd::RawFd) -> std::io::Result<()> { + wait_fd(fd, /*readable=*/ false, /*writable=*/ true) +} + +fn wait_fd(fd: std::os::fd::RawFd, readable: bool, writable: bool) -> std::io::Result<()> { + let me = current_pid().expect("wait_*() called outside an actor"); + + // Register with the epoll thread. If registration fails (bad fd, + // already-parked waiter, OOM in the kernel), return the error + // without parking — the actor never went to sleep. + let _np = NoPreempt::enter(); + with_sched(|s| { + let io = s.io.as_mut().expect("io thread not started"); + io.epoll_register(fd, me, readable, writable) + })?; + + park_current(); + // On resume, the scheduler has already removed `fd` from `waiters` + // and DEL'd it from epollfd. There is no per-call return value; + // success here just means "fd is ready, go do your syscall". + // + // Note: there is no error path on resume because v0.2 doesn't time + // out fd waits and doesn't otherwise spurious-wake. If those are + // added, this function grows a non-trivial return. + Ok(()) +} + +/// Wait until `fd` is readable, then run a single `read(2)`. Returns the +/// number of bytes read, or an `io::Error` from the syscall. +/// +/// `fd` should be opened `O_NONBLOCK`. With a blocking fd, the kernel's +/// readiness signal does not guarantee a non-blocking read — a signal +/// could interrupt, and the actor's syscall would then stall the +/// scheduler thread. +pub fn read(fd: std::os::fd::RawFd, buf: &mut [u8]) -> std::io::Result { + wait_readable(fd)?; + let n = unsafe { + libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) + }; + if n < 0 { + Err(std::io::Error::last_os_error()) + } else { + Ok(n as usize) + } +} + +/// Wait until `fd` is writable, then run a single `write(2)`. 
+pub fn write(fd: std::os::fd::RawFd, buf: &[u8]) -> std::io::Result { + wait_writable(fd)?; + let n = unsafe { + libc::write(fd, buf.as_ptr() as *const _, buf.len()) + }; + if n < 0 { + Err(std::io::Error::last_os_error()) + } else { + Ok(n as usize) + } +} + /// Wake a parked actor. If the actor isn't parked (already runnable or done) /// this is a no-op — that's important; channel and join can both fire /// spurious unparks under some orderings and we want them to be cheap. @@ -488,41 +620,95 @@ pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX); fn schedule_loop() { loop { - // 1. Drain due timers into the run queue. + // 1. Drain due timers and dispatch by reason. + // + // Sleep — unpark the actor (idempotently: only if still + // parked). + // WaitTimeout — call the target's on_timeout. The target decides + // whether the wait was still in progress (timer + // won the race) or had been fulfilled (the thing + // the actor was waiting for arrived first → no-op). + // The target is responsible for calling unpark() + // if appropriate. let now = std::time::Instant::now(); let due = with_sched(|s| s.timers.pop_due(now)); - for pid in due { - // Same idempotency as `unpark`: only re-queue if still parked. - with_sched(|s| { - if let Some(slot) = s.slot_mut(pid) { - if matches!(slot.state, State::Parked) { - slot.state = State::Runnable; - s.run_queue.push_back(pid); - } + for entry in due { + match entry.reason { + crate::timer::Reason::Sleep => { + with_sched(|s| { + if let Some(slot) = s.slot_mut(entry.pid) { + if matches!(slot.state, State::Parked) { + slot.state = State::Runnable; + s.run_queue.push_back(entry.pid); + } + } + }); } - }); + crate::timer::Reason::WaitTimeout { target, wait_seq } => { + // Note: the target callback runs *outside* with_sched. + // It may call back into the scheduler (e.g. unpark), so + // we must not hold the SCHED borrow across it. + target.on_timeout(entry.pid, wait_seq); + } + } } - // 2. 
Drain IO completions: route each result to its slot and - // unpark the actor. Drain even when we have other runnables — - // it's cheap (a try_lock of the completion queue) and keeps - // pending_io_result freshness bounded. + // 2. Drain IO completions: route each result by variant. + // + // Blocking — a `block_on_io` closure finished. Stash the result + // on the actor's slot and unpark. + // FdReady — an fd registered via `wait_readable`/`wait_writable` + // is ready. Look up the parked pid in the io thread's + // waiters map, deregister the fd, unpark. + // + // Drain even when we have other runnables — it's cheap and keeps + // `pending_io_result` / `waiters` freshness bounded. let completions = with_sched(|s| { s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default() }); - for (pid, result) in completions { - with_sched(|s| { - if let Some(io) = s.io.as_mut() { - io.outstanding = io.outstanding.saturating_sub(1); + for completion in completions { + match completion { + crate::io::Completion::Blocking { pid, result } => { + with_sched(|s| { + if let Some(io) = s.io.as_mut() { + io.outstanding = io.outstanding.saturating_sub(1); + } + if let Some(slot) = s.slot_mut(pid) { + slot.pending_io_result = Some(result); + if matches!(slot.state, State::Parked) { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + } + } + }); } - if let Some(slot) = s.slot_mut(pid) { - slot.pending_io_result = Some(result); - if matches!(slot.state, State::Parked) { - slot.state = State::Runnable; - s.run_queue.push_back(pid); - } + crate::io::Completion::FdReady { fd, events: _ } => { + with_sched(|s| { + let parked_pid = s.io.as_mut() + .and_then(|io| { + let pid = io.waiters.remove(&fd); + // Deregister the fd from epollfd; the + // EPOLLONESHOT already disarmed it but the + // slot is still occupied until we DEL. 
+ io.epoll_deregister(fd); + pid + }); + if let Some(pid) = parked_pid { + if let Some(slot) = s.slot_mut(pid) { + if matches!(slot.state, State::Parked) { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + } + } + // else: actor died between registering and the + // fd firing. Nothing to do; the registration + // has been cleaned up. + } + // else: fd not in waiters — probably a duplicate + // FdReady from a previous registration, ignore. + }); } - }); + } } // 3. Pop a runnable actor. If none, decide whether to block on @@ -536,11 +722,19 @@ fn schedule_loop() { // trying to take the completions mutex, which is fine, // but the scheduler thread itself mustn't hold SCHED // borrowed across a blocking syscall. + // + // "Outstanding" here means *anything* the IO thread is + // expected to deliver a wakeup for: in-flight blocking + // calls AND parked fd waiters. If either is non-zero we + // must wait for the IO thread, not exit. let (next_deadline, io_outstanding, wake_fd) = with_sched(|s| { let next = s.timers.peek_deadline(); let (out, fd) = match s.io.as_ref() { - Some(io) => (io.outstanding, Some(io.wake_fd())), - None => (0, None), + Some(io) => ( + io.outstanding + io.waiters.len() as u32, + Some(io.wake_fd()), + ), + None => (0, None), }; (next, out, fd) }); diff --git a/src/timer.rs b/src/timer.rs index e12b3c1..025fa42 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -1,38 +1,86 @@ -//! Sleep timers. +//! Sleep + wait-with-timeout timers. //! -//! A min-heap of `(deadline, Pid)` entries lives on `SchedulerState`. When -//! an actor calls `sleep`, the runtime inserts the entry, marks the actor -//! parked, and yields. On every scheduler loop iteration the runtime pops -//! all entries whose deadline has passed and unparks them. When the run -//! queue is empty but the heap is not, the runtime sleeps the OS thread -//! until the soonest deadline, then re-checks. +//! A min-heap of `(deadline, seq, reason)` entries lives on `SchedulerState`. +//! 
When an actor sleeps or starts a bounded wait (e.g. `mutex.lock()` with a +//! timeout), the runtime inserts an entry, marks the actor parked, and yields. +//! On every scheduler loop iteration the runtime pops all entries whose +//! deadline has passed and dispatches each according to its `Reason`: //! -//! `BinaryHeap` is a max-heap, so entries are stored with their deadline -//! wrapped in `Reverse` to get min-heap behaviour. +//! - `Sleep`: unpark the actor. +//! - `WaitTimeout`: call `on_timeout` on the registered target. The target +//! (e.g. a `Mutex`) decides whether the actor was actually still waiting +//! (timer fires first → unpark with error) or had already been granted +//! what it was waiting for (lock granted first → no-op). //! -//! Stale pids (slot reused since the timer was inserted) are detected on -//! `due_pids` pop and silently dropped — same convention as the run queue. +//! `BinaryHeap` is a max-heap; entries are wrapped in `Reverse` to get +//! min-heap behaviour. +//! +//! No cancellation. When a non-timer wakeup happens (e.g. lock granted +//! before timeout), the timer entry is left in the heap. It will be popped +//! eventually and the dispatch will observe "actor is no longer parked / +//! wait_seq is stale" and no-op. Cost is ~32 bytes per stale entry plus a +//! few cycles on pop. A stale entry lingers until its own deadline, so the +//! bound is one entry per wait begun within the timeout window, not per actor. +//! +//! Stale pids (slot reused since the timer was inserted) are filtered on +//! pop by the scheduler — same convention as the run queue. use crate::pid::Pid; use std::cmp::Reverse; use std::collections::BinaryHeap; +use std::rc::Rc; use std::time::{Duration, Instant}; -#[derive(PartialEq, Eq)] +/// What to do when a timer entry's deadline arrives. +/// +/// Held inside `Entry`, dispatched by the scheduler in `pop_due`. +pub enum Reason { + /// `loom::sleep(d)`. Unpark `pid` unconditionally (modulo the usual + /// "still parked?" check the scheduler applies).
+ Sleep, + /// A bounded wait — currently only `Mutex::lock_timeout`. On expiry the + /// scheduler calls `target.on_timeout(pid, wait_seq)`. The target then + /// decides whether `pid` was actually still waiting, and if so unparks + /// it with whatever error the wait was bounded for. `wait_seq` lets the + /// target tell apart "this wait" from "a later wait by the same actor + /// on the same target". + WaitTimeout { + target: Rc, + wait_seq: u64, + }, +} + +/// Callback the scheduler invokes when a `WaitTimeout` entry pops. +/// +/// Implementors: do not touch `SchedulerState` other than via the public +/// `unpark` / channel APIs. The scheduler is mid-iteration when this fires. +pub trait TimerTarget { + fn on_timeout(&self, pid: Pid, wait_seq: u64); +} + pub struct Entry { pub deadline: Instant, + /// Insertion order, used purely as a tiebreaker so `Entry: Ord` works + /// without having to compare the `Reason` payload (which contains an + /// `Rc` and isn't `Ord`). + seq: u64, pub pid: Pid, + pub reason: Reason, } +impl PartialEq for Entry { + fn eq(&self, other: &Self) -> bool { + self.deadline == other.deadline && self.seq == other.seq + } +} +impl Eq for Entry {} + impl Ord for Entry { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // Only `deadline` matters for ordering; pid is a tiebreaker so the - // type is Ord, but the order among same-deadline entries is - // irrelevant. - self.deadline - .cmp(&other.deadline) - .then_with(|| self.pid.index().cmp(&other.pid.index())) - .then_with(|| self.pid.generation().cmp(&other.pid.generation())) + // Earlier deadline first; ties broken by insertion order so the + // ordering is total. `Reason` and `Pid` deliberately don't + // participate. + self.deadline.cmp(&other.deadline).then_with(|| self.seq.cmp(&other.seq)) } } @@ -46,15 +94,25 @@ impl PartialOrd for Entry { pub struct Timers { /// Reverse-wrapped so the smallest deadline is at the top. 
heap: BinaryHeap>, + /// Monotonic counter for the tiebreaker `seq` field. + next_seq: u64, } impl Timers { pub fn new() -> Self { - Self { heap: BinaryHeap::new() } + Self { heap: BinaryHeap::new(), next_seq: 0 } } - pub fn insert(&mut self, deadline: Instant, pid: Pid) { - self.heap.push(Reverse(Entry { deadline, pid })); + /// Insert a `Sleep` timer. Convenience for the common case. + pub fn insert_sleep(&mut self, deadline: Instant, pid: Pid) { + self.insert(deadline, pid, Reason::Sleep); + } + + /// Insert an arbitrary timer entry. + pub fn insert(&mut self, deadline: Instant, pid: Pid, reason: Reason) { + let seq = self.next_seq; + self.next_seq = self.next_seq.wrapping_add(1); + self.heap.push(Reverse(Entry { deadline, seq, pid, reason })); } pub fn is_empty(&self) -> bool { @@ -66,13 +124,13 @@ impl Timers { self.heap.peek().map(|r| r.0.deadline) } - /// Pop and return every pid whose deadline is ≤ `now`. - pub fn pop_due(&mut self, now: Instant) -> Vec { + /// Pop every entry whose deadline is ≤ `now`, in deadline order. + /// The scheduler dispatches each entry by inspecting `entry.reason`. + pub fn pop_due(&mut self, now: Instant) -> Vec { let mut out = Vec::new(); while let Some(r) = self.heap.peek() { if r.0.deadline <= now { - let e = self.heap.pop().unwrap().0; - out.push(e.pid); + out.push(self.heap.pop().unwrap().0); } else { break; } @@ -81,7 +139,7 @@ impl Timers { } } -/// Wall-clock duration helper exposed for `sleep`. +/// Wall-clock duration helper exposed for `sleep` and `lock_timeout`. pub fn deadline_from_now(duration: Duration) -> Instant { Instant::now() .checked_add(duration) diff --git a/tests/io_epoll.rs b/tests/io_epoll.rs new file mode 100644 index 0000000..a28516a --- /dev/null +++ b/tests/io_epoll.rs @@ -0,0 +1,324 @@ +//! Tests for epoll-based fd readiness primitives: `wait_readable`, +//! `wait_writable`, and the `read`/`write` sugar on top of them. +//! +//! 
Pipes are the convenient test target: cheap to create, easy to drive, +//! and we already use `libc::pipe2` internally. Each pipe is one direction +//! and respects `O_NONBLOCK` if we ask for it. + +use smarm::{run, spawn, wait_readable, wait_writable, yield_now}; +use std::os::fd::RawFd; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::sync::Mutex as StdMutex; +use std::time::Duration; + +// --------------------------------------------------------------------------- +// Pipe helper +// --------------------------------------------------------------------------- + +struct Pipe { + read: RawFd, + write: RawFd, +} + +impl Pipe { + fn new() -> Self { + let mut fds: [libc::c_int; 2] = [0; 2]; + let r = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) }; + assert_eq!(r, 0, "pipe2 failed"); + Pipe { + read: fds[0], + write: fds[1], + } + } +} + +impl Drop for Pipe { + fn drop(&mut self) { + unsafe { + libc::close(self.read); + libc::close(self.write); + } + } +} + +fn raw_write(fd: RawFd, buf: &[u8]) -> isize { + unsafe { libc::write(fd, buf.as_ptr() as *const _, buf.len()) } +} + +fn raw_read(fd: RawFd, buf: &mut [u8]) -> isize { + unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) } +} + +// --------------------------------------------------------------------------- +// wait_readable parks until data arrives, then libc::read succeeds. +// --------------------------------------------------------------------------- + +#[test] +fn wait_readable_blocks_until_data_arrives_then_read_succeeds() { + let captured: Arc>> = Arc::new(StdMutex::new(Vec::new())); + let cap = captured.clone(); + + let p = Arc::new(Pipe::new()); + let p_reader = p.clone(); + let p_writer = p.clone(); + + run(move || { + let reader = spawn(move || { + // Initially the pipe is empty; this parks. + wait_readable(p_reader.read).expect("wait_readable failed"); + // Now data should be readable. 
+ let mut buf = [0u8; 16]; + let n = raw_read(p_reader.read, &mut buf); + assert!(n > 0, "read returned {}", n); + cap.lock().unwrap().extend_from_slice(&buf[..n as usize]); + }); + + let writer = spawn(move || { + // Yield so the reader gets to park first. + yield_now(); + yield_now(); + // Sleep a touch so the reader is definitely waiting in epoll. + smarm::sleep(Duration::from_millis(5)); + let n = raw_write(p_writer.write, b"hello"); + assert_eq!(n, 5); + }); + + reader.join().unwrap(); + writer.join().unwrap(); + }); + + assert_eq!(*captured.lock().unwrap(), b"hello"); +} + +// --------------------------------------------------------------------------- +// The smarm::scheduler::read sugar — wait_readable + libc::read in one call. +// --------------------------------------------------------------------------- + +#[test] +fn read_sugar_returns_bytes_from_pipe() { + let captured: Arc>> = Arc::new(StdMutex::new(Vec::new())); + let cap = captured.clone(); + + let p = Arc::new(Pipe::new()); + let p_reader = p.clone(); + let p_writer = p.clone(); + + run(move || { + let reader = spawn(move || { + let mut buf = [0u8; 16]; + let n = smarm::scheduler::read(p_reader.read, &mut buf) + .expect("smarm::scheduler::read failed"); + cap.lock().unwrap().extend_from_slice(&buf[..n]); + }); + + let writer = spawn(move || { + yield_now(); + smarm::sleep(Duration::from_millis(5)); + let _ = raw_write(p_writer.write, b"world"); + }); + + reader.join().unwrap(); + writer.join().unwrap(); + }); + + assert_eq!(*captured.lock().unwrap(), b"world"); +} + +// --------------------------------------------------------------------------- +// wait_writable + write — though pipes are almost always writable; the +// useful test here is that the call doesn't hang on a writable fd. 
+// --------------------------------------------------------------------------- + +#[test] +fn write_sugar_sends_bytes_to_pipe() { + let counter = Arc::new(AtomicU32::new(0)); + let c = counter.clone(); + + let p = Arc::new(Pipe::new()); + let p_writer = p.clone(); + let p_reader = p.clone(); + + run(move || { + let writer = spawn(move || { + // Pipe is empty + has buffer space, so this returns immediately + // after wait_writable wakes (which happens fast because the + // kernel marks an empty pipe as immediately writable). + let n = smarm::scheduler::write(p_writer.write, b"smarm") + .expect("write failed"); + assert_eq!(n, 5); + c.fetch_add(1, Ordering::SeqCst); + }); + + let reader = spawn(move || { + // Give the writer time. + smarm::sleep(Duration::from_millis(10)); + let mut buf = [0u8; 16]; + let n = raw_read(p_reader.read, &mut buf); + assert_eq!(n, 5); + assert_eq!(&buf[..5], b"smarm"); + }); + + writer.join().unwrap(); + reader.join().unwrap(); + }); + + assert_eq!(counter.load(Ordering::SeqCst), 1); +} + +// --------------------------------------------------------------------------- +// While an actor is parked on wait_readable, other actors keep running. +// --------------------------------------------------------------------------- + +#[test] +fn other_actors_run_while_one_is_parked_on_wait_readable() { + let log: Arc>> = Arc::new(StdMutex::new(Vec::new())); + let la = log.clone(); + let lb = log.clone(); + + let p = Arc::new(Pipe::new()); + let p_a = p.clone(); + let p_b = p.clone(); + + run(move || { + let a = spawn(move || { + la.lock().unwrap().push(b'A'); + wait_readable(p_a.read).unwrap(); + la.lock().unwrap().push(b'a'); + }); + + let b = spawn(move || { + // A starts parking on the empty pipe; B should be free to do + // its work in the meantime. + for _ in 0..3 { + yield_now(); + lb.lock().unwrap().push(b'B'); + } + // Now wake A. 
+ let _ = raw_write(p_b.write, b"x"); + }); + + a.join().unwrap(); + b.join().unwrap(); + }); + + let v = log.lock().unwrap(); + // A goes first ('A'), then B makes progress (multiple 'B's) while A is + // parked, then A wakes and finishes ('a'). + let pos_big_a = v.iter().position(|&c| c == b'A').unwrap(); + let pos_lit_a = v.iter().position(|&c| c == b'a').unwrap(); + let big_b_count = v.iter().filter(|&&c| c == b'B').count(); + assert_eq!(big_b_count, 3, "B should have made 3 steps: {:?}", *v); + assert!(pos_big_a < pos_lit_a, "A pre-park before A post-park: {:?}", *v); + // At least the last B step should be before A resumes. + let last_big_b = v.iter().rposition(|&c| c == b'B').unwrap(); + assert!(last_big_b < pos_lit_a, "B should finish before A resumes: {:?}", *v); +} + +// --------------------------------------------------------------------------- +// Two-way pipe ping-pong via wait_readable. +// --------------------------------------------------------------------------- + +#[test] +fn ping_pong_between_two_pipes_completes() { + // a_to_b: actor A writes, actor B reads. + // b_to_a: actor B writes, actor A reads. 
+ let a_to_b = Arc::new(Pipe::new()); + let b_to_a = Arc::new(Pipe::new()); + + let counter = Arc::new(AtomicU32::new(0)); + let ca = counter.clone(); + let cb = counter.clone(); + + let a_to_b_a = a_to_b.clone(); + let a_to_b_b = a_to_b.clone(); + let b_to_a_a = b_to_a.clone(); + let b_to_a_b = b_to_a.clone(); + + run(move || { + let a = spawn(move || { + for _ in 0..5 { + let _ = raw_write(a_to_b_a.write, b"x"); + wait_readable(b_to_a_a.read).unwrap(); + let mut buf = [0u8; 4]; + let _ = raw_read(b_to_a_a.read, &mut buf); + ca.fetch_add(1, Ordering::SeqCst); + } + }); + + let b = spawn(move || { + for _ in 0..5 { + wait_readable(a_to_b_b.read).unwrap(); + let mut buf = [0u8; 4]; + let _ = raw_read(a_to_b_b.read, &mut buf); + let _ = raw_write(b_to_a_b.write, b"y"); + cb.fetch_add(1, Ordering::SeqCst); + } + }); + + a.join().unwrap(); + b.join().unwrap(); + }); + + // Both sides did 5 rounds; counter is incremented by both, so total = 10. + assert_eq!(counter.load(Ordering::SeqCst), 10); +} + +// --------------------------------------------------------------------------- +// Same fd reused across calls — DEL+ADD cycle works. 
+// --------------------------------------------------------------------------- + +#[test] +fn same_fd_can_be_waited_on_repeatedly() { + let p = Arc::new(Pipe::new()); + let p_r = p.clone(); + let p_w = p.clone(); + let counter = Arc::new(AtomicU32::new(0)); + let c = counter.clone(); + + run(move || { + let reader = spawn(move || { + for _ in 0..4 { + wait_readable(p_r.read).unwrap(); + let mut buf = [0u8; 4]; + let n = raw_read(p_r.read, &mut buf); + assert!(n > 0); + c.fetch_add(1, Ordering::SeqCst); + } + }); + + let writer = spawn(move || { + for _ in 0..4 { + yield_now(); + smarm::sleep(Duration::from_millis(2)); + let _ = raw_write(p_w.write, b"z"); + } + }); + + reader.join().unwrap(); + writer.join().unwrap(); + }); + + assert_eq!(counter.load(Ordering::SeqCst), 4); +} + +// --------------------------------------------------------------------------- +// Sanity that wait_writable on an already-writable pipe returns promptly. +// --------------------------------------------------------------------------- + +#[test] +fn wait_writable_on_empty_pipe_returns_quickly() { + let p = Arc::new(Pipe::new()); + let p_w = p.clone(); + + let start = std::time::Instant::now(); + run(move || { + wait_writable(p_w.write).unwrap(); + }); + let elapsed = start.elapsed(); + assert!( + elapsed < Duration::from_millis(200), + "wait_writable should be fast on a writable fd, took {:?}", + elapsed + ); +} diff --git a/tests/mutex.rs b/tests/mutex.rs new file mode 100644 index 0000000..92a0216 --- /dev/null +++ b/tests/mutex.rs @@ -0,0 +1,311 @@ +//! `smarm::Mutex` tests. All run under the scheduler because `lock()` +//! needs to be able to park.
+ +use smarm::{run, spawn, yield_now, LockTimeout, Mutex}; +use std::sync::Arc; +use std::sync::Mutex as StdMutex; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::{Duration, Instant}; + +// --------------------------------------------------------------------------- +// Uncontended fast path +// --------------------------------------------------------------------------- + +#[test] +fn lock_free_mutex_succeeds() { + let captured = Arc::new(AtomicU32::new(0)); + let c = captured.clone(); + run(move || { + let m = Mutex::new(42u32); + { + let g = m.lock().unwrap(); + c.store(*g, Ordering::SeqCst); + } + // After drop we can lock again. + let g2 = m.lock().unwrap(); + assert_eq!(*g2, 42); + }); + assert_eq!(captured.load(Ordering::SeqCst), 42); +} + +#[test] +fn try_lock_returns_some_when_free_none_when_held() { + let success_flag = Arc::new(AtomicU32::new(0)); + let s = success_flag.clone(); + run(move || { + let m = Mutex::new(0u32); + let g = m.try_lock().expect("free"); + // Holding the guard; a second try_lock on the same actor should fail. + assert!(m.try_lock().is_none()); + drop(g); + // Now free again. + let g2 = m.try_lock().expect("free again"); + drop(g2); + s.store(1, Ordering::SeqCst); + }); + assert_eq!(success_flag.load(Ordering::SeqCst), 1); +} + +#[test] +fn guard_mutates_value_visible_through_next_lock() { + let final_value = Arc::new(AtomicU32::new(0)); + let f = final_value.clone(); + run(move || { + let m = Mutex::new(0u32); + { + let mut g = m.lock().unwrap(); + *g = 7; + } + let g2 = m.lock().unwrap(); + f.store(*g2, Ordering::SeqCst); + }); + assert_eq!(final_value.load(Ordering::SeqCst), 7); +} + +// --------------------------------------------------------------------------- +// Contention: a second actor parks until the first releases. 
+// --------------------------------------------------------------------------- + +#[test] +fn contended_lock_parks_until_holder_releases() { + // Actor A locks, yields (still holding), then releases. Actor B tries + // to lock in between — B should park, then succeed after A drops. + let log: Arc>> = Arc::new(StdMutex::new(Vec::new())); + let la = log.clone(); + let lb = log.clone(); + + run(move || { + let m = Mutex::new(0u32); + let m_a = m.clone(); + let m_b = m.clone(); + + let a = spawn(move || { + let g = m_a.lock().unwrap(); + la.lock().unwrap().push("A_locked"); + // While holding, yield to let B run. + yield_now(); + la.lock().unwrap().push("A_dropping"); + drop(g); + la.lock().unwrap().push("A_dropped"); + }); + let b = spawn(move || { + // Wait a moment to make sure A locks first. + yield_now(); + lb.lock().unwrap().push("B_try"); + let _g = m_b.lock().unwrap(); + lb.lock().unwrap().push("B_locked"); + }); + a.join().unwrap(); + b.join().unwrap(); + }); + + let v = log.lock().unwrap(); + // A locks, B tries (parks), A drops, B gets the lock. + let pos_a_locked = v.iter().position(|s| *s == "A_locked").unwrap(); + let pos_b_try = v.iter().position(|s| *s == "B_try").unwrap(); + let pos_a_dropped = v.iter().position(|s| *s == "A_dropped").unwrap(); + let pos_b_locked = v.iter().position(|s| *s == "B_locked").unwrap(); + + assert!(pos_a_locked < pos_b_try, "log: {:?}", *v); + assert!(pos_b_try < pos_a_dropped, "B should attempt before A drops: {:?}", *v); + assert!(pos_a_dropped < pos_b_locked, "B should lock only after A drops: {:?}", *v); +} + +// --------------------------------------------------------------------------- +// Timeout: B times out while A holds forever. 
+// --------------------------------------------------------------------------- + +#[test] +fn lock_timeout_returns_err_when_holder_never_releases() { + let saw_err = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let s = saw_err.clone(); + + run(move || { + let m: Mutex = Mutex::new(0); + let m_a = m.clone(); + let m_b = m.clone(); + + let a = spawn(move || { + // Hold the lock for 100ms, blocking B's attempt with a 20ms timeout. + let _g = m_a.lock().unwrap(); + smarm::sleep(Duration::from_millis(100)); + // _g drops here. + }); + let b = spawn(move || { + // Let A acquire first. + yield_now(); + let t0 = Instant::now(); + let res = m_b.lock_timeout(Duration::from_millis(20)); + let elapsed = t0.elapsed(); + assert!(matches!(res, Err(LockTimeout)), "got {:?}", res); + // Sanity: actually waited approximately the timeout. + assert!( + elapsed >= Duration::from_millis(15), + "timed out too fast: {:?}", + elapsed + ); + assert!( + elapsed < Duration::from_millis(80), + "timed out far too slow: {:?}", + elapsed + ); + s.store(true, Ordering::SeqCst); + }); + a.join().unwrap(); + b.join().unwrap(); + }); + + assert!(saw_err.load(Ordering::SeqCst)); +} + +// --------------------------------------------------------------------------- +// FIFO fairness: when many actors queue, they get the lock in arrival order. +// --------------------------------------------------------------------------- + +#[test] +fn waiters_are_granted_the_lock_in_fifo_order() { + let order: Arc>> = Arc::new(StdMutex::new(Vec::new())); + + run({ + let order = order.clone(); + move || { + let m: Mutex<()> = Mutex::new(()); + + // Holder: takes the lock, yields to let others queue up, then + // releases. Each waiter records its arrival order on acquisition. + let m_holder = m.clone(); + let holder = spawn(move || { + let g = m_holder.lock().unwrap(); + // Let waiters pile up. + for _ in 0..5 { + yield_now(); + } + drop(g); + }); + + // Spawn 4 waiters in order 1, 2, 3, 4. 
Each yields `id` times before + // calling lock(), so we know the holder ran first. + let mut handles = vec![holder]; + for id in 1u32..=4 { + let m_w = m.clone(); + let o = order.clone(); + handles.push(spawn(move || { + // Stagger the lock attempts so they arrive in order. + for _ in 0..id { + yield_now(); + } + let _g = m_w.lock().unwrap(); + o.lock().unwrap().push(id); + })); + } + for h in handles { + h.join().unwrap(); + } + } + }); + + let v = order.lock().unwrap().clone(); + assert_eq!(v, vec![1, 2, 3, 4], "waiters should acquire in arrival order"); +} + +// --------------------------------------------------------------------------- +// Grant-vs-timeout race: holder drops just before timer would fire — waiter +// should get the lock, not LockTimeout. +// --------------------------------------------------------------------------- + +#[test] +fn grant_wins_when_holder_releases_before_timeout() { + let got_lock = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let g = got_lock.clone(); + + run(move || { + let m: Mutex = Mutex::new(0); + let m_a = m.clone(); + let m_b = m.clone(); + + let a = spawn(move || { + let _g = m_a.lock().unwrap(); + // Hold for 10ms, well under B's 100ms timeout. + smarm::sleep(Duration::from_millis(10)); + }); + let b = spawn(move || { + yield_now(); + let res = m_b.lock_timeout(Duration::from_millis(100)); + if res.is_ok() { + g.store(true, Ordering::SeqCst); + } + }); + a.join().unwrap(); + b.join().unwrap(); + }); + + assert!(got_lock.load(Ordering::SeqCst)); +} + +// --------------------------------------------------------------------------- +// Panic in critical section: next waiter still gets the lock (no poisoning).
+// --------------------------------------------------------------------------- + +#[test] +fn next_waiter_gets_lock_after_holder_panics() { + let next_got_it = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let n = next_got_it.clone(); + + run(move || { + let m: Mutex = Mutex::new(7); + let m_a = m.clone(); + let m_b = m.clone(); + + let a = spawn(move || { + let _g = m_a.lock().unwrap(); + yield_now(); + panic!("holder dies mid-critical-section"); + }); + let b = spawn(move || { + yield_now(); + // A is dead but its guard's Drop ran during unwind. We get the lock. + let g = m_b.lock_timeout(Duration::from_millis(100)).unwrap(); + assert_eq!(*g, 7); + n.store(true, Ordering::SeqCst); + }); + let _ = a.join(); // panic — expected + b.join().unwrap(); + }); + + assert!(next_got_it.load(Ordering::SeqCst)); +} + +// --------------------------------------------------------------------------- +// Multiple short critical sections under contention all complete (no lost +// wakeups, no deadlock). Counts up to N from M actors. 
+// --------------------------------------------------------------------------- + +#[test] +fn many_actors_increment_shared_counter_via_mutex() { + const ACTORS: u32 = 8; + const PER_ACTOR: u32 = 50; + + let final_value = Arc::new(AtomicU32::new(0)); + let fv = final_value.clone(); + + run(move || { + let m: Mutex = Mutex::new(0); + let mut handles = Vec::new(); + for _ in 0..ACTORS { + let m_i = m.clone(); + handles.push(spawn(move || { + for _ in 0..PER_ACTOR { + let mut g = m_i.lock().unwrap(); + *g += 1; + } + })); + } + for h in handles { + h.join().unwrap(); + } + let g = m.lock().unwrap(); + fv.store(*g, Ordering::SeqCst); + }); + + assert_eq!(final_value.load(Ordering::SeqCst), ACTORS * PER_ACTOR); +} diff --git a/tests/timer.rs b/tests/timer.rs index 14756d9..e22a23d 100644 --- a/tests/timer.rs +++ b/tests/timer.rs @@ -114,3 +114,96 @@ fn many_concurrent_sleepers_all_wake() { }); assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 20); } + +// --------------------------------------------------------------------------- +// Direct tests on the Timers data structure. No scheduler involved — these +// cover the new Reason machinery without needing a Mutex implementation. +// --------------------------------------------------------------------------- + +use smarm::pid::Pid; +use smarm::timer::{Reason, TimerTarget, Timers}; +use std::cell::RefCell; +use std::rc::Rc; + +struct RecordingTarget { + calls: RefCell>, +} +impl TimerTarget for RecordingTarget { + fn on_timeout(&self, pid: Pid, seq: u64) { + self.calls.borrow_mut().push((pid, seq)); + } +} + +#[test] +fn timers_pop_due_returns_entries_in_deadline_order() { + let mut t = Timers::new(); + let now = Instant::now(); + // Insert out of order; pop_due should hand them back sorted by deadline. 
+ t.insert_sleep(now + Duration::from_millis(30), Pid::new(0, 0)); + t.insert_sleep(now + Duration::from_millis(10), Pid::new(1, 0)); + t.insert_sleep(now + Duration::from_millis(20), Pid::new(2, 0)); + + // Advance past all of them. + let due = t.pop_due(now + Duration::from_millis(50)); + let pids: Vec = due.iter().map(|e| e.pid.index()).collect(); + assert_eq!(pids, vec![1, 2, 0]); + assert!(t.is_empty()); +} + +#[test] +fn timers_only_pop_entries_whose_deadline_has_passed() { + let mut t = Timers::new(); + let now = Instant::now(); + t.insert_sleep(now + Duration::from_millis(5), Pid::new(0, 0)); + t.insert_sleep(now + Duration::from_millis(100), Pid::new(1, 0)); + + let due = t.pop_due(now + Duration::from_millis(20)); + assert_eq!(due.len(), 1); + assert_eq!(due[0].pid.index(), 0); + assert!(!t.is_empty()); + // The unpopped entry's deadline is still visible. + assert!(t.peek_deadline().is_some()); +} + +#[test] +fn timers_mix_sleep_and_wait_timeout_reasons() { + let mut t = Timers::new(); + let target = Rc::new(RecordingTarget { calls: RefCell::new(Vec::new()) }); + let now = Instant::now(); + + t.insert_sleep(now + Duration::from_millis(5), Pid::new(0, 0)); + t.insert( + now + Duration::from_millis(10), + Pid::new(1, 0), + Reason::WaitTimeout { target: target.clone(), wait_seq: 42 }, + ); + + let due = t.pop_due(now + Duration::from_millis(20)); + assert_eq!(due.len(), 2); + + // Order: Sleep (5ms) first, WaitTimeout (10ms) second. + match &due[0].reason { + Reason::Sleep => {} + _ => panic!("first entry should be a Sleep"), + } + match &due[1].reason { + Reason::WaitTimeout { wait_seq, .. } => assert_eq!(*wait_seq, 42), + _ => panic!("second entry should be a WaitTimeout"), + } +} + +#[test] +fn same_deadline_entries_pop_in_insertion_order() { + // The `seq` tiebreaker means inserting two entries with the same + // deadline preserves the order they were inserted. 
+ let mut t = Timers::new(); + let now = Instant::now(); + let d = now + Duration::from_millis(10); + t.insert_sleep(d, Pid::new(0, 0)); + t.insert_sleep(d, Pid::new(1, 0)); + t.insert_sleep(d, Pid::new(2, 0)); + + let due = t.pop_due(now + Duration::from_millis(20)); + let pids: Vec = due.iter().map(|e| e.pid.index()).collect(); + assert_eq!(pids, vec![0, 1, 2]); +}