From 2cf75febdc85261a776b2416ea91972aee0b61d3 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 22 May 2026 05:22:55 +0000 Subject: [PATCH] timer: sleep(duration) via min-heap of (deadline, pid) Adds a BinaryHeap of timer entries on SchedulerState. sleep() inserts an entry and parks; schedule_loop pops due entries each iteration and unparks them. When the run queue is empty but timers are pending, the OS thread sleeps until the soonest deadline. Single-threaded only; thread::sleep is fine because no other thread can wake us. The IO thread coming next will need a Condvar or pipe wakeup to break this OS-sleep early. --- src/lib.rs | 3 +- src/scheduler.rs | 47 ++++++++++++++++++- src/timer.rs | 89 ++++++++++++++++++++++++++++++++++++ tests/timer.rs | 116 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 src/timer.rs create mode 100644 tests/timer.rs diff --git a/src/lib.rs b/src/lib.rs index 5ec7bf8..17639e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,6 +18,7 @@ pub mod actor; pub mod channel; pub mod scheduler; pub mod supervisor; +pub mod timer; // --------------------------------------------------------------------------- // Global allocator @@ -36,5 +37,5 @@ static ALLOCATOR: preempt::PreemptingAllocator = preempt::PreemptingAllocator; pub use channel::{channel, Receiver, RecvError, Sender}; pub use pid::Pid; -pub use scheduler::{run, self_pid, spawn, spawn_under, yield_now, JoinError, JoinHandle}; +pub use scheduler::{run, self_pid, sleep, spawn, spawn_under, yield_now, JoinError, JoinHandle}; pub use supervisor::Signal; diff --git a/src/scheduler.rs b/src/scheduler.rs index 2879f01..7ffd391 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -100,6 +100,8 @@ struct SchedulerState { /// The root supervisor's PID. Children spawned at the top level are /// supervised by this. Set by `run()`. root_pid: Option, + /// Pending sleep timers. Min-heap keyed by deadline. + timers: crate::timer::Timers, } impl SchedulerState { @@ -109,6 +111,7 @@ impl SchedulerState { free_list: Vec::new(), run_queue: VecDeque::new(), root_pid: None, + timers: crate::timer::Timers::new(), } } @@ -331,6 +334,16 @@ pub fn park_current() { unsafe { crate::context::switch_to_scheduler() }; } +/// Park the current actor for at least `duration`. A zero duration behaves +/// like `yield_now` (the deadline is immediately in the past, so the timer +/// pops on the next scheduler iteration). +pub fn sleep(duration: std::time::Duration) { + let me = current_pid().expect("sleep() called outside an actor"); + let deadline = crate::timer::deadline_from_now(duration); + with_sched(|s| s.timers.insert(deadline, me)); + park_current(); +} + /// Wake a parked actor. If the actor isn't parked (already runnable or done) /// this is a no-op — that's important; channel and join can both fire /// spurious unparks under some orderings and we want them to be cheap. @@ -417,9 +430,41 @@ pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX); fn schedule_loop() { loop { + // 1. Drain due timers into the run queue. + let now = std::time::Instant::now(); + let due = with_sched(|s| s.timers.pop_due(now)); + for pid in due { + // Same idempotency as `unpark`: only re-queue if still parked. + with_sched(|s| { + if let Some(slot) = s.slot_mut(pid) { + if matches!(slot.state, State::Parked) { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + } + } + }); + } + + // 2. Pop a runnable actor. If none, sleep on the soonest timer or + // exit if there isn't one. let pid = match with_sched(|s| s.run_queue.pop_front()) { Some(p) => p, - None => return, + None => { + let next = with_sched(|s| s.timers.peek_deadline()); + match next { + Some(deadline) => { + let now = std::time::Instant::now(); + if deadline > now { + // No other thread can wake us; plain sleep is + // correct. When the IO thread lands in v0.2 + // this becomes a Condvar / pipe wakeup. + std::thread::sleep(deadline - now); + } + continue; + } + None => return, // no runnables, no timers — done. + } + } }; // Look up sp; skip stale or already-reaped pids. diff --git a/src/timer.rs b/src/timer.rs new file mode 100644 index 0000000..e12b3c1 --- /dev/null +++ b/src/timer.rs @@ -0,0 +1,89 @@ +//! Sleep timers. +//! +//! A min-heap of `(deadline, Pid)` entries lives on `SchedulerState`. When +//! an actor calls `sleep`, the runtime inserts the entry, marks the actor +//! parked, and yields. On every scheduler loop iteration the runtime pops +//! all entries whose deadline has passed and unparks them. When the run +//! queue is empty but the heap is not, the runtime sleeps the OS thread +//! until the soonest deadline, then re-checks. +//! +//! `BinaryHeap` is a max-heap, so entries are stored with their deadline +//! wrapped in `Reverse` to get min-heap behaviour. +//! +//! Stale pids (slot reused since the timer was inserted) are detected on +//! `due_pids` pop and silently dropped — same convention as the run queue. + +use crate::pid::Pid; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::time::{Duration, Instant}; + +#[derive(PartialEq, Eq)] +pub struct Entry { + pub deadline: Instant, + pub pid: Pid, +} + +impl Ord for Entry { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // Only `deadline` matters for ordering; pid is a tiebreaker so the + // type is Ord, but the order among same-deadline entries is + // irrelevant. + self.deadline + .cmp(&other.deadline) + .then_with(|| self.pid.index().cmp(&other.pid.index())) + .then_with(|| self.pid.generation().cmp(&other.pid.generation())) + } +} + +impl PartialOrd for Entry { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[derive(Default)] +pub struct Timers { + /// Reverse-wrapped so the smallest deadline is at the top. + heap: BinaryHeap>, +} + +impl Timers { + pub fn new() -> Self { + Self { heap: BinaryHeap::new() } + } + + pub fn insert(&mut self, deadline: Instant, pid: Pid) { + self.heap.push(Reverse(Entry { deadline, pid })); + } + + pub fn is_empty(&self) -> bool { + self.heap.is_empty() + } + + /// Soonest pending deadline, or `None` if the heap is empty. + pub fn peek_deadline(&self) -> Option { + self.heap.peek().map(|r| r.0.deadline) + } + + /// Pop and return every pid whose deadline is ≤ `now`. + pub fn pop_due(&mut self, now: Instant) -> Vec { + let mut out = Vec::new(); + while let Some(r) = self.heap.peek() { + if r.0.deadline <= now { + let e = self.heap.pop().unwrap().0; + out.push(e.pid); + } else { + break; + } + } + out + } +} + +/// Wall-clock duration helper exposed for `sleep`. +pub fn deadline_from_now(duration: Duration) -> Instant { + Instant::now() + .checked_add(duration) + .unwrap_or_else(Instant::now) +} diff --git a/tests/timer.rs b/tests/timer.rs new file mode 100644 index 0000000..14756d9 --- /dev/null +++ b/tests/timer.rs @@ -0,0 +1,116 @@ +//! Timer / sleep tests. These are time-sensitive and use generous +//! tolerances — we care about ordering and "didn't return instantly / +//! didn't take forever," not microsecond-precise scheduling. + +use smarm::{run, sleep, spawn}; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::{Duration, Instant}; + +#[test] +fn sleep_returns_after_at_least_the_requested_duration() { + run(|| { + let t0 = Instant::now(); + sleep(Duration::from_millis(50)); + let elapsed = t0.elapsed(); + assert!( + elapsed >= Duration::from_millis(45), + "slept only {:?}, expected ≥ ~50ms", + elapsed + ); + // Loose upper bound — anything wildly slow indicates a bug. + assert!( + elapsed < Duration::from_millis(500), + "slept {:?}, far longer than the 50ms request", + elapsed + ); + }); +} + +#[test] +fn shorter_sleep_wakes_first() { + let log: Arc>> = Arc::new(Mutex::new(Vec::new())); + let l1 = log.clone(); + let l2 = log.clone(); + + run(move || { + let h1 = spawn(move || { + sleep(Duration::from_millis(60)); + l1.lock().unwrap().push(1); + }); + let h2 = spawn(move || { + sleep(Duration::from_millis(20)); + l2.lock().unwrap().push(2); + }); + h1.join().unwrap(); + h2.join().unwrap(); + }); + + // 2 (shorter sleep) wakes before 1. + assert_eq!(*log.lock().unwrap(), vec![2, 1]); +} + +#[test] +fn one_sleeping_actor_does_not_block_other_runnable_actors() { + let log: Arc>> = Arc::new(Mutex::new(Vec::new())); + let l1 = log.clone(); + let l2 = log.clone(); + + run(move || { + let h1 = spawn(move || { + sleep(Duration::from_millis(100)); + l1.lock().unwrap().push(1); + }); + let h2 = spawn(move || { + // Doesn't sleep. Should be able to run while h1 is parked. + for _ in 0..3 { + l2.lock().unwrap().push(2); + smarm::yield_now(); + } + }); + h2.join().unwrap(); + h1.join().unwrap(); + }); + + let v = log.lock().unwrap(); + // h2 finishes long before h1's 100ms timer. + let h2_count = v.iter().filter(|&&x| x == 2).count(); + let h1_pos = v.iter().position(|&x| x == 1); + assert_eq!(h2_count, 3); + // h1's push should land after h2 is fully done. + if let Some(p) = h1_pos { + assert!(p >= h2_count, "h1 woke before h2 finished: log = {:?}", *v); + } +} + +#[test] +fn zero_duration_sleep_yields_but_does_not_park_forever() { + // A zero-duration sleep should behave like yield_now: control returns + // promptly without hanging. + run(|| { + let t0 = Instant::now(); + sleep(Duration::from_millis(0)); + assert!(t0.elapsed() < Duration::from_millis(100)); + }); +} + +#[test] +fn many_concurrent_sleepers_all_wake() { + let counter = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let c = counter.clone(); + run(move || { + let mut handles = Vec::new(); + for i in 0..20u64 { + let cc = c.clone(); + handles.push(spawn(move || { + // Stagger so they don't all coalesce to the same wake. + sleep(Duration::from_millis(5 + i * 2)); + cc.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + })); + } + for h in handles { + h.join().unwrap(); + } + }); + assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 20); +}