//! Stress tests targeting lost wakeups, PID table pressure, thundering herds, //! and panic isolation under concurrency. //! //! These tests are designed to find bugs that functional happy-path tests //! cannot: races in the park/unpark protocol, slot leaks under concurrent //! churn, and scheduler corruption from concurrent panics. //! //! Every test that could hang is bounded by a join on a known-finite set of //! handles. A deadlock from a lost wakeup will cause the test binary to time //! out rather than produce a false pass — run with `cargo test -- --timeout` //! or under a CI timeout. use smarm::{channel, runtime::{Config, Runtime}, spawn, yield_now, JoinHandle}; use std::sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }; fn rt(n: usize) -> Runtime { smarm::runtime::init(Config::exact(n)) } fn rt_par() -> Runtime { smarm::runtime::init(Config::default()) } // --------------------------------------------------------------------------- // P0: Lost-wakeup — many concurrent sender/receiver pairs // // 500 independent (tx, rx) pairs. Each sender and receiver are separate // actors. No ordering is imposed between pairs. Any lost wakeup causes one // receiver to park forever, deadlocking the join at the end. // --------------------------------------------------------------------------- #[test] fn lost_wakeup_many_pairs() { const PAIRS: usize = 500; let count = Arc::new(AtomicU64::new(0)); for threads in [1, 2, 4] { count.store(0, Ordering::SeqCst); let c = count.clone(); rt(threads).run(move || { let mut handles: Vec = Vec::with_capacity(PAIRS * 2); for _ in 0..PAIRS { let (tx, rx) = channel::(); let cc = c.clone(); // Receiver parks immediately. handles.push(spawn(move || { let v = rx.recv().unwrap(); cc.fetch_add(v, Ordering::SeqCst); })); // Sender fires without any yield — races with receiver parking. 
handles.push(spawn(move || { tx.send(1).unwrap(); })); } for h in handles { h.join().unwrap(); } }); assert_eq!( count.load(Ordering::SeqCst), PAIRS as u64, "lost wakeup on {threads}-thread runtime" ); } } // --------------------------------------------------------------------------- // P0: Lost-wakeup — rapid-fire single receiver // // One receiver, SENDERS senders, all spawned at once. The receiver loops // receiving SENDERS messages. Race: a sender may fire before the receiver // has parked, or exactly as it is transitioning to parked. // --------------------------------------------------------------------------- #[test] fn lost_wakeup_rapid_fire_single_receiver() { const SENDERS: u64 = 200; for threads in [1, 2, 4] { let received = Arc::new(AtomicU64::new(0)); let rc = received.clone(); rt(threads).run(move || { let (tx, rx) = channel::(); let mut handles: Vec = Vec::with_capacity(SENDERS as usize + 1); // Receiver loops until it has seen all messages. handles.push(spawn(move || { let mut n = 0u64; while n < SENDERS { rx.recv().unwrap(); n += 1; } rc.store(n, Ordering::SeqCst); })); // All senders fire with no deliberate delay. for _ in 0..SENDERS { let txc = tx.clone(); handles.push(spawn(move || { txc.send(1).unwrap(); })); } for h in handles { h.join().unwrap(); } }); assert_eq!( received.load(Ordering::SeqCst), SENDERS, "missed messages on {threads}-thread runtime" ); } } // --------------------------------------------------------------------------- // P0: Lost-wakeup — wakeup during yield chain // // Receiver yields N times before it would naturally park. Sender fires // during that window. Tests the race between "actor is on the run queue // yielding" and "actor transitions to parked." 
// ---------------------------------------------------------------------------

#[test]
fn lost_wakeup_during_yield_chain() {
    const YIELDS: usize = 20;
    const PAIRS: usize = 100;

    let count = Arc::new(AtomicU64::new(0));
    let c = count.clone();

    rt_par().run(move || {
        let mut handles: Vec<JoinHandle> = Vec::with_capacity(PAIRS * 2);

        for _ in 0..PAIRS {
            let (tx, rx) = channel::<u64>();
            let cc = c.clone();

            handles.push(spawn(move || {
                // Yield several times, then block.
                for _ in 0..YIELDS {
                    yield_now();
                }
                let v = rx.recv().unwrap();
                cc.fetch_add(v, Ordering::SeqCst);
            }));

            handles.push(spawn(move || {
                // Fire immediately — may arrive while receiver is still yielding.
                tx.send(1).unwrap();
            }));
        }

        for h in handles {
            h.join().unwrap();
        }
    });

    // Every sender sends 1, so the total equals the number of pairs.
    assert_eq!(count.load(Ordering::SeqCst), PAIRS as u64);
}

// ---------------------------------------------------------------------------
// P2: Thundering herd
//
// N actors all block on recv from their own channel. A coordinator sends
// to all channels in rapid succession. All N actors must wake and complete.
// Common bug: wakeup list walked destructively while lock is dropped
// mid-walk, causing some actors to never be re-queued.
// ---------------------------------------------------------------------------

#[test]
fn thundering_herd_all_wake() {
    const HERD: usize = 200;

    let woke = Arc::new(AtomicUsize::new(0));
    let w = woke.clone();

    rt_par().run(move || {
        // Element type is inferred from the `push(tx)` below.
        let mut senders = Vec::with_capacity(HERD);
        let mut handles: Vec<JoinHandle> = Vec::with_capacity(HERD + 1);

        for _ in 0..HERD {
            // The payload value is never inspected; only the wakeup matters.
            let (tx, rx) = channel::<u64>();
            senders.push(tx);
            let wc = w.clone();

            handles.push(spawn(move || {
                rx.recv().unwrap();
                wc.fetch_add(1, Ordering::SeqCst);
            }));
        }

        // Let all receivers park before we send.
        // NOTE(review): four yields is a best-effort heuristic; nothing here
        // verifies that every receiver has actually parked.
        for _ in 0..4 {
            yield_now();
        }

        // Coordinator blasts all channels.
        handles.push(spawn(move || {
            for tx in senders {
                tx.send(1).unwrap();
            }
        }));

        for h in handles {
            h.join().unwrap();
        }
    });

    assert_eq!(woke.load(Ordering::SeqCst), HERD);
}

// ---------------------------------------------------------------------------
// P1: Concurrent spawn/join churn — PID table pressure
//
// K parent actors each spawn M children and join them, all concurrently.
// Exercises PID allocation/deallocation racing across scheduler threads.
// A generation-counter bug or slot leak will either corrupt a join result
// or accumulate memory without bound.
// ---------------------------------------------------------------------------

#[test]
fn concurrent_spawn_join_churn() {
    const PARENTS: usize = 20;
    const CHILDREN_PER_PARENT: usize = 50;
    const EXPECTED: u64 = (PARENTS * CHILDREN_PER_PARENT) as u64;

    let total = Arc::new(AtomicU64::new(0));
    let t = total.clone();

    rt_par().run(move || {
        let mut parent_handles: Vec<JoinHandle> = Vec::with_capacity(PARENTS);

        for _ in 0..PARENTS {
            let tc = t.clone();

            parent_handles.push(spawn(move || {
                let mut child_handles: Vec<JoinHandle> =
                    Vec::with_capacity(CHILDREN_PER_PARENT);

                for _ in 0..CHILDREN_PER_PARENT {
                    let tcc = tc.clone();
                    child_handles.push(spawn(move || {
                        tcc.fetch_add(1, Ordering::SeqCst);
                    }));
                }

                for h in child_handles {
                    h.join().unwrap();
                }
            }));
        }

        for h in parent_handles {
            h.join().unwrap();
        }
    });

    // Each child increments the counter exactly once.
    assert_eq!(total.load(Ordering::SeqCst), EXPECTED);
}

// ---------------------------------------------------------------------------
// P0: Join race — join called after child has already finished
//
// The child is given time to complete before the parent calls join. This
// exercises a different code path than "join before child finishes":
// the wakeup has already fired and the result must be stored in the slot.
// A bug here leaves the parent hanging or returns a corrupted result.
// ---------------------------------------------------------------------------

#[test]
fn join_race_child_finishes_first() {
    const REPS: usize = 300;

    let ok = Arc::new(AtomicUsize::new(0));
    let o = ok.clone();

    rt_par().run(move || {
        let mut handles: Vec<JoinHandle> = Vec::with_capacity(REPS);

        for _ in 0..REPS {
            let oc = o.clone();
            let h = spawn(move || {
                // Child does a tiny bit of work and exits quickly.
                oc.fetch_add(1, Ordering::SeqCst);
            });
            handles.push(h);
        }

        // Yield enough to let children run to completion before we join.
        // NOTE(review): eight yields is a best-effort heuristic; nothing here
        // verifies that the children have actually finished.
        for _ in 0..8 {
            yield_now();
        }

        for h in handles {
            // If child already finished, join must return immediately with Ok.
            h.join().unwrap();
        }
    });

    assert_eq!(ok.load(Ordering::SeqCst), REPS);
}

// ---------------------------------------------------------------------------
// P3: Panic storm — concurrent panics don't corrupt the scheduler
//
// Many actors panic at the same time while a separate cohort of well-behaved
// actors makes progress. If a panic corrupts the run queue or the slot table,
// the well-behaved actors will deadlock or produce wrong counts.
// ---------------------------------------------------------------------------

#[test]
fn panic_storm_does_not_corrupt_scheduler() {
    const PANICKERS: usize = 50;
    const WORKERS: usize = 50;
    const WORK_PER_ACTOR: u64 = 10;

    let total = Arc::new(AtomicU64::new(0));
    let t = total.clone();

    rt_par().run(move || {
        let mut handles: Vec<JoinHandle> = Vec::with_capacity(PANICKERS + WORKERS);

        // Spawn all panickers.
        for _ in 0..PANICKERS {
            handles.push(spawn(|| panic!("deliberate panic storm")));
        }

        // Interleave well-behaved workers.
        for _ in 0..WORKERS {
            let tc = t.clone();
            handles.push(spawn(move || {
                for _ in 0..WORK_PER_ACTOR {
                    yield_now();
                    tc.fetch_add(1, Ordering::SeqCst);
                }
            }));
        }

        // Collect results — panickers return Err, workers return Ok.
        let mut panic_count = 0usize;
        let mut ok_count = 0usize;
        for h in handles {
            match h.join() {
                Ok(()) => ok_count += 1,
                Err(_) => panic_count += 1,
            }
        }

        assert_eq!(panic_count, PANICKERS, "wrong number of panics captured");
        assert_eq!(ok_count, WORKERS, "some workers lost");
    });

    assert_eq!(
        total.load(Ordering::SeqCst),
        WORKERS as u64 * WORK_PER_ACTOR,
        "workers produced wrong count — scheduler corruption suspected"
    );
}

// ---------------------------------------------------------------------------
// P1: Sequential slot reuse — generation counter correctness
//
// Spawn an actor, join it, then spawn a new actor. The new actor will likely
// reuse the same slot index. A stale handle to the first actor must not
// accidentally refer to the second. We can't hold a stale handle across a
// join (join consumes the handle), but we can verify that PID generations
// are distinct across reuse.
// ---------------------------------------------------------------------------

#[test]
fn pid_generation_increments_on_reuse() {
    use smarm::self_pid;

    // Element type is inferred from the `push(pid)` inside the spawned actor.
    let pids = Arc::new(smarm::Mutex::new(Vec::new()));
    let p = pids.clone();

    rt(1).run(move || {
        // Single-threaded to maximise slot reuse.
        for _ in 0..100 {
            let pc = p.clone();
            // Each actor is joined before the next is spawned, so at most one
            // recorded PID is live at a time.
            spawn(move || {
                let pid = self_pid();
                let mut g = pc
                    .lock_timeout(std::time::Duration::from_secs(5))
                    .unwrap();
                g.push(pid);
            })
            .join()
            .unwrap();
        }
    });

    let g = pids
        .lock_timeout(std::time::Duration::from_secs(1))
        .unwrap();

    // Any two PIDs that share an index must have different generations.
    for i in 0..g.len() {
        for j in (i + 1)..g.len() {
            if g[i].index() == g[j].index() {
                assert_ne!(
                    g[i].generation(),
                    g[j].generation(),
                    "slot {} reused without incrementing generation",
                    g[i].index()
                );
            }
        }
    }
}

// ---------------------------------------------------------------------------
// P0: Channel backpressure — slow receiver, fast sender
//
// Sender produces messages faster than the receiver consumes them.
// The channel must not lose messages or deadlock regardless of how deep the
// queue grows. Tests unbounded channel growth and correct message ordering.
// ---------------------------------------------------------------------------

#[test]
fn channel_backpressure_no_loss() {
    const MESSAGES: u64 = 10_000;

    let received = Arc::new(AtomicU64::new(0));
    let rc = received.clone();

    rt_par().run(move || {
        let (tx, rx) = channel::<u64>();

        let receiver = spawn(move || {
            let mut sum = 0u64;
            for _ in 0..MESSAGES {
                sum += rx.recv().unwrap();
            }
            rc.store(sum, Ordering::SeqCst);
        });

        // Send all messages from the parent without waiting.
        for i in 0..MESSAGES {
            tx.send(i).unwrap();
        }

        receiver.join().unwrap();
    });

    // Sum of 0..MESSAGES
    // NOTE(review): a sum is order-insensitive, so this assertion would not
    // detect reordered messages even though the header mentions ordering.
    let expected: u64 = (0..MESSAGES).sum();
    assert_eq!(received.load(Ordering::SeqCst), expected);
}