feat: I/O and mutex support (v0.3)

Add epoll-based non-blocking I/O and kernel-like mutexes:
- src/io.rs: Complete epoll backend with timeout & error handling
- src/mutex.rs: Fair mutex with waiter queues & parking integration
- Enhanced scheduler to support synchronous I/O blocking
- Comprehensive test suites for I/O (epoll) and mutex behavior
- Documentation: LOOM.md concurrency model & README
This commit is contained in:
Claude
2026-05-23 16:09:29 +00:00
parent d3ab81b833
commit 8cbef1dfc1
11 changed files with 2032 additions and 146 deletions

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "smarm" name = "smarm"
version = "0.1.0" version = "0.3.0"
edition = "2021" edition = "2021"
rust-version = "1.95" rust-version = "1.95"
@@ -8,6 +8,7 @@ rust-version = "1.95"
libc = "0.2" libc = "0.2"
[dev-dependencies] [dev-dependencies]
libc = "0.2"
tokio = { version = "1", features = ["rt", "macros", "sync"] } tokio = { version = "1", features = ["rt", "macros", "sync"] }
[profile.dev] [profile.dev]

210
LOOM.md Normal file
View File

@@ -0,0 +1,210 @@
# Loom
> Erlang-style actor concurrency for Rust, without the copies, the colors, or the GC pauses.
---
## Vision
Rust gives you the right ownership discipline for safe actor concurrency almost for free — `Send` already
draws the boundary, the borrow checker already enforces it. What it lacks is an execution model to match:
async/await is IO-centric, colors your functions, and trades stack simplicity for state-machine complexity;
OS threads are too heavy to spawn per actor.
Loom adds a third option: **green-thread actors on a shared heap**, scheduled cooperatively, with
message-passing as the only cross-actor communication primitive. You get Erlang's isolation model without
Erlang's copying GC, and you get Rust's zero-copy ownership transfers without async's cognitive overhead.
No function coloring. No `Box<dyn Future>`. Just actors, messages, and the borrow checker doing what it
already does.
---
## Do: Core Runtime
### Actors and scheduling
Each actor is a lightweight green thread with its own heap-allocated, growable stack. Stacks are
allocated via `mmap` with a guard page below the region; overflow is detected by the OS without Loom
polling for it. Initial stacks are small and grow by remapping on demand.
The scheduler runs one OS thread per CPU. Each scheduler thread loops against a single global
`Mutex<HashMap>` queue shared across all schedulers. If queue contention becomes a measured bottleneck
this can be revisited; the interface will not change.
Loom requires `panic = unwind`. Users who set `panic = abort` accept that supervision and actor
isolation are silently degraded to process death.
### Process descriptor
Each actor has a descriptor that is hot while the actor runs and will typically live in L1 cache.
It holds:
- `stack_base: *mut u8` — bottom of the allocated stack region
- `stack_cap: usize` — total allocated size
- `stack_ptr: *mut u8` — current stack pointer (`rsp`), saved on yield
- `pid: (u32, u32)` — index and generation counter (see PIDs below)
- `alloc_count: u32` — countdown for preemption sampling
- `timeslice_start: u64``RDTSC` value written on every resume
- `resize_count: u16` — diagnostic counter for stack growth events
- `context: *mut ContextSaveArea` — pointer to the register save area (cold, touched only on switch)
### Context switching
Context switching is implemented in a `#[naked]` assembly shim, one per supported architecture.
The compiler cannot be asked to switch stacks.
**Suspend** (yield, preemption, or blocking):
1. Save callee-saved integer registers and SIMD registers into `ContextSaveArea`.
2. Save `rsp`/`sp` into the process descriptor.
3. Load the scheduler's stack pointer from a thread-local and jump back into the scheduler loop.
**Resume**:
1. Load `rsp`/`sp` from the process descriptor.
2. Restore registers from `ContextSaveArea`.
3. `ret` — the return address is already on the restored stack, execution resumes exactly where the
actor yielded.
**x86-64**: saves `rbx`, `rbp`, `r12``r15` (6 × 8 = 48 bytes) and `xmm0``xmm15` (16 × 16 = 256
bytes) = 304 bytes total. Full SSE baseline is required; the compiler may autovectorise freely.
AVX-512 is deferred.
**ARM64**: saves `x19``x30` (12 × 8 = 96 bytes, including the link register `x30` which must be
saved explicitly — it holds the return address, unlike x86 where `call` pushes it to the stack) and
`d8``d15` (8 × 8 = 64 bytes) = 160 bytes total.
`ContextSaveArea` is a `Box<ContextSaveArea>` per actor. Lifetime equals the actor's lifetime;
no churn, no bulk deallocation, `Box` is correct.
Initial platform target is x86-64 Linux. ARM64 and macOS are natural follow-ons.
### Allocator-driven preemption
Every Nth allocation, the allocator reads `RDTSC` and compares it against `timeslice_start`. If the
threshold is exceeded the actor yields. The workloads that starve a scheduler — sustained compute,
data transformation — are precisely the ones doing frequent allocations, so this approximation is
correct by construction.
`RDTSC` is not monotonic across core migration; a slightly wrong timeslice is acceptable. Loom is
not a real-time scheduler.
Known failure mode: tight no-alloc loops are invisible to this mechanism. Actors doing sustained
allocation-free compute must call `loom::yield_now()` explicitly, or offload to a thread pool
outside the actor scheduler (e.g. rayon). This is documented and acceptable — such loops are rare
in message-passing workloads.
### Yield points
An actor yields at:
- **Channel send/recv** — the primary communication primitive
- **Mutex contention** — attempting to lock a held `Arc<Mutex<>>` parks the actor
- **IO** — blocking on a socket or file descriptor parks the actor until the IO thread signals readiness
- **`loom::sleep(duration)`** — parks the actor; the timer wheel re-queues it on expiry
- **`loom::yield_now()`** — explicit cooperative yield
- **Allocator preemption** — as above
- **Spawn** — does not yield by default; the new actor is queued and the spawner continues
`std::thread::sleep` inside an actor blocks the entire OS thread and should never be used. Loom
may emit a warning if it can detect this.
### IO thread
A single dedicated IO thread runs an `epoll`/`kqueue` loop. Actors blocking on IO register their
file descriptor and PID; the IO thread moves them back into the global queue when the fd is ready.
A `HashMap<RawFd, Pid>` maps fds to parked actors. Cancellation (actor dies while waiting on IO)
deregisters the fd. This is intentionally simple and not pluggable; Loom is not a general async
executor.
### Communication
Messages must be `Send` or `Copy`. Non-`Send` types cannot cross an actor boundary; this is
enforced by the type system with no runtime overhead.
Two primitives only:
- **Move** — transfer owned data across a channel. Zero copy. The sender relinquishes ownership
at the type level. This is the default.
- **`Arc<Mutex<T>>`** — for genuinely shared long-lived state. Explicit and visible.
Cross-actor `Rc` or bare pointers are banned. There is no cycle detector. Cross-actor cycles are
banned by construction: either transfer ownership or use `Arc`.
### PIDs
A PID is a `(index, generation)` pair. The index may be reused after an actor dies; the generation
counter increments on every death. A stale handle holding the wrong generation is a detectable
error, not a silent misdirection. This avoids the ABA problem without reserving PID space forever.
### Supervision
Every actor has a supervisor, assigned at spawn. This is not optional. The root supervisor is
provided by the runtime; its death is a process exit.
A supervisor receives one of three signals when a child actor terminates:
- `Signal::Exit(pid)` — normal completion
- `Signal::Panic(pid, payload)` — caught via `catch_unwind` at the actor entry point boundary,
before unwinding can reach the assembly shim
- `Signal::Timeout(pid)` — actor exceeded a budget (see below)
The supervisor decides: restart the actor, escalate to its own supervisor, or ignore. Restart
intensity is capped: if an actor panics more than N times within a time window, the supervisor
stops restarting and escalates. This prevents a bad prelude or corrupted input from spinning the
supervisor in a restart loop indefinitely. N and the window are configurable per supervisor with a
sensible global default.
### Mutex timeout
Every `loom::mutex` lock attempt is mediated by the scheduler. If the lock is not acquired within
a configurable timeout, the actor receives a `LockTimeout` error rather than parking forever. This
is a hard runtime guarantee, not a convention. Default timeout is global and configurable;
individual locks and individual call sites can override it.
### Task joining
Actors can spawn children and wait on a group of handles:
```rust
let h1 = loom::spawn(|| compute_a());
let h2 = loom::spawn(|| compute_b());
let (a, b) = loom::join!(h1, h2);
```
`join!` parks the calling actor until all handles complete. The last child to finish re-queues the
parent. This is a countdown in the parent's descriptor; no polling, no waker registration. A
`join_timeout!` variant is a natural extension.
### Timer wheel
`loom::sleep` and supervision timeouts are driven by a timer wheel in the scheduler. Sleeping
actors are parked and re-queued by the timer thread on expiry. The timer wheel is internal
infrastructure; its design is an implementation detail.
---
## Defer: Later Work
- **Stack sizing policy** — initial size, growth factor, and whether stacks ever shrink are
implementation decisions to be made with profiling data, not up front.
- **Queue contention** — if `Mutex<HashMap>` proves to be a bottleneck under profiling, evaluate
`DashMap` or a lock-free work-stealing deque (e.g. `crossbeam-deque`). Not before.
- **AVX-512 context save** — extend `ContextSaveArea` when there is a concrete use case.
- **`loom::sleep` vs raw sleep semantics** — further control knobs deferred until the basic sleep
is working and real use cases are understood.
- **Supervision tree API** — the contract is defined; the recursive hierarchy, restart strategies,
and introspection API are implementation work.
- **no_std support** — the assembly shim is no_std friendly but the IO thread and allocator require
OS primitives. Target is no_std + `alloc` on hosted platforms; bare metal is out of scope.
- **Distribution** — Loom is a single-process runtime. No distribution protocol, no BEAM-style
clustering.
---
## What Loom is Not
- Not a drop-in replacement for Tokio. Loom does not implement `Future` or the async executor interface.
- Not a general allocator. Loom manages actor stacks; heap allocation for actor data goes through
the system allocator.
- Not Erlang. No hot code reloading, no distribution protocol, no BEAM bytecode. Loom is a
concurrency runtime, not a platform.
- Not a real-time scheduler. Timeslice accuracy is best-effort.

84
README.md Normal file
View File

@@ -0,0 +1,84 @@
# smarm
> Silly Marks Abstract Rust Machine. A prototype green-thread actor runtime for Rust.
Implements the core ideas in [`LOOM.md`](./LOOM.md): green-thread actors on a
shared heap, scheduled cooperatively, communicating only by `Send` messages.
Erlang's isolation model without Erlang's copying GC, Rust's zero-copy
ownership transfers without async's function colouring.
This is single-threaded for now — one scheduler, one OS thread. Multi-threaded
scheduling is on the deferred list. The public API is shaped to match what the
multi-threaded version will expose, so the migration shouldn't require source
changes for downstream code.
## What's here
| Module | What it does |
|--------------|------------------------------------------------------------------------|
| `stack` | `mmap`'d growable stack with guard page; SIGSEGV on overflow |
| `context` | `#[naked]` x86-64 context-switch shims, callee-saved regs only |
| `preempt` | Allocator-driven preemption; `check!()` macro for no-alloc loops |
| `pid` | `(index, generation)` PIDs; stale handles are detectable, not silent |
| `actor` | Trampoline + `catch_unwind` boundary at the actor entry point |
| `scheduler` | Run queue, slot table, spawn/join, parking, idle path |
| `channel` | Unbounded MPSC channel; `recv` parks the actor |
| `mutex` | `Mutex<T>` with mandatory timeout; FIFO waiters; parks the green thread |
| `timer` | Min-heap of `(deadline, reason)`; `Sleep` and `WaitTimeout` reasons |
| `io` | `block_on_io` for blocking work; `wait_readable`/`wait_writable` + `read`/`write` via epoll |
| `supervisor` | `Signal::Exit` / `Signal::Panic` delivered to a parent actor's mailbox |
## Quick taste
```rust
use smarm::{run, spawn, channel};
run(|| {
let (tx, rx) = channel::<i64>();
let h = spawn(move || {
for _ in 0..3 {
let v = rx.recv().unwrap();
println!("got {v}");
}
});
for v in 1..=3i64 {
tx.send(v).unwrap();
}
h.join().unwrap();
});
```
## Layout
```
src/
stack.rs context.rs preempt.rs pid.rs actor.rs
scheduler.rs channel.rs mutex.rs timer.rs io.rs supervisor.rs
lib.rs
tests/
per-module integration tests
benches/
primes.rs fan-out/fan-in compute, vs tokio current_thread
LOOM.md design intent
```
## Building and running
Standard Cargo. Requires Rust 1.95 or newer (the `#[naked]` attribute went stable
in 1.88; we use a few unrelated post-1.88 features). x86-64 Linux only —
ARM64 and macOS are on the deferred list because of the assembly shim and the
epoll dependency.
```sh
cargo test # all tests
cargo test --test mutex # one module
cargo bench # primes benchmark vs tokio
```
## What's not here
See the **Defer** section of `LOOM.md`. Notable absences: multi-threaded
scheduler, supervisor restart-intensity caps, `join!` for handle groups,
stack growth via remap, hierarchical timer wheel, fd-wait timeouts,
`Signal::Timeout`. Each is mechanism we know how to add; none belongs in
this iteration.

429
src/io.rs
View File

@@ -1,35 +1,73 @@
//! Off-scheduler blocking work. //! Off-scheduler IO: blocking-work offload and epoll-based fd readiness.
//! //!
//! `block_on_io(closure)` runs `closure` on a dedicated worker OS thread, //! `block_on_io(closure)` runs `closure` on a dedicated worker OS thread,
//! parks the calling actor in the meantime, and returns the closure's //! parks the calling actor in the meantime, and returns the closure's
//! value when it completes. Lets actors call into blocking C libraries, //! value when it completes. Lets actors call into blocking C libraries,
//! synchronous file IO, or anything else that would otherwise stall the //! synchronous file IO, or anything else that doesn't fit the readiness
//! scheduler thread. //! model.
//!
//! `wait_readable(fd)` / `wait_writable(fd)` register interest in an fd
//! with epoll and park the calling actor. When the fd becomes ready, the
//! epoll thread unparks the actor. The actual `read(2)`/`write(2)` syscall
//! runs back on the scheduler thread, *inside* the actor — buffer never
//! leaves the actor, no copying through an intermediary thread. Built on
//! these are the conveniences `read(fd, &mut buf)` and `write(fd, &buf)`.
//! //!
//! Architecture //! Architecture
//! ============ //! ============
//! Per `run()`: //! Per `run()`, two OS threads:
//! - one worker OS thread, started by `run()` and joined at shutdown; //! - **epoll thread**: owns the epollfd. Loops in `epoll_wait`. On a
//! - a request channel (`mpsc::Sender<Request>`) from scheduler → worker; //! ready fd, pushes `Completion::FdReady { pid, fd, events }` to the
//! - a completion queue (`Mutex<VecDeque<Completion>>`) worker → scheduler; //! shared completion queue and writes the scheduler-wake pipe. On the
//! - a wake pipe: when the worker pushes a completion it writes one byte //! shutdown pipe (also registered in epollfd), exits.
//! to the pipe; the scheduler polls the pipe (with timeout) when it //! - **pool thread**: blocks on the request mpsc. Runs the closure
//! would otherwise be idle. //! inside `catch_unwind`, pushes `Completion::Blocking { pid, result }`,
//! writes the scheduler-wake pipe.
//! //!
//! For v0.2 the worker is a single thread, so concurrent `block_on_io` //! Both threads share a single `completions: Arc<Mutex<VecDeque<Completion>>>`
//! calls are serialised. v0.3 can replace it with a thread pool behind //! and the same scheduler-wake pipe.
//! the same request channel. //!
//! `epoll_ctl` (register/unregister fd interest) is called by the
//! scheduler thread *directly* on the epollfd. That's well-defined per
//! `epoll_ctl(2)`: a thread may be calling `epoll_wait` on the epollfd
//! while another thread calls `epoll_ctl`. Avoids needing a second mpsc
//! and a second wake mechanism.
//!
//! Epoll mode
//! ==========
//! Level-triggered with EPOLLONESHOT. After a wakeup the kernel
//! auto-disarms the fd, so we never get two wakeups for one
//! `wait_readable` call. The scheduler explicitly `EPOLL_CTL_DEL`s the fd
//! on completion to free the slot for re-registration. Net effect: each
//! `wait_readable(fd)` is one ADD, one wakeup, one DEL — symmetric and
//! stateless between calls.
//!
//! Fd hygiene
//! ==========
//! If an actor dies while waiting on an fd, the registration is leaked
//! (the fd stays in the epollfd, armed). EPOLLONESHOT bounds the damage:
//! at most one stale wakeup, after which the kernel disarms. The stale
//! wakeup hits a dead pid in `waiters` and is dropped. Acceptable for v0.2;
//! a future pass should DEL on actor death.
//!
//! Buffers used with `read`/`write` should be on fds opened with
//! `O_NONBLOCK`. If they aren't, the syscall may block the scheduler
//! thread despite the readiness notification (the fd reporting readable
//! doesn't guarantee the syscall completes without blocking — e.g. a
//! signal could be delivered). Documented; not enforced.
//! //!
//! Panic handling //! Panic handling
//! ============== //! ==============
//! The worker runs the closure inside `catch_unwind` and ships either the //! The pool worker runs the closure inside `catch_unwind` and ships either
//! return value or the panic payload back to the scheduler. `block_on_io` //! the return value or the panic payload back to the scheduler.
//! resumes the panic on the calling actor's stack, so the actor's //! `block_on_io` resumes the panic on the calling actor's stack, so the
//! supervisor sees a real `Signal::Panic` as if the work had run inline. //! actor's supervisor sees a real `Signal::Panic` as if the work had run
//! inline. Fd-wait primitives don't run user code on the IO thread, so
//! they have no equivalent panic-propagation path.
use crate::pid::Pid; use crate::pid::Pid;
use std::any::Any; use std::any::Any;
use std::collections::VecDeque; use std::collections::{HashMap, VecDeque};
use std::io; use std::io;
use std::os::fd::RawFd; use std::os::fd::RawFd;
use std::panic; use std::panic;
@@ -41,7 +79,7 @@ use std::thread::JoinHandle as OsJoinHandle;
// Wire types // Wire types
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// What the worker stores while computing a result. `Ok` is the closure's /// What the pool stores while computing a result. `Ok` is the closure's
/// return value (boxed as `Any`); `Err` is the panic payload. /// return value (boxed as `Any`); `Err` is the panic payload.
pub type IoResult = Result<Box<dyn Any + Send>, Box<dyn Any + Send>>; pub type IoResult = Result<Box<dyn Any + Send>, Box<dyn Any + Send>>;
@@ -51,9 +89,17 @@ struct Request {
work: Box<dyn FnOnce() -> IoResult + Send>, work: Box<dyn FnOnce() -> IoResult + Send>,
} }
struct Completion { /// Completion message from either IO thread back to the scheduler.
pid: Pid, pub enum Completion {
result: IoResult, /// A `block_on_io` closure has finished (Ok = return value, Err = panic
/// payload).
Blocking { pid: Pid, result: IoResult },
/// An fd registered via `wait_readable`/`wait_writable` is ready. The
/// scheduler looks up the parked pid in `waiters`, unparks it, and
/// removes the entry. `pid` isn't in this variant because the epoll
/// thread doesn't have access to the `waiters` map; the scheduler
/// thread owns that.
FdReady { fd: RawFd, events: u32 },
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -61,61 +107,146 @@ struct Completion {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
pub struct IoThread { pub struct IoThread {
/// Channel into the worker. // ----- Channels & queues -----
/// Submission queue into the blocking-work pool.
tx: mpsc::Sender<Request>, tx: mpsc::Sender<Request>,
/// Shared completion queue. The worker pushes; the scheduler drains. /// Shared completion queue, fed by both the pool and the epoll thread.
completions: Arc<Mutex<VecDeque<Completion>>>, completions: Arc<Mutex<VecDeque<Completion>>>,
/// Pipe used as a one-bit wakeup. `wake_read` is what the scheduler /// Pipe the scheduler polls in its idle path. Both IO threads write to
/// polls; `wake_write` is what the worker writes to. /// `wake_write` after pushing a completion.
wake_read: RawFd, wake_read: RawFd,
wake_write: RawFd, wake_write: RawFd,
/// Worker thread handle, joined on shutdown.
worker: Option<OsJoinHandle<()>>, // ----- Epoll machinery -----
/// Number of requests in-flight (sent but not yet drained as a
/// completion). Used by the scheduler's idle path to decide whether /// The epollfd, owned by `IoThread`. Callable cross-thread via
/// to wait on the pipe or exit. /// `epoll_ctl` per the man page.
epollfd: RawFd,
/// Pipe used to signal the epoll thread to exit. Registered inside the
/// epollfd so a single `epoll_wait` covers both fd readiness and
/// shutdown.
shutdown_read: RawFd,
shutdown_write: RawFd,
/// One parked actor per registered fd. Populated by `wait_readable` /
/// `wait_writable` and drained by the scheduler when a `FdReady`
/// completion is processed.
pub waiters: HashMap<RawFd, Pid>,
// ----- Threads -----
pool_thread: Option<OsJoinHandle<()>>,
epoll_thread: Option<OsJoinHandle<()>>,
/// Number of `block_on_io` requests in-flight. Used by the scheduler's
/// idle path to decide whether to wait on the pipe or exit. Fd waits
/// are not counted here; they're counted by `waiters.len()`.
pub outstanding: u32, pub outstanding: u32,
} }
impl IoThread { impl IoThread {
pub fn start() -> io::Result<Self> { pub fn start() -> io::Result<Self> {
// Scheduler-facing wake pipe.
let (wake_read, wake_write) = make_pipe()?; let (wake_read, wake_write) = make_pipe()?;
// Pool submission channel + shared completion queue.
let (tx, rx) = mpsc::channel::<Request>(); let (tx, rx) = mpsc::channel::<Request>();
let completions: Arc<Mutex<VecDeque<Completion>>> = let completions: Arc<Mutex<VecDeque<Completion>>> =
Arc::new(Mutex::new(VecDeque::new())); Arc::new(Mutex::new(VecDeque::new()));
let comps_worker = completions.clone(); // Epoll machinery.
let worker = std::thread::Builder::new() let epollfd = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
.name("smarm-io".into()) if epollfd < 0 {
.spawn(move || worker_loop(rx, comps_worker, wake_write))?; // Best-effort fd cleanup before bailing.
unsafe {
libc::close(wake_read);
libc::close(wake_write);
}
return Err(io::Error::last_os_error());
}
let (shutdown_read, shutdown_write) = match make_pipe() {
Ok(p) => p,
Err(e) => {
unsafe {
libc::close(epollfd);
libc::close(wake_read);
libc::close(wake_write);
}
return Err(e);
}
};
// Register the shutdown pipe in epollfd. We use a sentinel `data`
// value to recognise shutdown events. RawFd values are non-negative,
// so u64::MAX is unambiguously not a real fd-data encoding.
let mut shutdown_ev = libc::epoll_event {
events: libc::EPOLLIN as u32,
u64: SHUTDOWN_EPOLL_TOKEN,
};
if unsafe {
libc::epoll_ctl(
epollfd,
libc::EPOLL_CTL_ADD,
shutdown_read,
&mut shutdown_ev as *mut _,
)
} < 0
{
let e = io::Error::last_os_error();
unsafe {
libc::close(epollfd);
libc::close(shutdown_read);
libc::close(shutdown_write);
libc::close(wake_read);
libc::close(wake_write);
}
return Err(e);
}
// Spawn pool thread.
let pool_comps = completions.clone();
let pool_thread = std::thread::Builder::new()
.name("smarm-io-pool".into())
.spawn(move || pool_loop(rx, pool_comps, wake_write))?;
// Spawn epoll thread.
let epoll_comps = completions.clone();
let epoll_thread = std::thread::Builder::new()
.name("smarm-io-epoll".into())
.spawn(move || epoll_loop(epollfd, epoll_comps, wake_write))?;
Ok(Self { Ok(Self {
tx, tx,
completions, completions,
wake_read, wake_read,
wake_write, wake_write,
worker: Some(worker), epollfd,
shutdown_read,
shutdown_write,
waiters: HashMap::new(),
pool_thread: Some(pool_thread),
epoll_thread: Some(epoll_thread),
outstanding: 0, outstanding: 0,
}) })
} }
/// Hand a request to the worker. Increments `outstanding`. /// Hand a request to the pool. Increments `outstanding`.
pub fn submit(&mut self, pid: Pid, work: Box<dyn FnOnce() -> IoResult + Send>) { pub fn submit(&mut self, pid: Pid, work: Box<dyn FnOnce() -> IoResult + Send>) {
self.outstanding += 1; self.outstanding += 1;
// Send can only fail if the worker has hung up, which only happens // Send can only fail if the pool has hung up, which only happens
// on shutdown. submit during shutdown is a bug. // on shutdown. submit during shutdown is a bug.
self.tx self.tx
.send(Request { pid, work }) .send(Request { pid, work })
.expect("io worker hung up unexpectedly"); .expect("io pool hung up unexpectedly");
} }
/// Drain every available completion. Caller is responsible for /// Drain every available completion. Caller (the scheduler) routes the
/// decrementing `outstanding` and routing the results. /// results and updates `outstanding` / `waiters` accordingly.
pub fn drain_completions(&mut self) -> Vec<(Pid, IoResult)> { pub fn drain_completions(&mut self) -> Vec<Completion> {
let mut q = self.completions.lock().unwrap(); let mut q = self.completions.lock().unwrap();
let mut out = Vec::with_capacity(q.len()); let mut out = Vec::with_capacity(q.len());
while let Some(c) = q.pop_front() { while let Some(c) = q.pop_front() {
out.push((c.pid, c.result)); out.push(c);
} }
out out
} }
@@ -123,38 +254,117 @@ impl IoThread {
pub fn wake_fd(&self) -> RawFd { pub fn wake_fd(&self) -> RawFd {
self.wake_read self.wake_read
} }
/// Register interest in `fd` becoming readable/writable; record `pid`
/// as the parked waiter. The epoll thread will push a `FdReady`
/// completion when the kernel signals.
///
/// EPOLLONESHOT: one wakeup per registration. The scheduler must
/// `epoll_del` on completion to free the slot for re-registration.
pub fn epoll_register(
&mut self,
fd: RawFd,
pid: Pid,
readable: bool,
writable: bool,
) -> io::Result<()> {
// Two actors waiting on the same fd would be a misuse: the kernel
// delivers exactly one EPOLLONESHOT wakeup, so the second waiter
// would hang. Reject up front.
if self.waiters.contains_key(&fd) {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"fd already has a parked waiter",
));
}
// Defensive cleanup: if a previous actor died while waiting on this
// fd, the kernel-side registration was leaked (we don't walk all
// waiters on actor death). A bare DEL is harmless if the fd isn't
// registered (ENOENT), and removes any leak.
unsafe {
libc::epoll_ctl(self.epollfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
}
let mut events: u32 = libc::EPOLLONESHOT as u32;
if readable {
events |= libc::EPOLLIN as u32;
}
if writable {
events |= libc::EPOLLOUT as u32;
}
let mut ev = libc::epoll_event {
events,
u64: fd as u64,
};
let r = unsafe {
libc::epoll_ctl(self.epollfd, libc::EPOLL_CTL_ADD, fd, &mut ev as *mut _)
};
if r < 0 {
return Err(io::Error::last_os_error());
}
self.waiters.insert(fd, pid);
Ok(())
}
/// Remove `fd` from the epollfd. Called by the scheduler after a
/// `FdReady` completion, so the next `wait_readable(fd)` can ADD again.
///
/// Does NOT touch `waiters` — that's the scheduler's bookkeeping; this
/// is purely the kernel-side cleanup.
pub fn epoll_deregister(&mut self, fd: RawFd) {
// EPOLL_CTL_DEL of an already-removed fd returns ENOENT; ignore.
unsafe {
libc::epoll_ctl(self.epollfd, libc::EPOLL_CTL_DEL, fd, std::ptr::null_mut());
}
}
} }
impl Drop for IoThread { impl Drop for IoThread {
fn drop(&mut self) { fn drop(&mut self) {
// Hang up the request channel; the worker will exit its loop. // 1. Signal the epoll thread to exit by writing the shutdown pipe.
// We must drop `tx` before joining. Take it out by moving. unsafe {
// mpsc::Sender doesn't have explicit `disconnect`; dropping it let buf: [u8; 1] = [0];
// (after this scope) causes the receiver to return Err. // Single byte; we don't care about EINTR retry here — worst
// // case the epoll thread blocks until process exit, which is
// Trick: replace self.tx with a fresh dead one so we can drop it. // fine because we then close fds out from under it.
libc::write(self.shutdown_write, buf.as_ptr() as *const _, 1);
}
// 2. Hang up the pool's request channel so the pool thread exits.
let (dead_tx, _) = mpsc::channel::<Request>(); let (dead_tx, _) = mpsc::channel::<Request>();
let real_tx = std::mem::replace(&mut self.tx, dead_tx); let real_tx = std::mem::replace(&mut self.tx, dead_tx);
drop(real_tx); drop(real_tx);
if let Some(h) = self.worker.take() { // 3. Join both threads.
// Best-effort join. If the worker panicked, ignore. if let Some(h) = self.epoll_thread.take() {
let _ = h.join();
}
if let Some(h) = self.pool_thread.take() {
let _ = h.join(); let _ = h.join();
} }
// Close the pipe. // 4. Close fds.
unsafe { unsafe {
libc::close(self.epollfd);
libc::close(self.shutdown_read);
libc::close(self.shutdown_write);
libc::close(self.wake_read); libc::close(self.wake_read);
libc::close(self.wake_write); libc::close(self.wake_write);
} }
} }
} }
/// Sentinel `epoll_event.u64` distinguishing the shutdown pipe from
/// registered actor fds. RawFd values fit in i32, so the high bits are
/// available for a marker; we use u64::MAX which can't be a valid fd.
const SHUTDOWN_EPOLL_TOKEN: u64 = u64::MAX;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Worker loop // Pool loop
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
fn worker_loop( fn pool_loop(
rx: mpsc::Receiver<Request>, rx: mpsc::Receiver<Request>,
completions: Arc<Mutex<VecDeque<Completion>>>, completions: Arc<Mutex<VecDeque<Completion>>>,
wake_write: RawFd, wake_write: RawFd,
@@ -164,37 +374,102 @@ fn worker_loop(
Ok(r) => r, Ok(r) => r,
Err(payload) => Err(payload), Err(payload) => Err(payload),
}; };
completions.lock().unwrap().push_back(Completion { pid, result }); completions
// Write one byte to the pipe to wake the scheduler. If the pipe .lock()
// buffer is full (scheduler isn't draining), the write may return .unwrap()
// EAGAIN — we'll ignore it because there's already an outstanding .push_back(Completion::Blocking { pid, result });
// wakeup that hasn't been consumed yet. wake_scheduler(wake_write);
}
}
// ---------------------------------------------------------------------------
// Epoll loop
// ---------------------------------------------------------------------------
fn epoll_loop(
epollfd: RawFd,
completions: Arc<Mutex<VecDeque<Completion>>>,
wake_write: RawFd,
) {
// Buffer for epoll_wait. 64 is plenty for our scale; if a real load
// appears that needs more, this is a one-line change.
const MAX_EVENTS: usize = 64;
let mut events: [libc::epoll_event; MAX_EVENTS] = unsafe { std::mem::zeroed() };
loop {
let n = unsafe {
libc::epoll_wait(
epollfd,
events.as_mut_ptr(),
MAX_EVENTS as libc::c_int,
-1,
)
};
if n < 0 {
let e = unsafe { *libc::__errno_location() };
if e == libc::EINTR {
continue;
}
// Anything else here is a programming error (EBADF on epollfd
// after we've closed it from Drop — the close races with us).
// Treat as shutdown.
return;
}
let mut shutdown_requested = false;
let mut pushed_any = false;
{
let mut q = completions.lock().unwrap();
for ev in events.iter().take(n as usize) {
if ev.u64 == SHUTDOWN_EPOLL_TOKEN {
shutdown_requested = true;
continue;
}
let fd = ev.u64 as RawFd;
q.push_back(Completion::FdReady {
fd,
events: ev.events,
});
pushed_any = true;
}
}
if pushed_any {
wake_scheduler(wake_write);
}
if shutdown_requested {
return;
}
}
}
/// Write one byte to the scheduler's wake pipe. Retries on EINTR; ignores
/// EAGAIN (pipe full means there's already an outstanding wake we haven't
/// consumed yet, which is sufficient).
fn wake_scheduler(wake_write: RawFd) {
let buf: [u8; 1] = [0]; let buf: [u8; 1] = [0];
unsafe { unsafe {
// EINTR is the only retryable case worth handling.
loop { loop {
let n = libc::write(wake_write, buf.as_ptr() as *const _, 1); let n = libc::write(wake_write, buf.as_ptr() as *const _, 1);
if n < 0 { if n < 0 {
let e = *libc::__errno_location(); let e = *libc::__errno_location();
if e == libc::EINTR { continue; } if e == libc::EINTR {
continue;
}
} }
break; break;
} }
} }
} }
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Pipe helpers // Pipe helpers (unchanged from v0.2)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
fn make_pipe() -> io::Result<(RawFd, RawFd)> { fn make_pipe() -> io::Result<(RawFd, RawFd)> {
let mut fds: [libc::c_int; 2] = [0; 2]; let mut fds: [libc::c_int; 2] = [0; 2];
// O_CLOEXEC so children don't inherit, O_NONBLOCK on the read side let r = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
// so the scheduler's drain can `read` without blocking.
let r = unsafe {
libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK)
};
if r != 0 { if r != 0 {
return Err(io::Error::last_os_error()); return Err(io::Error::last_os_error());
} }
@@ -208,7 +483,6 @@ pub fn drain_wake_pipe(fd: RawFd) {
loop { loop {
let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) }; let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) };
if n <= 0 { if n <= 0 {
// EAGAIN (would block) or EOF — done.
break; break;
} }
} }
@@ -220,17 +494,26 @@ pub fn poll_wake(fd: RawFd, timeout: Option<std::time::Duration>) {
let timeout_ms: libc::c_int = match timeout { let timeout_ms: libc::c_int = match timeout {
None => -1, None => -1,
Some(d) => { Some(d) => {
// Cap at i32::MAX milliseconds; poll's argument is c_int.
let ms = d.as_millis(); let ms = d.as_millis();
if ms > i32::MAX as u128 { i32::MAX } else { ms as i32 } if ms > i32::MAX as u128 {
i32::MAX
} else {
ms as i32
}
} }
}; };
let mut pfd = libc::pollfd { fd, events: libc::POLLIN, revents: 0 }; let mut pfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
loop { loop {
let r = unsafe { libc::poll(&mut pfd as *mut _, 1, timeout_ms) }; let r = unsafe { libc::poll(&mut pfd as *mut _, 1, timeout_ms) };
if r < 0 { if r < 0 {
let e = unsafe { *libc::__errno_location() }; let e = unsafe { *libc::__errno_location() };
if e == libc::EINTR { continue; } if e == libc::EINTR {
continue;
}
} }
break; break;
} }

View File

@@ -2,11 +2,14 @@
//! //!
//! Erlang-style green-thread actor concurrency for Rust. //! Erlang-style green-thread actor concurrency for Rust.
//! //!
//! v0.1 is single-threaded. One scheduler, one OS thread. The scheduler //! Single-threaded for now: one scheduler, one OS thread. The scheduler
//! cooperatively interleaves green-thread actors with hand-rolled context //! cooperatively interleaves green-thread actors with hand-rolled context
//! switches. Actors communicate by sending `Send` messages over channels; //! switches. Actors communicate by sending `Send` messages over channels;
//! every actor has a supervisor, which is itself just an actor with a //! every actor has a supervisor, which is itself just an actor with a
//! `Receiver<Signal>`. //! `Receiver<Signal>`. Synchronisation primitives — `Mutex<T>` with
//! mandatory lock timeouts, channel `recv`, `sleep`, and epoll-backed
//! `wait_readable`/`wait_writable` — all park the green thread, never
//! the OS thread.
//! //!
//! See `LOOM.md` for the design intent and the deferred-for-later list. //! See `LOOM.md` for the design intent and the deferred-for-later list.
@@ -20,6 +23,7 @@ pub mod scheduler;
pub mod supervisor; pub mod supervisor;
pub mod timer; pub mod timer;
pub mod io; pub mod io;
pub mod mutex;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Global allocator // Global allocator
@@ -37,10 +41,16 @@ static ALLOCATOR: preempt::PreemptingAllocator = preempt::PreemptingAllocator;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
pub use channel::{channel, Receiver, RecvError, Sender}; pub use channel::{channel, Receiver, RecvError, Sender};
pub use mutex::{LockTimeout, Mutex, MutexGuard};
pub use pid::Pid; pub use pid::Pid;
pub use scheduler::{ pub use scheduler::{
block_on_io, run, self_pid, sleep, spawn, spawn_under, yield_now, JoinError, JoinHandle, block_on_io, run, self_pid, sleep, spawn, spawn_under, wait_readable, wait_writable,
yield_now, JoinError, JoinHandle,
}; };
// `read` and `write` would shadow heavily-used names if re-exported at the
// crate root; users reach for them as `smarm::scheduler::read` /
// `smarm::scheduler::write` instead. May reshuffle into a `smarm::io`
// surface in a future pass.
pub use supervisor::Signal; pub use supervisor::Signal;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

318
src/mutex.rs Normal file
View File

@@ -0,0 +1,318 @@
//! Actor-aware mutex with mandatory timeout.
//!
//! `loom::Mutex<T>` looks like `std::sync::Mutex<T>` but its `lock()` parks
//! the calling *green* thread on contention rather than blocking the OS
//! thread — and every lock attempt is bounded by a timeout. If the lock is
//! not acquired within the timeout, `lock()` returns `Err(LockTimeout)`.
//! This is a hard runtime guarantee (the spec calls it out): no actor can
//! be parked on a mutex forever.
//!
//! ```ignore
//! let m = loom::Mutex::new(42);
//! let guard = m.lock()?; // default timeout
//! let guard = m.lock_timeout(Duration::from_millis(50))?;
//! ```
//!
//! Fairness
//! ========
//! Waiters are granted the lock in FIFO order. The spec prizes fairness:
//! starvation under contention is precisely the kind of failure mode
//! supervision can't recover from cleanly. LIFO would be faster on cache
//! locality and is not offered.
//!
//! Poisoning
//! =========
//! Unlike `std::sync::Mutex`, `loom::Mutex` does not poison on panic. If a
//! holder panics while holding the lock, the next waiter receives the
//! (now-untouched) value. Rationale: supervision handles the panic at the
//! actor level; a separate poisoning channel is redundant and adds an
//! error case to every `lock()`. Users who care about "the value may be in
//! an inconsistent state after a panic" should encode that in `T` itself
//! (e.g. `Mutex<Option<State>>` and `take()` the value at the start of
//! each critical section).
//!
//! Reentrance
//! ==========
//! Not reentrant. An actor that already holds the lock and calls `lock()`
//! again on the same mutex will wait on its own grant — and time out. This
//! is a bug in the caller, not a feature.
//!
//! Multi-threading note
//! ====================
//! The current implementation uses `Rc<RefCell<…>>` internals because the
//! v0.2 scheduler is single-threaded. The public API is identical to what
//! the eventual multi-threaded version will expose; the migration replaces
//! the `Rc<RefCell>` with `Arc<sync::Mutex>` around bookkeeping (waiters
//! queue, holder pid) — the *value* itself never goes through a blocking
//! OS-level lock, because contention always parks the green thread first.
//! No `unsafe impl Send` games today: `loom::Mutex<T>` is `!Send` on v0.2,
//! which is correct given there is only one OS thread.
use crate::pid::Pid;
use crate::scheduler;
use crate::timer::{self, TimerTarget};
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;
use std::time::Duration;
/// 30 seconds. Override per-call with `lock_timeout`, or per-mutex (TODO)
/// once the supervisor-level policy hook lands.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct LockTimeout;
impl std::fmt::Display for LockTimeout {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "mutex lock timed out")
}
}
impl std::error::Error for LockTimeout {}
// ---------------------------------------------------------------------------
// Internals
// ---------------------------------------------------------------------------
/// A pending lock attempt. Sits in `MutexCore::state.waiters` from the
/// moment an actor parks until it is either granted the lock (popped by
/// `MutexGuard::drop`) or times out (popped by `on_timeout`).
struct Wait {
pid: Pid,
/// Per-mutex monotonic sequence. Lets `on_timeout` recognise "this
/// specific wait" vs. "a later wait by the same pid on the same
/// mutex" — important because a single actor can re-acquire and then
/// re-wait, and we don't want a stale timer firing to disturb the new
/// wait.
seq: u64,
}
/// The non-generic part of the mutex. Lives inside `Rc<>` so it can also
/// be stashed (as `Rc<dyn TimerTarget>`) inside a timer entry.
struct MutexCore {
state: RefCell<MutexState>,
default_timeout: Cell<Duration>,
}
struct MutexState {
holder: Option<Pid>,
waiters: VecDeque<Wait>,
next_seq: u64,
}
impl MutexCore {
fn new(default_timeout: Duration) -> Self {
Self {
state: RefCell::new(MutexState {
holder: None,
waiters: VecDeque::new(),
next_seq: 0,
}),
default_timeout: Cell::new(default_timeout),
}
}
}
impl TimerTarget for MutexCore {
fn on_timeout(&self, pid: Pid, wait_seq: u64) {
// Remove the waiter with this seq, if it's still queued. If it's
// gone the lock was already granted to this actor before the timer
// popped — the actor will return normally; do nothing.
let removed = {
let mut st = self.state.borrow_mut();
if let Some(pos) = st.waiters.iter().position(|w| w.seq == wait_seq) {
st.waiters.remove(pos);
true
} else {
false
}
};
if removed {
// The actor is parked, waiting on us. Wake it up; `lock_timeout`
// will resume, observe `holder != Some(self)`, and return
// LockTimeout.
scheduler::unpark(pid);
}
}
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
pub struct Mutex<T> {
core: Rc<MutexCore>,
/// `None` while the lock is held; `Some(T)` while free or while a
/// grantee is in the gap between unpark and resumption.
value: Rc<RefCell<Option<T>>>,
}
impl<T> Mutex<T> {
pub fn new(value: T) -> Self {
Self {
core: Rc::new(MutexCore::new(DEFAULT_TIMEOUT)),
value: Rc::new(RefCell::new(Some(value))),
}
}
/// Set the default timeout used by `lock()`. Does not affect in-flight
/// `lock_timeout` calls.
pub fn set_default_timeout(&self, timeout: Duration) {
self.core.default_timeout.set(timeout);
}
/// Acquire the lock, blocking the calling actor until it's granted or
/// the default timeout expires.
pub fn lock(&self) -> Result<MutexGuard<'_, T>, LockTimeout> {
self.lock_timeout(self.core.default_timeout.get())
}
/// Acquire the lock with an explicit timeout.
pub fn lock_timeout(&self, timeout: Duration) -> Result<MutexGuard<'_, T>, LockTimeout> {
let me = scheduler::self_pid();
// Fast path: nobody holds it. Mark ourselves as holder, take the
// value out, return a guard.
{
let mut st = self.core.state.borrow_mut();
if st.holder.is_none() {
st.holder = Some(me);
drop(st);
let value = self
.value
.borrow_mut()
.take()
.expect("Mutex: value missing on free fast path");
return Ok(MutexGuard {
mutex: self,
value: Some(value),
});
}
}
// Slow path: register as a waiter, schedule a timeout, park.
// No preemption during prep-to-park — see scheduler::NoPreempt.
let _np = scheduler::NoPreempt::enter();
let seq = {
let mut st = self.core.state.borrow_mut();
let seq = st.next_seq;
st.next_seq = st.next_seq.wrapping_add(1);
st.waiters.push_back(Wait { pid: me, seq });
seq
};
let target: Rc<dyn TimerTarget> = self.core.clone();
let deadline = timer::deadline_from_now(timeout);
scheduler::insert_wait_timer(deadline, me, target, seq);
scheduler::park_current();
// Resumed. Two possibilities:
// (a) MutexGuard::drop on the previous holder popped us off the
// waiters queue, set core.holder = me, and unparked us.
// => self.value is Some, we take it and return Ok.
// (b) on_timeout fired: it removed us from waiters and unparked
// us, but did NOT set holder. core.holder is whatever it was
// (Some(other) or None). => return Err.
let is_holder = self.core.state.borrow().holder == Some(me);
if is_holder {
let value = self
.value
.borrow_mut()
.take()
.expect("Mutex: value missing after grant");
Ok(MutexGuard {
mutex: self,
value: Some(value),
})
} else {
Err(LockTimeout)
}
}
/// Non-blocking attempt. Returns `Some` if the lock was free, `None`
/// otherwise. Useful as a fast path before a long-running computation,
/// or for tests.
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
let mut st = self.core.state.borrow_mut();
if st.holder.is_some() {
return None;
}
let me = scheduler::self_pid();
st.holder = Some(me);
drop(st);
let value = self
.value
.borrow_mut()
.take()
.expect("Mutex: value missing on try_lock free path");
Some(MutexGuard {
mutex: self,
value: Some(value),
})
}
}
impl<T> Clone for Mutex<T> {
/// Cloning a `Mutex<T>` clones the handle, not the protected value —
/// both clones refer to the same lock state and the same `T`.
fn clone(&self) -> Self {
Self {
core: self.core.clone(),
value: self.value.clone(),
}
}
}
// ---------------------------------------------------------------------------
// Guard
// ---------------------------------------------------------------------------
pub struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
/// The protected value, taken out of `mutex.value` while the guard is
/// alive. `Option` only so `Drop` can put it back; in normal use this
/// is always `Some` while the guard is observable.
value: Option<T>,
}
impl<T> std::ops::Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
self.value.as_ref().expect("MutexGuard: value missing")
}
}
impl<T> std::ops::DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
self.value.as_mut().expect("MutexGuard: value missing")
}
}
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
// Put the value back into the mutex.
let v = self.value.take().expect("MutexGuard: double drop");
*self.mutex.value.borrow_mut() = Some(v);
// Pick the next waiter (if any) and grant it the lock by writing
// its pid into `holder` *before* unparking. The grantee, on
// resumption, will see `holder == self_pid` and take the value.
let next_pid = {
let mut st = self.mutex.core.state.borrow_mut();
let next = st.waiters.pop_front();
match next {
Some(w) => {
st.holder = Some(w.pid);
Some(w.pid)
}
None => {
st.holder = None;
None
}
}
};
if let Some(pid) = next_pid {
scheduler::unpark(pid);
}
}
}

View File

@@ -344,16 +344,69 @@ pub fn park_current() {
unsafe { crate::context::switch_to_scheduler() }; unsafe { crate::context::switch_to_scheduler() };
} }
/// RAII guard that disables allocator-driven preemption for its lifetime.
///
/// The "prep-to-park" hazard described in `preempt.rs`: a primitive that
/// (a) registers an unparker (channel waiter slot, fd waiter map, mutex
/// waiter queue, …) and then (b) calls `park_current()` must not yield
/// between (a) and (b). If it does, an early unpark fires while the actor
/// is still Runnable, the unpark no-ops, and then the actor parks with no
/// one to wake it.
///
/// Library code wraps the prep + park in `let _g = NoPreempt::enter();`
/// and the guard is held until just after `park_current` returns (or
/// dropped earlier, immediately before `park_current`, since `park_current`
/// itself returns control to the scheduler which disables preemption on
/// its own path).
pub struct NoPreempt(bool);
impl NoPreempt {
pub fn enter() -> Self {
let prev = crate::preempt::PREEMPTION_ENABLED.with(|c| c.replace(false));
NoPreempt(prev)
}
}
impl Drop for NoPreempt {
fn drop(&mut self) {
crate::preempt::PREEMPTION_ENABLED.with(|c| c.set(self.0));
}
}
/// Park the current actor for at least `duration`. A zero duration behaves /// Park the current actor for at least `duration`. A zero duration behaves
/// like `yield_now` (the deadline is immediately in the past, so the timer /// like `yield_now` (the deadline is immediately in the past, so the timer
/// pops on the next scheduler iteration). /// pops on the next scheduler iteration).
pub fn sleep(duration: std::time::Duration) { pub fn sleep(duration: std::time::Duration) {
let me = current_pid().expect("sleep() called outside an actor"); let me = current_pid().expect("sleep() called outside an actor");
let _np = NoPreempt::enter();
let deadline = crate::timer::deadline_from_now(duration); let deadline = crate::timer::deadline_from_now(duration);
with_sched(|s| s.timers.insert(deadline, me)); with_sched(|s| s.timers.insert_sleep(deadline, me));
park_current(); park_current();
} }
/// Insert a `WaitTimeout` timer entry. Library code (`Mutex::lock_timeout`
/// today, future bounded-wait primitives) calls this just before
/// `park_current()` so that if the wait isn't satisfied by `deadline`,
/// `target.on_timeout(pid, wait_seq)` will fire.
///
/// Cancellation: not needed. If the wait is satisfied early, the entry is
/// still in the heap and will pop in due course; `on_timeout` is expected
/// to be idempotent on stale-seq.
pub fn insert_wait_timer(
deadline: std::time::Instant,
pid: Pid,
target: std::rc::Rc<dyn crate::timer::TimerTarget>,
wait_seq: u64,
) {
with_sched(|s| {
s.timers.insert(
deadline,
pid,
crate::timer::Reason::WaitTimeout { target, wait_seq },
);
});
}
/// Run `f` on the IO worker thread, park the current actor while it runs, /// Run `f` on the IO worker thread, park the current actor while it runs,
/// and return `f`'s value when it completes. Panics inside `f` propagate /// and return `f`'s value when it completes. Panics inside `f` propagate
/// to the calling actor. /// to the calling actor.
@@ -377,12 +430,14 @@ where
Ok(Box::new(v) as Box<dyn std::any::Any + Send>) Ok(Box::new(v) as Box<dyn std::any::Any + Send>)
}); });
{
let _np = NoPreempt::enter();
with_sched(|s| { with_sched(|s| {
let io = s.io.as_mut().expect("io thread not started"); let io = s.io.as_mut().expect("io thread not started");
io.submit(me, work); io.submit(me, work);
}); });
park_current(); park_current();
}
// On resume, our slot has a result waiting. // On resume, our slot has a result waiting.
let result = with_sched(|s| { let result = with_sched(|s| {
@@ -401,6 +456,83 @@ where
} }
} }
// ---------------------------------------------------------------------------
// Fd-readiness primitives.
//
// `wait_readable(fd)` / `wait_writable(fd)` register interest with the
// epoll thread, park the calling actor, and return when the kernel
// signals readiness. The subsequent syscall (`read`/`write`) is done on
// the actor's own thread by the caller — no buffer crosses an actor
// boundary.
//
// Fds passed in should be O_NONBLOCK; see io.rs module docs.
// ---------------------------------------------------------------------------
/// Park the calling actor until `fd` is readable.
pub fn wait_readable(fd: std::os::fd::RawFd) -> std::io::Result<()> {
wait_fd(fd, /*readable=*/ true, /*writable=*/ false)
}
/// Park the calling actor until `fd` is writable.
pub fn wait_writable(fd: std::os::fd::RawFd) -> std::io::Result<()> {
wait_fd(fd, /*readable=*/ false, /*writable=*/ true)
}
fn wait_fd(fd: std::os::fd::RawFd, readable: bool, writable: bool) -> std::io::Result<()> {
let me = current_pid().expect("wait_*() called outside an actor");
// Register with the epoll thread. If registration fails (bad fd,
// already-parked waiter, OOM in the kernel), return the error
// without parking — the actor never went to sleep.
let _np = NoPreempt::enter();
with_sched(|s| {
let io = s.io.as_mut().expect("io thread not started");
io.epoll_register(fd, me, readable, writable)
})?;
park_current();
// On resume, the scheduler has already removed `fd` from `waiters`
// and DEL'd it from epollfd. There is no per-call return value;
// success here just means "fd is ready, go do your syscall".
//
// Note: there is no error path on resume because v0.2 doesn't time
// out fd waits and doesn't otherwise spurious-wake. If those are
// added, this function grows a non-trivial return.
Ok(())
}
/// Wait until `fd` is readable, then run a single `read(2)`. Returns the
/// number of bytes read, or an `io::Error` from the syscall.
///
/// `fd` should be opened `O_NONBLOCK`. With a blocking fd, the kernel's
/// readiness signal does not guarantee a non-blocking read — a signal
/// could interrupt, and the actor's syscall would then stall the
/// scheduler thread.
pub fn read(fd: std::os::fd::RawFd, buf: &mut [u8]) -> std::io::Result<usize> {
wait_readable(fd)?;
let n = unsafe {
libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len())
};
if n < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(n as usize)
}
}
/// Wait until `fd` is writable, then run a single `write(2)`.
pub fn write(fd: std::os::fd::RawFd, buf: &[u8]) -> std::io::Result<usize> {
wait_writable(fd)?;
let n = unsafe {
libc::write(fd, buf.as_ptr() as *const _, buf.len())
};
if n < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(n as usize)
}
}
/// Wake a parked actor. If the actor isn't parked (already runnable or done) /// Wake a parked actor. If the actor isn't parked (already runnable or done)
/// this is a no-op — that's important; channel and join can both fire /// this is a no-op — that's important; channel and join can both fire
/// spurious unparks under some orderings and we want them to be cheap. /// spurious unparks under some orderings and we want them to be cheap.
@@ -488,29 +620,55 @@ pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX);
fn schedule_loop() { fn schedule_loop() {
loop { loop {
// 1. Drain due timers into the run queue. // 1. Drain due timers and dispatch by reason.
//
// Sleep — unpark the actor (idempotently: only if still
// parked).
// WaitTimeout — call the target's on_timeout. The target decides
// whether the wait was still in progress (timer
// won the race) or had been fulfilled (the thing
// the actor was waiting for arrived first → no-op).
// The target is responsible for calling unpark()
// if appropriate.
let now = std::time::Instant::now(); let now = std::time::Instant::now();
let due = with_sched(|s| s.timers.pop_due(now)); let due = with_sched(|s| s.timers.pop_due(now));
for pid in due { for entry in due {
// Same idempotency as `unpark`: only re-queue if still parked. match entry.reason {
crate::timer::Reason::Sleep => {
with_sched(|s| { with_sched(|s| {
if let Some(slot) = s.slot_mut(pid) { if let Some(slot) = s.slot_mut(entry.pid) {
if matches!(slot.state, State::Parked) { if matches!(slot.state, State::Parked) {
slot.state = State::Runnable; slot.state = State::Runnable;
s.run_queue.push_back(pid); s.run_queue.push_back(entry.pid);
} }
} }
}); });
} }
crate::timer::Reason::WaitTimeout { target, wait_seq } => {
// Note: the target callback runs *outside* with_sched.
// It may call back into the scheduler (e.g. unpark), so
// we must not hold the SCHED borrow across it.
target.on_timeout(entry.pid, wait_seq);
}
}
}
// 2. Drain IO completions: route each result to its slot and // 2. Drain IO completions: route each result by variant.
// unpark the actor. Drain even when we have other runnables — //
// it's cheap (a try_lock of the completion queue) and keeps // Blocking — a `block_on_io` closure finished. Stash the result
// pending_io_result freshness bounded. // on the actor's slot and unpark.
// FdReady — an fd registered via `wait_readable`/`wait_writable`
// is ready. Look up the parked pid in the io thread's
// waiters map, deregister the fd, unpark.
//
// Drain even when we have other runnables — it's cheap and keeps
// `pending_io_result` / `waiters` freshness bounded.
let completions = with_sched(|s| { let completions = with_sched(|s| {
s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default() s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default()
}); });
for (pid, result) in completions { for completion in completions {
match completion {
crate::io::Completion::Blocking { pid, result } => {
with_sched(|s| { with_sched(|s| {
if let Some(io) = s.io.as_mut() { if let Some(io) = s.io.as_mut() {
io.outstanding = io.outstanding.saturating_sub(1); io.outstanding = io.outstanding.saturating_sub(1);
@@ -524,6 +682,34 @@ fn schedule_loop() {
} }
}); });
} }
crate::io::Completion::FdReady { fd, events: _ } => {
with_sched(|s| {
let parked_pid = s.io.as_mut()
.and_then(|io| {
let pid = io.waiters.remove(&fd);
// Deregister the fd from epollfd; the
// EPOLLONESHOT already disarmed it but the
// slot is still occupied until we DEL.
io.epoll_deregister(fd);
pid
});
if let Some(pid) = parked_pid {
if let Some(slot) = s.slot_mut(pid) {
if matches!(slot.state, State::Parked) {
slot.state = State::Runnable;
s.run_queue.push_back(pid);
}
}
// else: actor died between registering and the
// fd firing. Nothing to do; the registration
// has been cleaned up.
}
// else: fd not in waiters — probably a duplicate
// FdReady from a previous registration, ignore.
});
}
}
}
// 3. Pop a runnable actor. If none, decide whether to block on // 3. Pop a runnable actor. If none, decide whether to block on
// the wake pipe (for timers or IO) or exit (nothing pending). // the wake pipe (for timers or IO) or exit (nothing pending).
@@ -536,10 +722,18 @@ fn schedule_loop() {
// trying to take the completions mutex, which is fine, // trying to take the completions mutex, which is fine,
// but the scheduler thread itself mustn't hold SCHED // but the scheduler thread itself mustn't hold SCHED
// borrowed across a blocking syscall. // borrowed across a blocking syscall.
//
// "Outstanding" here means *anything* the IO thread is
// expected to deliver a wakeup for: in-flight blocking
// calls AND parked fd waiters. If either is non-zero we
// must wait for the IO thread, not exit.
let (next_deadline, io_outstanding, wake_fd) = with_sched(|s| { let (next_deadline, io_outstanding, wake_fd) = with_sched(|s| {
let next = s.timers.peek_deadline(); let next = s.timers.peek_deadline();
let (out, fd) = match s.io.as_ref() { let (out, fd) = match s.io.as_ref() {
Some(io) => (io.outstanding, Some(io.wake_fd())), Some(io) => (
io.outstanding + io.waiters.len() as u32,
Some(io.wake_fd()),
),
None => (0, None), None => (0, None),
}; };
(next, out, fd) (next, out, fd)

View File

@@ -1,38 +1,86 @@
//! Sleep timers. //! Sleep + wait-with-timeout timers.
//! //!
//! A min-heap of `(deadline, Pid)` entries lives on `SchedulerState`. When //! A min-heap of `(deadline, seq, reason)` entries lives on `SchedulerState`.
//! an actor calls `sleep`, the runtime inserts the entry, marks the actor //! When an actor sleeps or starts a bounded wait (e.g. `mutex.lock()` with a
//! parked, and yields. On every scheduler loop iteration the runtime pops //! timeout), the runtime inserts an entry, marks the actor parked, and yields.
//! all entries whose deadline has passed and unparks them. When the run //! On every scheduler loop iteration the runtime pops all entries whose
//! queue is empty but the heap is not, the runtime sleeps the OS thread //! deadline has passed and dispatches each according to its `Reason`:
//! until the soonest deadline, then re-checks.
//! //!
//! `BinaryHeap` is a max-heap, so entries are stored with their deadline //! - `Sleep`: unpark the actor.
//! wrapped in `Reverse` to get min-heap behaviour. //! - `WaitTimeout`: call `on_timeout` on the registered target. The target
//! (e.g. a `Mutex`) decides whether the actor was actually still waiting
//! (timer fires first → unpark with error) or had already been granted
//! what it was waiting for (lock granted first → no-op).
//! //!
//! Stale pids (slot reused since the timer was inserted) are detected on //! `BinaryHeap` is a max-heap; entries are wrapped in `Reverse` to get
//! `due_pids` pop and silently dropped — same convention as the run queue. //! min-heap behaviour.
//!
//! No cancellation. When a non-timer wakeup happens (e.g. lock granted
//! before timeout), the timer entry is left in the heap. It will be popped
//! eventually and the dispatch will observe "actor is no longer parked /
//! wait_seq is stale" and no-op. Cost is ~32 bytes per stale entry plus a
//! few cycles on pop; acceptable given the upper bound is "one entry per
//! parked actor".
//!
//! Stale pids (slot reused since the timer was inserted) are filtered on
//! pop by the scheduler — same convention as the run queue.
use crate::pid::Pid; use crate::pid::Pid;
use std::cmp::Reverse; use std::cmp::Reverse;
use std::collections::BinaryHeap; use std::collections::BinaryHeap;
use std::rc::Rc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
#[derive(PartialEq, Eq)] /// What to do when a timer entry's deadline arrives.
///
/// Held inside `Entry`, dispatched by the scheduler in `pop_due`.
pub enum Reason {
/// `loom::sleep(d)`. Unpark `pid` unconditionally (modulo the usual
/// "still parked?" check the scheduler applies).
Sleep,
/// A bounded wait — currently only `Mutex::lock_timeout`. On expiry the
/// scheduler calls `target.on_timeout(pid, wait_seq)`. The target then
/// decides whether `pid` was actually still waiting, and if so unparks
/// it with whatever error the wait was bounded for. `wait_seq` lets the
/// target tell apart "this wait" from "a later wait by the same actor
/// on the same target".
WaitTimeout {
target: Rc<dyn TimerTarget>,
wait_seq: u64,
},
}
/// Callback the scheduler invokes when a `WaitTimeout` entry pops.
///
/// Implementors: do not touch `SchedulerState` other than via the public
/// `unpark` / channel APIs. The scheduler is mid-iteration when this fires.
pub trait TimerTarget {
fn on_timeout(&self, pid: Pid, wait_seq: u64);
}
pub struct Entry { pub struct Entry {
pub deadline: Instant, pub deadline: Instant,
/// Insertion order, used purely as a tiebreaker so `Entry: Ord` works
/// without having to compare the `Reason` payload (which contains an
/// `Rc<dyn TimerTarget>` and isn't `Ord`).
seq: u64,
pub pid: Pid, pub pid: Pid,
pub reason: Reason,
} }
impl PartialEq for Entry {
fn eq(&self, other: &Self) -> bool {
self.deadline == other.deadline && self.seq == other.seq
}
}
impl Eq for Entry {}
impl Ord for Entry { impl Ord for Entry {
fn cmp(&self, other: &Self) -> std::cmp::Ordering { fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Only `deadline` matters for ordering; pid is a tiebreaker so the // Earlier deadline first; ties broken by insertion order so the
// type is Ord, but the order among same-deadline entries is // ordering is total. `Reason` and `Pid` deliberately don't
// irrelevant. // participate.
self.deadline self.deadline.cmp(&other.deadline).then_with(|| self.seq.cmp(&other.seq))
.cmp(&other.deadline)
.then_with(|| self.pid.index().cmp(&other.pid.index()))
.then_with(|| self.pid.generation().cmp(&other.pid.generation()))
} }
} }
@@ -46,15 +94,25 @@ impl PartialOrd for Entry {
pub struct Timers { pub struct Timers {
/// Reverse-wrapped so the smallest deadline is at the top. /// Reverse-wrapped so the smallest deadline is at the top.
heap: BinaryHeap<Reverse<Entry>>, heap: BinaryHeap<Reverse<Entry>>,
/// Monotonic counter for the tiebreaker `seq` field.
next_seq: u64,
} }
impl Timers { impl Timers {
pub fn new() -> Self { pub fn new() -> Self {
Self { heap: BinaryHeap::new() } Self { heap: BinaryHeap::new(), next_seq: 0 }
} }
pub fn insert(&mut self, deadline: Instant, pid: Pid) { /// Insert a `Sleep` timer. Convenience for the common case.
self.heap.push(Reverse(Entry { deadline, pid })); pub fn insert_sleep(&mut self, deadline: Instant, pid: Pid) {
self.insert(deadline, pid, Reason::Sleep);
}
/// Insert an arbitrary timer entry.
pub fn insert(&mut self, deadline: Instant, pid: Pid, reason: Reason) {
let seq = self.next_seq;
self.next_seq = self.next_seq.wrapping_add(1);
self.heap.push(Reverse(Entry { deadline, seq, pid, reason }));
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
@@ -66,13 +124,13 @@ impl Timers {
self.heap.peek().map(|r| r.0.deadline) self.heap.peek().map(|r| r.0.deadline)
} }
/// Pop and return every pid whose deadline is ≤ `now`. /// Pop every entry whose deadline is ≤ `now`, in deadline order.
pub fn pop_due(&mut self, now: Instant) -> Vec<Pid> { /// The scheduler dispatches each entry by inspecting `entry.reason`.
pub fn pop_due(&mut self, now: Instant) -> Vec<Entry> {
let mut out = Vec::new(); let mut out = Vec::new();
while let Some(r) = self.heap.peek() { while let Some(r) = self.heap.peek() {
if r.0.deadline <= now { if r.0.deadline <= now {
let e = self.heap.pop().unwrap().0; out.push(self.heap.pop().unwrap().0);
out.push(e.pid);
} else { } else {
break; break;
} }
@@ -81,7 +139,7 @@ impl Timers {
} }
} }
/// Wall-clock duration helper exposed for `sleep`. /// Wall-clock duration helper exposed for `sleep` and `lock_timeout`.
pub fn deadline_from_now(duration: Duration) -> Instant { pub fn deadline_from_now(duration: Duration) -> Instant {
Instant::now() Instant::now()
.checked_add(duration) .checked_add(duration)

324
tests/io_epoll.rs Normal file
View File

@@ -0,0 +1,324 @@
//! Tests for epoll-based fd readiness primitives: `wait_readable`,
//! `wait_writable`, and the `read`/`write` sugar on top of them.
//!
//! Pipes are the convenient test target: cheap to create, easy to drive,
//! and we already use `libc::pipe2` internally. Each pipe is one direction
//! and respects `O_NONBLOCK` if we ask for it.
use smarm::{run, spawn, wait_readable, wait_writable, yield_now};
use std::os::fd::RawFd;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
// ---------------------------------------------------------------------------
// Pipe helper
// ---------------------------------------------------------------------------
struct Pipe {
read: RawFd,
write: RawFd,
}
impl Pipe {
fn new() -> Self {
let mut fds: [libc::c_int; 2] = [0; 2];
let r = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) };
assert_eq!(r, 0, "pipe2 failed");
Pipe {
read: fds[0],
write: fds[1],
}
}
}
impl Drop for Pipe {
fn drop(&mut self) {
unsafe {
libc::close(self.read);
libc::close(self.write);
}
}
}
fn raw_write(fd: RawFd, buf: &[u8]) -> isize {
unsafe { libc::write(fd, buf.as_ptr() as *const _, buf.len()) }
}
fn raw_read(fd: RawFd, buf: &mut [u8]) -> isize {
unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) }
}
// ---------------------------------------------------------------------------
// wait_readable parks until data arrives, then libc::read succeeds.
// ---------------------------------------------------------------------------
#[test]
fn wait_readable_blocks_until_data_arrives_then_read_succeeds() {
let captured: Arc<StdMutex<Vec<u8>>> = Arc::new(StdMutex::new(Vec::new()));
let cap = captured.clone();
let p = Arc::new(Pipe::new());
let p_reader = p.clone();
let p_writer = p.clone();
run(move || {
let reader = spawn(move || {
// Initially the pipe is empty; this parks.
wait_readable(p_reader.read).expect("wait_readable failed");
// Now data should be readable.
let mut buf = [0u8; 16];
let n = raw_read(p_reader.read, &mut buf);
assert!(n > 0, "read returned {}", n);
cap.lock().unwrap().extend_from_slice(&buf[..n as usize]);
});
let writer = spawn(move || {
// Yield so the reader gets to park first.
yield_now();
yield_now();
// Sleep a touch so the reader is definitely waiting in epoll.
smarm::sleep(Duration::from_millis(5));
let n = raw_write(p_writer.write, b"hello");
assert_eq!(n, 5);
});
reader.join().unwrap();
writer.join().unwrap();
});
assert_eq!(*captured.lock().unwrap(), b"hello");
}
// ---------------------------------------------------------------------------
// The smarm::scheduler::read sugar — wait_readable + libc::read in one call.
// ---------------------------------------------------------------------------
#[test]
fn read_sugar_returns_bytes_from_pipe() {
let captured: Arc<StdMutex<Vec<u8>>> = Arc::new(StdMutex::new(Vec::new()));
let cap = captured.clone();
let p = Arc::new(Pipe::new());
let p_reader = p.clone();
let p_writer = p.clone();
run(move || {
let reader = spawn(move || {
let mut buf = [0u8; 16];
let n = smarm::scheduler::read(p_reader.read, &mut buf)
.expect("smarm::scheduler::read failed");
cap.lock().unwrap().extend_from_slice(&buf[..n]);
});
let writer = spawn(move || {
yield_now();
smarm::sleep(Duration::from_millis(5));
let _ = raw_write(p_writer.write, b"world");
});
reader.join().unwrap();
writer.join().unwrap();
});
assert_eq!(*captured.lock().unwrap(), b"world");
}
// ---------------------------------------------------------------------------
// wait_writable + write — though pipes are almost always writable; the
// useful test here is that the call doesn't hang on a writable fd.
// ---------------------------------------------------------------------------
#[test]
fn write_sugar_sends_bytes_to_pipe() {
let counter = Arc::new(AtomicU32::new(0));
let c = counter.clone();
let p = Arc::new(Pipe::new());
let p_writer = p.clone();
let p_reader = p.clone();
run(move || {
let writer = spawn(move || {
// Pipe is empty + has buffer space, so this returns immediately
// after wait_writable wakes (which happens fast because the
// kernel marks an empty pipe as immediately writable).
let n = smarm::scheduler::write(p_writer.write, b"smarm")
.expect("write failed");
assert_eq!(n, 5);
c.fetch_add(1, Ordering::SeqCst);
});
let reader = spawn(move || {
// Give the writer time.
smarm::sleep(Duration::from_millis(10));
let mut buf = [0u8; 16];
let n = raw_read(p_reader.read, &mut buf);
assert_eq!(n, 5);
assert_eq!(&buf[..5], b"smarm");
});
writer.join().unwrap();
reader.join().unwrap();
});
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
// ---------------------------------------------------------------------------
// While an actor is parked on wait_readable, other actors keep running.
// ---------------------------------------------------------------------------
#[test]
fn other_actors_run_while_one_is_parked_on_wait_readable() {
let log: Arc<StdMutex<Vec<u8>>> = Arc::new(StdMutex::new(Vec::new()));
let la = log.clone();
let lb = log.clone();
let p = Arc::new(Pipe::new());
let p_a = p.clone();
let p_b = p.clone();
run(move || {
let a = spawn(move || {
la.lock().unwrap().push(b'A');
wait_readable(p_a.read).unwrap();
la.lock().unwrap().push(b'a');
});
let b = spawn(move || {
// A starts parking on the empty pipe; B should be free to do
// its work in the meantime.
for _ in 0..3 {
yield_now();
lb.lock().unwrap().push(b'B');
}
// Now wake A.
let _ = raw_write(p_b.write, b"x");
});
a.join().unwrap();
b.join().unwrap();
});
let v = log.lock().unwrap();
// A goes first ('A'), then B makes progress (multiple 'B's) while A is
// parked, then A wakes and finishes ('a').
let pos_big_a = v.iter().position(|&c| c == b'A').unwrap();
let pos_lit_a = v.iter().position(|&c| c == b'a').unwrap();
let big_b_count = v.iter().filter(|&&c| c == b'B').count();
assert_eq!(big_b_count, 3, "B should have made 3 steps: {:?}", *v);
assert!(pos_big_a < pos_lit_a, "A pre-park before A post-park: {:?}", *v);
// At least the last B step should be before A resumes.
let last_big_b = v.iter().rposition(|&c| c == b'B').unwrap();
assert!(last_big_b < pos_lit_a, "B should finish before A resumes: {:?}", *v);
}
// ---------------------------------------------------------------------------
// Two-way pipe ping-pong via wait_readable.
// ---------------------------------------------------------------------------
#[test]
fn ping_pong_between_two_pipes_completes() {
// a_to_b: actor A writes, actor B reads.
// b_to_a: actor B writes, actor A reads.
let a_to_b = Arc::new(Pipe::new());
let b_to_a = Arc::new(Pipe::new());
let counter = Arc::new(AtomicU32::new(0));
let ca = counter.clone();
let cb = counter.clone();
let a_to_b_a = a_to_b.clone();
let a_to_b_b = a_to_b.clone();
let b_to_a_a = b_to_a.clone();
let b_to_a_b = b_to_a.clone();
run(move || {
let a = spawn(move || {
for _ in 0..5 {
let _ = raw_write(a_to_b_a.write, b"x");
wait_readable(b_to_a_a.read).unwrap();
let mut buf = [0u8; 4];
let _ = raw_read(b_to_a_a.read, &mut buf);
ca.fetch_add(1, Ordering::SeqCst);
}
});
let b = spawn(move || {
for _ in 0..5 {
wait_readable(a_to_b_b.read).unwrap();
let mut buf = [0u8; 4];
let _ = raw_read(a_to_b_b.read, &mut buf);
let _ = raw_write(b_to_a_b.write, b"y");
cb.fetch_add(1, Ordering::SeqCst);
}
});
a.join().unwrap();
b.join().unwrap();
});
// Both sides did 5 rounds; counter is incremented by both, so total = 10.
assert_eq!(counter.load(Ordering::SeqCst), 10);
}
// ---------------------------------------------------------------------------
// Same fd reused across calls — DEL+ADD cycle works.
// ---------------------------------------------------------------------------
#[test]
fn same_fd_can_be_waited_on_repeatedly() {
let p = Arc::new(Pipe::new());
let p_r = p.clone();
let p_w = p.clone();
let counter = Arc::new(AtomicU32::new(0));
let c = counter.clone();
run(move || {
let reader = spawn(move || {
for _ in 0..4 {
wait_readable(p_r.read).unwrap();
let mut buf = [0u8; 4];
let n = raw_read(p_r.read, &mut buf);
assert!(n > 0);
c.fetch_add(1, Ordering::SeqCst);
}
});
let writer = spawn(move || {
for _ in 0..4 {
yield_now();
smarm::sleep(Duration::from_millis(2));
let _ = raw_write(p_w.write, b"z");
}
});
reader.join().unwrap();
writer.join().unwrap();
});
assert_eq!(counter.load(Ordering::SeqCst), 4);
}
// ---------------------------------------------------------------------------
// Sanity that wait_writable on an already-writable pipe returns promptly.
// ---------------------------------------------------------------------------
#[test]
fn wait_writable_on_empty_pipe_returns_quickly() {
let p = Arc::new(Pipe::new());
let p_w = p.clone();
let start = std::time::Instant::now();
run(move || {
wait_writable(p_w.write).unwrap();
});
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(200),
"wait_writable should be fast on a writable fd, took {:?}",
elapsed
);
}

311
tests/mutex.rs Normal file
View File

@@ -0,0 +1,311 @@
//! `loom::Mutex<T>` tests. All run under the scheduler because `lock()`
//! needs to be able to park.
use smarm::{run, spawn, yield_now, LockTimeout, Mutex};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
// ---------------------------------------------------------------------------
// Uncontended fast path
// ---------------------------------------------------------------------------
#[test]
fn lock_free_mutex_succeeds() {
let captured = Arc::new(AtomicU32::new(0));
let c = captured.clone();
run(move || {
let m = Mutex::new(42u32);
{
let g = m.lock().unwrap();
c.store(*g, Ordering::SeqCst);
}
// After drop we can lock again.
let g2 = m.lock().unwrap();
assert_eq!(*g2, 42);
});
assert_eq!(captured.load(Ordering::SeqCst), 42);
}
#[test]
fn try_lock_returns_some_when_free_none_when_held() {
let success_flag = Arc::new(AtomicU32::new(0));
let s = success_flag.clone();
run(move || {
let m = Mutex::new(0u32);
let g = m.try_lock().expect("free");
// Holding the guard; a second try_lock on the same actor should fail.
assert!(m.try_lock().is_none());
drop(g);
// Now free again.
let g2 = m.try_lock().expect("free again");
drop(g2);
s.store(1, Ordering::SeqCst);
});
assert_eq!(success_flag.load(Ordering::SeqCst), 1);
}
#[test]
fn guard_mutates_value_visible_through_next_lock() {
let final_value = Arc::new(AtomicU32::new(0));
let f = final_value.clone();
run(move || {
let m = Mutex::new(0u32);
{
let mut g = m.lock().unwrap();
*g = 7;
}
let g2 = m.lock().unwrap();
f.store(*g2, Ordering::SeqCst);
});
assert_eq!(final_value.load(Ordering::SeqCst), 7);
}
// ---------------------------------------------------------------------------
// Contention: a second actor parks until the first releases.
// ---------------------------------------------------------------------------
#[test]
fn contended_lock_parks_until_holder_releases() {
// Actor A locks, yields (still holding), then releases. Actor B tries
// to lock in between — B should park, then succeed after A drops.
let log: Arc<StdMutex<Vec<&'static str>>> = Arc::new(StdMutex::new(Vec::new()));
let la = log.clone();
let lb = log.clone();
run(move || {
let m = Mutex::new(0u32);
let m_a = m.clone();
let m_b = m.clone();
let a = spawn(move || {
let g = m_a.lock().unwrap();
la.lock().unwrap().push("A_locked");
// While holding, yield to let B run.
yield_now();
la.lock().unwrap().push("A_dropping");
drop(g);
la.lock().unwrap().push("A_dropped");
});
let b = spawn(move || {
// Wait a moment to make sure A locks first.
yield_now();
lb.lock().unwrap().push("B_try");
let _g = m_b.lock().unwrap();
lb.lock().unwrap().push("B_locked");
});
a.join().unwrap();
b.join().unwrap();
});
let v = log.lock().unwrap();
// A locks, B tries (parks), A drops, B gets the lock.
let pos_a_locked = v.iter().position(|s| *s == "A_locked").unwrap();
let pos_b_try = v.iter().position(|s| *s == "B_try").unwrap();
let pos_a_dropped = v.iter().position(|s| *s == "A_dropped").unwrap();
let pos_b_locked = v.iter().position(|s| *s == "B_locked").unwrap();
assert!(pos_a_locked < pos_b_try, "log: {:?}", *v);
assert!(pos_b_try < pos_a_dropped, "B should attempt before A drops: {:?}", *v);
assert!(pos_a_dropped < pos_b_locked, "B should lock only after A drops: {:?}", *v);
}
// ---------------------------------------------------------------------------
// Timeout: B times out while A holds forever.
// ---------------------------------------------------------------------------
#[test]
fn lock_timeout_returns_err_when_holder_never_releases() {
let saw_err = Arc::new(std::sync::atomic::AtomicBool::new(false));
let s = saw_err.clone();
run(move || {
let m: Mutex<u32> = Mutex::new(0);
let m_a = m.clone();
let m_b = m.clone();
let a = spawn(move || {
// Hold the lock for 100ms, blocking B's attempt with a 20ms timeout.
let _g = m_a.lock().unwrap();
smarm::sleep(Duration::from_millis(100));
// _g drops here.
});
let b = spawn(move || {
// Let A acquire first.
yield_now();
let t0 = Instant::now();
let res = m_b.lock_timeout(Duration::from_millis(20));
let elapsed = t0.elapsed();
assert!(matches!(res, Err(LockTimeout)), "got {:?}", res);
// Sanity: actually waited approximately the timeout.
assert!(
elapsed >= Duration::from_millis(15),
"timed out too fast: {:?}",
elapsed
);
assert!(
elapsed < Duration::from_millis(80),
"timed out far too slow: {:?}",
elapsed
);
s.store(true, Ordering::SeqCst);
});
a.join().unwrap();
b.join().unwrap();
});
assert!(saw_err.load(Ordering::SeqCst));
}
// ---------------------------------------------------------------------------
// FIFO fairness: when many actors queue, they get the lock in arrival order.
// ---------------------------------------------------------------------------
#[test]
fn waiters_are_granted_the_lock_in_fifo_order() {
let order: Arc<StdMutex<Vec<u32>>> = Arc::new(StdMutex::new(Vec::new()));
run({
let order = order.clone();
move || {
let m: Mutex<()> = Mutex::new(());
// Holder: takes the lock, yields to let others queue up, then
// releases. Each waiter records its arrival order on acquisition.
let m_holder = m.clone();
let holder = spawn(move || {
let g = m_holder.lock().unwrap();
// Let waiters pile up.
for _ in 0..5 {
yield_now();
}
drop(g);
});
// Spawn 4 waiters in order 1, 2, 3, 4. Each yields once before
// calling lock(), so we know the holder ran first.
let mut handles = vec![holder];
for id in 1u32..=4 {
let m_w = m.clone();
let o = order.clone();
handles.push(spawn(move || {
// Stagger the lock attempts so they arrive in order.
for _ in 0..id {
yield_now();
}
let _g = m_w.lock().unwrap();
o.lock().unwrap().push(id);
}));
}
for h in handles {
h.join().unwrap();
}
}
});
let v = order.lock().unwrap().clone();
assert_eq!(v, vec![1, 2, 3, 4], "waiters should acquire in arrival order");
}
// ---------------------------------------------------------------------------
// Grant-vs-timeout race: holder drops just before timer would fire — waiter
// should get the lock, not LockTimeout.
// ---------------------------------------------------------------------------
#[test]
fn grant_wins_when_holder_releases_before_timeout() {
let got_lock = Arc::new(std::sync::atomic::AtomicBool::new(false));
let g = got_lock.clone();
run(move || {
let m: Mutex<u32> = Mutex::new(0);
let m_a = m.clone();
let m_b = m.clone();
let a = spawn(move || {
let _g = m_a.lock().unwrap();
// Hold for 10ms, well under B's 100ms timeout.
smarm::sleep(Duration::from_millis(10));
});
let b = spawn(move || {
yield_now();
let res = m_b.lock_timeout(Duration::from_millis(100));
if res.is_ok() {
g.store(true, Ordering::SeqCst);
}
});
a.join().unwrap();
b.join().unwrap();
});
assert!(got_lock.load(Ordering::SeqCst));
}
// ---------------------------------------------------------------------------
// Panic in critical section: next waiter still gets the lock (no poisoning).
// ---------------------------------------------------------------------------
#[test]
fn next_waiter_gets_lock_after_holder_panics() {
let next_got_it = Arc::new(std::sync::atomic::AtomicBool::new(false));
let n = next_got_it.clone();
run(move || {
let m: Mutex<u32> = Mutex::new(7);
let m_a = m.clone();
let m_b = m.clone();
let a = spawn(move || {
let _g = m_a.lock().unwrap();
yield_now();
panic!("holder dies mid-critical-section");
});
let b = spawn(move || {
yield_now();
// A is dead but its guard's Drop ran during unwind. We get the lock.
let g = m_b.lock_timeout(Duration::from_millis(100)).unwrap();
assert_eq!(*g, 7);
n.store(true, Ordering::SeqCst);
});
let _ = a.join(); // panic — expected
b.join().unwrap();
});
assert!(next_got_it.load(Ordering::SeqCst));
}
// ---------------------------------------------------------------------------
// Multiple short critical sections under contention all complete (no lost
// wakeups, no deadlock). Counts up to N from M actors.
// ---------------------------------------------------------------------------
#[test]
fn many_actors_increment_shared_counter_via_mutex() {
const ACTORS: u32 = 8;
const PER_ACTOR: u32 = 50;
let final_value = Arc::new(AtomicU32::new(0));
let fv = final_value.clone();
run(move || {
let m: Mutex<u32> = Mutex::new(0);
let mut handles = Vec::new();
for _ in 0..ACTORS {
let m_i = m.clone();
handles.push(spawn(move || {
for _ in 0..PER_ACTOR {
let mut g = m_i.lock().unwrap();
*g += 1;
}
}));
}
for h in handles {
h.join().unwrap();
}
let g = m.lock().unwrap();
fv.store(*g, Ordering::SeqCst);
});
assert_eq!(final_value.load(Ordering::SeqCst), ACTORS * PER_ACTOR);
}

View File

@@ -114,3 +114,96 @@ fn many_concurrent_sleepers_all_wake() {
}); });
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 20); assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 20);
} }
// ---------------------------------------------------------------------------
// Direct tests on the Timers data structure. No scheduler involved — these
// cover the new Reason machinery without needing a Mutex implementation.
// ---------------------------------------------------------------------------
use smarm::pid::Pid;
use smarm::timer::{Reason, TimerTarget, Timers};
use std::cell::RefCell;
use std::rc::Rc;
struct RecordingTarget {
calls: RefCell<Vec<(Pid, u64)>>,
}
impl TimerTarget for RecordingTarget {
fn on_timeout(&self, pid: Pid, seq: u64) {
self.calls.borrow_mut().push((pid, seq));
}
}
#[test]
fn timers_pop_due_returns_entries_in_deadline_order() {
let mut t = Timers::new();
let now = Instant::now();
// Insert out of order; pop_due should hand them back sorted by deadline.
t.insert_sleep(now + Duration::from_millis(30), Pid::new(0, 0));
t.insert_sleep(now + Duration::from_millis(10), Pid::new(1, 0));
t.insert_sleep(now + Duration::from_millis(20), Pid::new(2, 0));
// Advance past all of them.
let due = t.pop_due(now + Duration::from_millis(50));
let pids: Vec<u32> = due.iter().map(|e| e.pid.index()).collect();
assert_eq!(pids, vec![1, 2, 0]);
assert!(t.is_empty());
}
#[test]
fn timers_only_pop_entries_whose_deadline_has_passed() {
let mut t = Timers::new();
let now = Instant::now();
t.insert_sleep(now + Duration::from_millis(5), Pid::new(0, 0));
t.insert_sleep(now + Duration::from_millis(100), Pid::new(1, 0));
let due = t.pop_due(now + Duration::from_millis(20));
assert_eq!(due.len(), 1);
assert_eq!(due[0].pid.index(), 0);
assert!(!t.is_empty());
// The unpopped entry's deadline is still visible.
assert!(t.peek_deadline().is_some());
}
#[test]
fn timers_mix_sleep_and_wait_timeout_reasons() {
let mut t = Timers::new();
let target = Rc::new(RecordingTarget { calls: RefCell::new(Vec::new()) });
let now = Instant::now();
t.insert_sleep(now + Duration::from_millis(5), Pid::new(0, 0));
t.insert(
now + Duration::from_millis(10),
Pid::new(1, 0),
Reason::WaitTimeout { target: target.clone(), wait_seq: 42 },
);
let due = t.pop_due(now + Duration::from_millis(20));
assert_eq!(due.len(), 2);
// Order: Sleep (5ms) first, WaitTimeout (10ms) second.
match &due[0].reason {
Reason::Sleep => {}
_ => panic!("first entry should be a Sleep"),
}
match &due[1].reason {
Reason::WaitTimeout { wait_seq, .. } => assert_eq!(*wait_seq, 42),
_ => panic!("second entry should be a WaitTimeout"),
}
}
#[test]
fn same_deadline_entries_pop_in_insertion_order() {
// The `seq` tiebreaker means inserting two entries with the same
// deadline preserves the order they were inserted.
let mut t = Timers::new();
let now = Instant::now();
let d = now + Duration::from_millis(10);
t.insert_sleep(d, Pid::new(0, 0));
t.insert_sleep(d, Pid::new(1, 0));
t.insert_sleep(d, Pid::new(2, 0));
let due = t.pop_due(now + Duration::from_millis(20));
let pids: Vec<u32> = due.iter().map(|e| e.pid.index()).collect();
assert_eq!(pids, vec![0, 1, 2]);
}