From aeacaf611852590c78ceded184065951495736e2 Mon Sep 17 00:00:00 2001 From: smarm Date: Sun, 24 May 2026 07:03:45 +0000 Subject: [PATCH] fix: stress testing & stability (v0.6.5) Improve reliability under high load: - tests/stress.rs: New comprehensive stress test suite (448 lines) - Fine-tune I/O & runtime scheduling edge cases - Pin versions & fix MSRV compatibility --- .gitignore | 2 +- src/io.rs | 3 +- src/runtime.rs | 100 ++++++++--- tests/stress.rs | 448 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 523 insertions(+), 30 deletions(-) create mode 100644 tests/stress.rs diff --git a/.gitignore b/.gitignore index 96ef6c0..a9d37c5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -/target +target Cargo.lock diff --git a/src/io.rs b/src/io.rs index 71a96d3..acf70d5 100644 --- a/src/io.rs +++ b/src/io.rs @@ -427,9 +427,10 @@ fn epoll_loop( continue; } let fd = ev.u64 as RawFd; + let evs = ev.events; q.push_back(Completion::FdReady { fd, - events: ev.events, + events: evs, }); pushed_any = true; } diff --git a/src/runtime.rs b/src/runtime.rs index 37481a6..cdc0481 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -331,6 +331,34 @@ impl Runtime { /// Run `f` as the initial actor, block until all actors finish. /// Can be called multiple times sequentially on the same `Runtime`. pub fn run(&self, f: impl FnOnce() + Send + 'static) { + // Install smarm's panic hook on first call. The default Rust hook is + // not reentrant — concurrent actor panics can trigger a double-panic + // abort when the backtrace printer takes an internal lock that is + // already held. smarm catches every actor panic via `catch_unwind` in + // the trampoline, so panics never need to reach the hook for runtime + // correctness; the hook fires only as a side-effect of unwinding before + // `catch_unwind` catches it. + // + // We install once and leave it installed: the previous hook is chained + // so that panics outside actor context (e.g. in the test harness + // itself) are still reported normally. + static HOOK_INSTALLED: std::sync::OnceLock<()> = std::sync::OnceLock::new(); + HOOK_INSTALLED.get_or_init(|| { + let prev = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + // If we are currently executing inside an actor trampoline the + // panic will be caught by `catch_unwind` momentarily. Suppress + // the hook output to avoid interleaved noise and reentrancy. + // Outside actor context, delegate to the previous hook so that + // genuine runtime panics are still reported. + if crate::actor::current_pid().is_some() { + // Inside an actor — catch_unwind handles it; stay silent. + } else { + prev(info); + } + })); + }); + // Open the trace store for this run (no-op without smarm-trace). #[cfg(feature = "smarm-trace")] crate::trace::open(); @@ -560,10 +588,23 @@ fn schedule_loop(inner: &Arc, slot: usize) { }); if let Some(pid) = parked_pid { if let Some(slot) = s.slot_mut(pid) { - if matches!(slot.state, State::Parked) { - slot.state = State::Runnable; - s.run_queue.push_back(pid); - crate::te!(crate::trace::Event::Enqueue(pid)); + match slot.state { + State::Parked => { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + crate::te!(crate::trace::Event::UnparkDirect(pid)); + crate::te!(crate::trace::Event::Enqueue(pid)); + } + // Actor is between epoll_register + // and park_current. Set the flag so + // the upcoming Park yield re-queues + // instead of suspending. Mirrors + // scheduler::unpark(). + State::Runnable => { + slot.pending_unpark = true; + crate::te!(crate::trace::Event::UnparkDeferred(pid)); + } + State::Done => {} } } } @@ -586,8 +627,16 @@ fn schedule_loop(inner: &Arc, slot: usize) { p } None => { - // Nothing runnable. Check whether we should wait or exit. - let (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) = + // Queue was empty when we popped. Re-examine under the lock to + // decide whether to exit or wait. All four conditions must hold + // simultaneously before we exit: + // 1. run queue is still empty + // 2. no live actors (nothing parked, nothing mid-finalize) + // 3. no pending timers + // 4. no outstanding IO + // If any is non-zero we keep spinning — "check the fridge is + // empty before you leave for the airport". + let (next_deadline, io_outstanding, wake_fd, all_clear) = inner.with_shared(|s| { let next = s.timers.peek_deadline(); let (out, fd) = match s.io.as_ref() { @@ -597,21 +646,20 @@ fn schedule_loop(inner: &Arc, slot: usize) { ), None => (0, None), }; - // Count actors that are not Done (Runnable or Parked). - let live = s.slots.iter().filter(|slot| { - slot.actor.is_some() - }).count(); - (next, out, fd, s.run_queue.is_empty(), live) + let live = s.slots.iter().filter(|slot| slot.actor.is_some()).count(); + let queue_empty = s.run_queue.is_empty(); + let all_clear = queue_empty && live == 0 && next.is_none() && out == 0; + (next, out, fd, all_clear) }); - match (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) { - // Queue is now non-empty (another thread added work): retry. - (_, _, _, false, _) => continue, - // Truly idle — no timers, no IO, no live actors. - (None, 0, _, true, 0) => return, - // Live actors but queue empty: they must be parked on IO or - // timers. Wait on the appropriate source. - (Some(deadline), _, fd_opt, true, _) => { + if all_clear { + return; + } + + // Something is still in flight. Sleep on the appropriate source + // to avoid hammering the mutex; the loop will retry on wake. + match (next_deadline, wake_fd) { + (Some(deadline), fd_opt) => { let now = std::time::Instant::now(); if deadline > now { let timeout = deadline - now; @@ -623,22 +671,16 @@ fn schedule_loop(inner: &Arc, slot: usize) { None => thread::sleep(timeout), } } - continue; } - (None, _, Some(fd), true, _) => { + (None, Some(fd)) if io_outstanding > 0 => { crate::io::poll_wake(fd, None); crate::io::drain_wake_pipe(fd); - continue; } - // Live actors, queue empty, no IO/timers: they're parked - // waiting for each other (potential deadlock in user code), - // or another thread is about to add work. Sleep briefly to - // avoid hammering the shared mutex. _ => { thread::sleep(std::time::Duration::from_micros(100)); - continue; } } + continue; } }; @@ -649,7 +691,9 @@ fn schedule_loop(inner: &Arc, slot: usize) { s.slot(pid).and_then(|slot| slot.actor.as_ref().map(|a| a.sp)) }) { Some(sp) => sp, - None => continue, // stale pid + None => { + continue; // stale pid + } }; // First resume: move the closure into the trampoline's thread-local. diff --git a/tests/stress.rs b/tests/stress.rs new file mode 100644 index 0000000..d993ac4 --- /dev/null +++ b/tests/stress.rs @@ -0,0 +1,448 @@ +//! Stress tests targeting lost wakeups, PID table pressure, thundering herds, +//! and panic isolation under concurrency. +//! +//! These tests are designed to find bugs that functional happy-path tests +//! cannot: races in the park/unpark protocol, slot leaks under concurrent +//! churn, and scheduler corruption from concurrent panics. +//! +//! Every test that could hang is bounded by a join on a known-finite set of +//! handles. A deadlock from a lost wakeup will cause the test binary to time +//! out rather than produce a false pass — run with `cargo test -- --timeout` +//! or under a CI timeout. + +use smarm::{channel, runtime::{Config, Runtime}, spawn, yield_now, JoinHandle}; +use std::sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, +}; + +fn rt(n: usize) -> Runtime { + smarm::runtime::init(Config::exact(n)) +} + +fn rt_par() -> Runtime { + smarm::runtime::init(Config::default()) +} + +// --------------------------------------------------------------------------- +// P0: Lost-wakeup — many concurrent sender/receiver pairs +// +// 500 independent (tx, rx) pairs. Each sender and receiver are separate +// actors. No ordering is imposed between pairs. Any lost wakeup causes one +// receiver to park forever, deadlocking the join at the end. +// --------------------------------------------------------------------------- + +#[test] +fn lost_wakeup_many_pairs() { + const PAIRS: usize = 500; + let count = Arc::new(AtomicU64::new(0)); + + for threads in [1, 2, 4] { + count.store(0, Ordering::SeqCst); + let c = count.clone(); + + rt(threads).run(move || { + let mut handles: Vec = Vec::with_capacity(PAIRS * 2); + + for _ in 0..PAIRS { + let (tx, rx) = channel::(); + let cc = c.clone(); + + // Receiver parks immediately. + handles.push(spawn(move || { + let v = rx.recv().unwrap(); + cc.fetch_add(v, Ordering::SeqCst); + })); + + // Sender fires without any yield — races with receiver parking. + handles.push(spawn(move || { + tx.send(1).unwrap(); + })); + } + + for h in handles { + h.join().unwrap(); + } + }); + + assert_eq!( + count.load(Ordering::SeqCst), + PAIRS as u64, + "lost wakeup on {threads}-thread runtime" + ); + } +} + +// --------------------------------------------------------------------------- +// P0: Lost-wakeup — rapid-fire single receiver +// +// One receiver, SENDERS senders, all spawned at once. The receiver loops +// receiving SENDERS messages. Race: a sender may fire before the receiver +// has parked, or exactly as it is transitioning to parked. +// --------------------------------------------------------------------------- + +#[test] +fn lost_wakeup_rapid_fire_single_receiver() { + const SENDERS: u64 = 200; + + for threads in [1, 2, 4] { + let received = Arc::new(AtomicU64::new(0)); + let rc = received.clone(); + + rt(threads).run(move || { + let (tx, rx) = channel::(); + let mut handles: Vec = Vec::with_capacity(SENDERS as usize + 1); + + // Receiver loops until it has seen all messages. + handles.push(spawn(move || { + let mut n = 0u64; + while n < SENDERS { + rx.recv().unwrap(); + n += 1; + } + rc.store(n, Ordering::SeqCst); + })); + + // All senders fire with no deliberate delay. + for _ in 0..SENDERS { + let txc = tx.clone(); + handles.push(spawn(move || { + txc.send(1).unwrap(); + })); + } + + for h in handles { + h.join().unwrap(); + } + }); + + assert_eq!( + received.load(Ordering::SeqCst), + SENDERS, + "missed messages on {threads}-thread runtime" + ); + } +} + +// --------------------------------------------------------------------------- +// P0: Lost-wakeup — wakeup during yield chain +// +// Receiver yields N times before it would naturally park. Sender fires +// during that window. Tests the race between "actor is on the run queue +// yielding" and "actor transitions to parked." +// --------------------------------------------------------------------------- + +#[test] +fn lost_wakeup_during_yield_chain() { + const YIELDS: usize = 20; + const PAIRS: usize = 100; + let count = Arc::new(AtomicU64::new(0)); + + let c = count.clone(); + rt_par().run(move || { + let mut handles: Vec = Vec::with_capacity(PAIRS * 2); + + for _ in 0..PAIRS { + let (tx, rx) = channel::(); + let cc = c.clone(); + + handles.push(spawn(move || { + // Yield several times, then block. + for _ in 0..YIELDS { + yield_now(); + } + let v = rx.recv().unwrap(); + cc.fetch_add(v, Ordering::SeqCst); + })); + + handles.push(spawn(move || { + // Fire immediately — may arrive while receiver is still yielding. + tx.send(1).unwrap(); + })); + } + + for h in handles { + h.join().unwrap(); + } + }); + + assert_eq!(count.load(Ordering::SeqCst), PAIRS as u64); +} + +// --------------------------------------------------------------------------- +// P2: Thundering herd +// +// N actors all block on recv from their own channel. A coordinator sends +// to all channels in rapid succession. All N actors must wake and complete. +// Common bug: wakeup list walked destructively while lock is dropped +// mid-walk, causing some actors to never be re-queued. +// --------------------------------------------------------------------------- + +#[test] +fn thundering_herd_all_wake() { + const HERD: usize = 200; + let woke = Arc::new(AtomicUsize::new(0)); + + let w = woke.clone(); + rt_par().run(move || { + let mut senders: Vec> = Vec::with_capacity(HERD); + let mut handles: Vec = Vec::with_capacity(HERD + 1); + + for _ in 0..HERD { + let (tx, rx) = channel::(); + senders.push(tx); + let wc = w.clone(); + handles.push(spawn(move || { + rx.recv().unwrap(); + wc.fetch_add(1, Ordering::SeqCst); + })); + } + + // Let all receivers park before we send. + for _ in 0..4 { yield_now(); } + + // Coordinator blasts all channels. + handles.push(spawn(move || { + for tx in senders { + tx.send(1).unwrap(); + } + })); + + for h in handles { + h.join().unwrap(); + } + }); + + assert_eq!(woke.load(Ordering::SeqCst), HERD); +} + +// --------------------------------------------------------------------------- +// P1: Concurrent spawn/join churn — PID table pressure +// +// K parent actors each spawn M children and join them, all concurrently. +// Exercises PID allocation/deallocation racing across scheduler threads. +// A generation-counter bug or slot leak will either corrupt a join result +// or accumulate memory without bound. +// --------------------------------------------------------------------------- + +#[test] +fn concurrent_spawn_join_churn() { + const PARENTS: usize = 20; + const CHILDREN_PER_PARENT: usize = 50; + const EXPECTED: u64 = (PARENTS * CHILDREN_PER_PARENT) as u64; + + let total = Arc::new(AtomicU64::new(0)); + let t = total.clone(); + + rt_par().run(move || { + let mut parent_handles: Vec = Vec::with_capacity(PARENTS); + + for _ in 0..PARENTS { + let tc = t.clone(); + parent_handles.push(spawn(move || { + let mut child_handles: Vec = + Vec::with_capacity(CHILDREN_PER_PARENT); + + for _ in 0..CHILDREN_PER_PARENT { + let tcc = tc.clone(); + child_handles.push(spawn(move || { + tcc.fetch_add(1, Ordering::SeqCst); + })); + } + + for h in child_handles { + h.join().unwrap(); + } + })); + } + + for h in parent_handles { + h.join().unwrap(); + } + }); + + assert_eq!(total.load(Ordering::SeqCst), EXPECTED); +} + +// --------------------------------------------------------------------------- +// P0: Join race — join called after child has already finished +// +// The child is given time to complete before the parent calls join. This +// exercises a different code path than "join before child finishes": +// the wakeup has already fired and the result must be stored in the slot. +// A bug here leaves the parent hanging or returns a corrupted result. +// --------------------------------------------------------------------------- + +#[test] +fn join_race_child_finishes_first() { + const REPS: usize = 300; + let ok = Arc::new(AtomicUsize::new(0)); + + let o = ok.clone(); + rt_par().run(move || { + let mut handles: Vec = Vec::with_capacity(REPS); + + for _ in 0..REPS { + let oc = o.clone(); + let h = spawn(move || { + // Child does a tiny bit of work and exits quickly. + oc.fetch_add(1, Ordering::SeqCst); + }); + handles.push(h); + } + + // Yield enough to let children run to completion before we join. + for _ in 0..8 { yield_now(); } + + for h in handles { + // If child already finished, join must return immediately with Ok. + h.join().unwrap(); + } + }); + + assert_eq!(ok.load(Ordering::SeqCst), REPS); +} + +// --------------------------------------------------------------------------- +// P3: Panic storm — concurrent panics don't corrupt the scheduler +// +// Many actors panic at the same time while a separate cohort of well-behaved +// actors makes progress. If a panic corrupts the run queue or the slot table, +// the well-behaved actors will deadlock or produce wrong counts. +// --------------------------------------------------------------------------- + +#[test] +fn panic_storm_does_not_corrupt_scheduler() { + const PANICKERS: usize = 50; + const WORKERS: usize = 50; + const WORK_PER_ACTOR: u64 = 10; + + let total = Arc::new(AtomicU64::new(0)); + let t = total.clone(); + + rt_par().run(move || { + let mut handles: Vec = Vec::with_capacity(PANICKERS + WORKERS); + + // Spawn all panickers. + for _ in 0..PANICKERS { + handles.push(spawn(|| panic!("deliberate panic storm"))); + } + + // Interleave well-behaved workers. + for _ in 0..WORKERS { + let tc = t.clone(); + handles.push(spawn(move || { + for _ in 0..WORK_PER_ACTOR { + yield_now(); + tc.fetch_add(1, Ordering::SeqCst); + } + })); + } + + // Collect results — panickers return Err, workers return Ok. + let mut panic_count = 0usize; + let mut ok_count = 0usize; + for h in handles { + match h.join() { + Ok(()) => ok_count += 1, + Err(_) => panic_count += 1, + } + } + + assert_eq!(panic_count, PANICKERS, "wrong number of panics captured"); + assert_eq!(ok_count, WORKERS, "some workers lost"); + }); + + assert_eq!( + total.load(Ordering::SeqCst), + WORKERS as u64 * WORK_PER_ACTOR, + "workers produced wrong count — scheduler corruption suspected" + ); +} + +// --------------------------------------------------------------------------- +// P1: Sequential slot reuse — generation counter correctness +// +// Spawn an actor, join it, then spawn a new actor. The new actor will likely +// reuse the same slot index. A stale handle to the first actor must not +// accidentally refer to the second. We can't hold a stale handle across a +// join (join consumes the handle), but we can verify that PID generations +// are distinct across reuse. +// --------------------------------------------------------------------------- + +#[test] +fn pid_generation_increments_on_reuse() { + use smarm::self_pid; + + let pids: Arc>> = + Arc::new(smarm::Mutex::new(Vec::new())); + + let p = pids.clone(); + rt(1).run(move || { + // Single-threaded to maximise slot reuse. + for _ in 0..100 { + let pc = p.clone(); + spawn(move || { + let pid = self_pid(); + let mut g = pc.lock_timeout(std::time::Duration::from_secs(5)).unwrap(); + g.push(pid); + }) + .join() + .unwrap(); + } + }); + + let g = pids.lock_timeout(std::time::Duration::from_secs(1)).unwrap(); + // Any two PIDs that share an index must have different generations. + for i in 0..g.len() { + for j in (i + 1)..g.len() { + if g[i].index() == g[j].index() { + assert_ne!( + g[i].generation(), + g[j].generation(), + "slot {} reused without incrementing generation", + g[i].index() + ); + } + } + } +} + +// --------------------------------------------------------------------------- +// P0: Channel backpressure — slow receiver, fast sender +// +// Sender produces messages faster than the receiver consumes them. The +// channel must not lose messages or deadlock regardless of how deep the +// queue grows. Tests unbounded channel growth and correct message ordering. +// --------------------------------------------------------------------------- + +#[test] +fn channel_backpressure_no_loss() { + const MESSAGES: u64 = 10_000; + + let received = Arc::new(AtomicU64::new(0)); + let rc = received.clone(); + + rt_par().run(move || { + let (tx, rx) = channel::(); + + let receiver = spawn(move || { + let mut sum = 0u64; + for _ in 0..MESSAGES { + sum += rx.recv().unwrap(); + } + rc.store(sum, Ordering::SeqCst); + }); + + // Send all messages from the parent without waiting. + for i in 0..MESSAGES { + tx.send(i).unwrap(); + } + + receiver.join().unwrap(); + }); + + // Sum of 0..MESSAGES + let expected: u64 = (0..MESSAGES).sum(); + assert_eq!(received.load(Ordering::SeqCst), expected); +}