timer: sleep(duration) via min-heap of (deadline, pid)
Adds a BinaryHeap of timer entries on SchedulerState. sleep() inserts an entry and parks; schedule_loop pops due entries each iteration and unparks them. When the run queue is empty but timers are pending, the OS thread sleeps until the soonest deadline. Single-threaded only; thread::sleep is fine because no other thread can wake us. The IO thread coming next will need a Condvar or pipe wakeup to break this OS-sleep early.
This commit is contained in:
@@ -18,6 +18,7 @@ pub mod actor;
|
||||
pub mod channel;
|
||||
pub mod scheduler;
|
||||
pub mod supervisor;
|
||||
pub mod timer;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Global allocator
|
||||
@@ -36,5 +37,5 @@ static ALLOCATOR: preempt::PreemptingAllocator = preempt::PreemptingAllocator;
|
||||
|
||||
pub use channel::{channel, Receiver, RecvError, Sender};
|
||||
pub use pid::Pid;
|
||||
pub use scheduler::{run, self_pid, spawn, spawn_under, yield_now, JoinError, JoinHandle};
|
||||
pub use scheduler::{run, self_pid, sleep, spawn, spawn_under, yield_now, JoinError, JoinHandle};
|
||||
pub use supervisor::Signal;
|
||||
|
||||
@@ -100,6 +100,8 @@ struct SchedulerState {
|
||||
/// The root supervisor's PID. Children spawned at the top level are
|
||||
/// supervised by this. Set by `run()`.
|
||||
root_pid: Option<Pid>,
|
||||
/// Pending sleep timers. Min-heap keyed by deadline.
|
||||
timers: crate::timer::Timers,
|
||||
}
|
||||
|
||||
impl SchedulerState {
|
||||
@@ -109,6 +111,7 @@ impl SchedulerState {
|
||||
free_list: Vec::new(),
|
||||
run_queue: VecDeque::new(),
|
||||
root_pid: None,
|
||||
timers: crate::timer::Timers::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -331,6 +334,16 @@ pub fn park_current() {
|
||||
unsafe { crate::context::switch_to_scheduler() };
|
||||
}
|
||||
|
||||
/// Park the current actor for at least `duration`. A zero duration behaves
|
||||
/// like `yield_now` (the deadline is immediately in the past, so the timer
|
||||
/// pops on the next scheduler iteration).
|
||||
pub fn sleep(duration: std::time::Duration) {
|
||||
let me = current_pid().expect("sleep() called outside an actor");
|
||||
let deadline = crate::timer::deadline_from_now(duration);
|
||||
with_sched(|s| s.timers.insert(deadline, me));
|
||||
park_current();
|
||||
}
|
||||
|
||||
/// Wake a parked actor. If the actor isn't parked (already runnable or done)
|
||||
/// this is a no-op — that's important; channel and join can both fire
|
||||
/// spurious unparks under some orderings and we want them to be cheap.
|
||||
@@ -417,9 +430,41 @@ pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX);
|
||||
|
||||
fn schedule_loop() {
|
||||
loop {
|
||||
// 1. Drain due timers into the run queue.
|
||||
let now = std::time::Instant::now();
|
||||
let due = with_sched(|s| s.timers.pop_due(now));
|
||||
for pid in due {
|
||||
// Same idempotency as `unpark`: only re-queue if still parked.
|
||||
with_sched(|s| {
|
||||
if let Some(slot) = s.slot_mut(pid) {
|
||||
if matches!(slot.state, State::Parked) {
|
||||
slot.state = State::Runnable;
|
||||
s.run_queue.push_back(pid);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// 2. Pop a runnable actor. If none, sleep on the soonest timer or
|
||||
// exit if there isn't one.
|
||||
let pid = match with_sched(|s| s.run_queue.pop_front()) {
|
||||
Some(p) => p,
|
||||
None => return,
|
||||
None => {
|
||||
let next = with_sched(|s| s.timers.peek_deadline());
|
||||
match next {
|
||||
Some(deadline) => {
|
||||
let now = std::time::Instant::now();
|
||||
if deadline > now {
|
||||
// No other thread can wake us; plain sleep is
|
||||
// correct. When the IO thread lands in v0.2
|
||||
// this becomes a Condvar / pipe wakeup.
|
||||
std::thread::sleep(deadline - now);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
None => return, // no runnables, no timers — done.
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Look up sp; skip stale or already-reaped pids.
|
||||
|
||||
89
src/timer.rs
Normal file
89
src/timer.rs
Normal file
@@ -0,0 +1,89 @@
|
||||
//! Sleep timers.
|
||||
//!
|
||||
//! A min-heap of `(deadline, Pid)` entries lives on `SchedulerState`. When
|
||||
//! an actor calls `sleep`, the runtime inserts the entry, marks the actor
|
||||
//! parked, and yields. On every scheduler loop iteration the runtime pops
|
||||
//! all entries whose deadline has passed and unparks them. When the run
|
||||
//! queue is empty but the heap is not, the runtime sleeps the OS thread
|
||||
//! until the soonest deadline, then re-checks.
|
||||
//!
|
||||
//! `BinaryHeap` is a max-heap, so entries are stored with their deadline
|
||||
//! wrapped in `Reverse` to get min-heap behaviour.
|
||||
//!
|
||||
//! Stale pids (slot reused since the timer was inserted) are detected on
|
||||
//! `due_pids` pop and silently dropped — same convention as the run queue.
|
||||
|
||||
use crate::pid::Pid;
|
||||
use std::cmp::Reverse;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[derive(PartialEq, Eq)]
|
||||
pub struct Entry {
|
||||
pub deadline: Instant,
|
||||
pub pid: Pid,
|
||||
}
|
||||
|
||||
impl Ord for Entry {
|
||||
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
|
||||
// Only `deadline` matters for ordering; pid is a tiebreaker so the
|
||||
// type is Ord, but the order among same-deadline entries is
|
||||
// irrelevant.
|
||||
self.deadline
|
||||
.cmp(&other.deadline)
|
||||
.then_with(|| self.pid.index().cmp(&other.pid.index()))
|
||||
.then_with(|| self.pid.generation().cmp(&other.pid.generation()))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for Entry {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Timers {
|
||||
/// Reverse-wrapped so the smallest deadline is at the top.
|
||||
heap: BinaryHeap<Reverse<Entry>>,
|
||||
}
|
||||
|
||||
impl Timers {
|
||||
pub fn new() -> Self {
|
||||
Self { heap: BinaryHeap::new() }
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, deadline: Instant, pid: Pid) {
|
||||
self.heap.push(Reverse(Entry { deadline, pid }));
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.heap.is_empty()
|
||||
}
|
||||
|
||||
/// Soonest pending deadline, or `None` if the heap is empty.
|
||||
pub fn peek_deadline(&self) -> Option<Instant> {
|
||||
self.heap.peek().map(|r| r.0.deadline)
|
||||
}
|
||||
|
||||
/// Pop and return every pid whose deadline is ≤ `now`.
|
||||
pub fn pop_due(&mut self, now: Instant) -> Vec<Pid> {
|
||||
let mut out = Vec::new();
|
||||
while let Some(r) = self.heap.peek() {
|
||||
if r.0.deadline <= now {
|
||||
let e = self.heap.pop().unwrap().0;
|
||||
out.push(e.pid);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
out
|
||||
}
|
||||
}
|
||||
|
||||
/// Wall-clock duration helper exposed for `sleep`.
|
||||
pub fn deadline_from_now(duration: Duration) -> Instant {
|
||||
Instant::now()
|
||||
.checked_add(duration)
|
||||
.unwrap_or_else(Instant::now)
|
||||
}
|
||||
116
tests/timer.rs
Normal file
116
tests/timer.rs
Normal file
@@ -0,0 +1,116 @@
|
||||
//! Timer / sleep tests. These are time-sensitive and use generous
|
||||
//! tolerances — we care about ordering and "didn't return instantly /
|
||||
//! didn't take forever," not microsecond-precise scheduling.
|
||||
|
||||
use smarm::{run, sleep, spawn};
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[test]
|
||||
fn sleep_returns_after_at_least_the_requested_duration() {
|
||||
run(|| {
|
||||
let t0 = Instant::now();
|
||||
sleep(Duration::from_millis(50));
|
||||
let elapsed = t0.elapsed();
|
||||
assert!(
|
||||
elapsed >= Duration::from_millis(45),
|
||||
"slept only {:?}, expected ≥ ~50ms",
|
||||
elapsed
|
||||
);
|
||||
// Loose upper bound — anything wildly slow indicates a bug.
|
||||
assert!(
|
||||
elapsed < Duration::from_millis(500),
|
||||
"slept {:?}, far longer than the 50ms request",
|
||||
elapsed
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn shorter_sleep_wakes_first() {
|
||||
let log: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
let l1 = log.clone();
|
||||
let l2 = log.clone();
|
||||
|
||||
run(move || {
|
||||
let h1 = spawn(move || {
|
||||
sleep(Duration::from_millis(60));
|
||||
l1.lock().unwrap().push(1);
|
||||
});
|
||||
let h2 = spawn(move || {
|
||||
sleep(Duration::from_millis(20));
|
||||
l2.lock().unwrap().push(2);
|
||||
});
|
||||
h1.join().unwrap();
|
||||
h2.join().unwrap();
|
||||
});
|
||||
|
||||
// 2 (shorter sleep) wakes before 1.
|
||||
assert_eq!(*log.lock().unwrap(), vec![2, 1]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn one_sleeping_actor_does_not_block_other_runnable_actors() {
|
||||
let log: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
let l1 = log.clone();
|
||||
let l2 = log.clone();
|
||||
|
||||
run(move || {
|
||||
let h1 = spawn(move || {
|
||||
sleep(Duration::from_millis(100));
|
||||
l1.lock().unwrap().push(1);
|
||||
});
|
||||
let h2 = spawn(move || {
|
||||
// Doesn't sleep. Should be able to run while h1 is parked.
|
||||
for _ in 0..3 {
|
||||
l2.lock().unwrap().push(2);
|
||||
smarm::yield_now();
|
||||
}
|
||||
});
|
||||
h2.join().unwrap();
|
||||
h1.join().unwrap();
|
||||
});
|
||||
|
||||
let v = log.lock().unwrap();
|
||||
// h2 finishes long before h1's 100ms timer.
|
||||
let h2_count = v.iter().filter(|&&x| x == 2).count();
|
||||
let h1_pos = v.iter().position(|&x| x == 1);
|
||||
assert_eq!(h2_count, 3);
|
||||
// h1's push should land after h2 is fully done.
|
||||
if let Some(p) = h1_pos {
|
||||
assert!(p >= h2_count, "h1 woke before h2 finished: log = {:?}", *v);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn zero_duration_sleep_yields_but_does_not_park_forever() {
|
||||
// A zero-duration sleep should behave like yield_now: control returns
|
||||
// promptly without hanging.
|
||||
run(|| {
|
||||
let t0 = Instant::now();
|
||||
sleep(Duration::from_millis(0));
|
||||
assert!(t0.elapsed() < Duration::from_millis(100));
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn many_concurrent_sleepers_all_wake() {
|
||||
let counter = Arc::new(std::sync::atomic::AtomicU32::new(0));
|
||||
let c = counter.clone();
|
||||
run(move || {
|
||||
let mut handles = Vec::new();
|
||||
for i in 0..20u64 {
|
||||
let cc = c.clone();
|
||||
handles.push(spawn(move || {
|
||||
// Stagger so they don't all coalesce to the same wake.
|
||||
sleep(Duration::from_millis(5 + i * 2));
|
||||
cc.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||
}));
|
||||
}
|
||||
for h in handles {
|
||||
h.join().unwrap();
|
||||
}
|
||||
});
|
||||
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 20);
|
||||
}
|
||||
Reference in New Issue
Block a user