diff --git a/Cargo.toml b/Cargo.toml index 0df0293..d2fc979 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ libc = "0.2" [dev-dependencies] libc = "0.2" -tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "sync"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "sync", "time"] } [profile.dev] panic = "unwind" diff --git a/src/channel.rs b/src/channel.rs index 2192277..0d7e4b0 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -98,12 +98,10 @@ impl Sender { g.parked_receiver.take() }; if let Some(pid) = unpark { - let me = crate::actor::current_pid(); - crate::te!(crate::trace::Event::Send { sender: me.unwrap_or(crate::pid::Pid::new(u32::MAX, u32::MAX)), receiver: Some(pid) }); + crate::te!(crate::trace::Event::Send { sender: crate::actor::current_pid().unwrap_or(crate::pid::Pid::new(u32::MAX, u32::MAX)), receiver: Some(pid) }); crate::scheduler::unpark(pid); } else { - let me = crate::actor::current_pid(); - crate::te!(crate::trace::Event::Send { sender: me.unwrap_or(crate::pid::Pid::new(u32::MAX, u32::MAX)), receiver: None }); + crate::te!(crate::trace::Event::Send { sender: crate::actor::current_pid().unwrap_or(crate::pid::Pid::new(u32::MAX, u32::MAX)), receiver: None }); } Ok(()) } @@ -132,9 +130,7 @@ impl Receiver { // Release the lock before parking — the unparker will need it. crate::scheduler::park_current(); // Woken up — record it before looping to check the queue. - if let Some(me) = crate::actor::current_pid() { - crate::te!(crate::trace::Event::RecvWake(me)); - } + crate::te!(crate::trace::Event::RecvWake(crate::actor::current_pid().unwrap())); } } diff --git a/src/preempt.rs b/src/preempt.rs index 4ef52b6..40cf570 100644 --- a/src/preempt.rs +++ b/src/preempt.rs @@ -28,23 +28,42 @@ use std::alloc::{GlobalAlloc, Layout, System}; use std::cell::Cell; -const ALLOC_INTERVAL: u32 = 128; -const TIMESLICE_CYCLES: u64 = 300_000; // ≈ 100µs on a 3 GHz CPU +pub const DEFAULT_ALLOC_INTERVAL: u32 = 128; +pub const DEFAULT_TIMESLICE_CYCLES: u64 = 300_000; // ≈ 100µs on a 3 GHz CPU thread_local! { /// While `false`, the allocator hook is a no-op. pub static PREEMPTION_ENABLED: Cell = const { Cell::new(false) }; /// Countdown to next RDTSC check. Reset to `ALLOC_INTERVAL` on resume. - static ALLOC_COUNT: Cell = const { Cell::new(ALLOC_INTERVAL) }; + static ALLOC_COUNT: Cell = const { Cell::new(DEFAULT_ALLOC_INTERVAL) }; /// RDTSC value written by the scheduler on every actor resume. static TIMESLICE_START: Cell = const { Cell::new(0) }; + + /// Per-thread copy of the configured alloc interval, written once at + /// scheduler-thread startup. Kept in a thread-local so the hot path + /// (`maybe_preempt`) pays only a TLS load, with no cache-coherency traffic. + static CONFIGURED_ALLOC_INTERVAL: Cell = const { Cell::new(DEFAULT_ALLOC_INTERVAL) }; + + /// Per-thread copy of the configured timeslice, written once at + /// scheduler-thread startup. + static CONFIGURED_TIMESLICE_CYCLES: Cell = const { Cell::new(DEFAULT_TIMESLICE_CYCLES) }; +} + +/// Called once per scheduler thread at startup (before any actor runs). +/// Writes the runtime-configured preemption knobs into thread-locals so the +/// hot path reads them without any cross-thread coherency cost. +pub fn configure_preempt(alloc_interval: u32, timeslice_cycles: u64) { + CONFIGURED_ALLOC_INTERVAL.with(|c| c.set(alloc_interval)); + CONFIGURED_TIMESLICE_CYCLES.with(|c| c.set(timeslice_cycles)); + // Also prime the countdown so the first resume uses the right interval. + ALLOC_COUNT.with(|c| c.set(alloc_interval)); } /// Arm the timeslice. Called by the scheduler on every resume. pub fn reset_timeslice() { - ALLOC_COUNT.with(|c| c.set(ALLOC_INTERVAL)); + ALLOC_COUNT.with(|c| c.set(CONFIGURED_ALLOC_INTERVAL.with(|i| i.get()))); TIMESLICE_START.with(|c| c.set(rdtsc())); } @@ -102,10 +121,10 @@ pub fn maybe_preempt() { ALLOC_COUNT.with(|c| { let n = c.get(); if n == 0 { - c.set(ALLOC_INTERVAL); + c.set(CONFIGURED_ALLOC_INTERVAL.with(|i| i.get())); if PREEMPTION_ENABLED.with(|e| e.get()) { let start = TIMESLICE_START.with(|s| s.get()); - if rdtsc().saturating_sub(start) > TIMESLICE_CYCLES { + if rdtsc().saturating_sub(start) > CONFIGURED_TIMESLICE_CYCLES.with(|t| t.get()) { // SAFETY: reachable only inside an actor (the scheduler // sets PREEMPTION_ENABLED on resume and clears it on // return). The scheduler stack is therefore valid. diff --git a/src/runtime.rs b/src/runtime.rs index cdc0481..b4b49ac 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -31,8 +31,8 @@ //! becomes a measured bottleneck. use crate::actor::{ - clear_current_pid, current_pid, is_actor_done, reset_actor_done, - set_current_actor_box, set_current_pid, take_last_outcome, Actor, Outcome, + clear_current_pid, is_actor_done, reset_actor_done, set_current_actor_box, + set_current_pid, take_last_outcome, Actor, Outcome, }; use crate::channel::Sender; use crate::context::{get_actor_sp, set_actor_sp, switch_to_actor}; @@ -70,13 +70,19 @@ pub struct Config { min: usize, max: usize, exact: Option, + alloc_interval: u32, + timeslice_cycles: u64, } impl Config { /// Exact thread count; takes precedence over min/max. pub fn exact(n: usize) -> Self { assert!(n >= 1, "scheduler thread count must be ≥ 1"); - Self { min: n, max: n, exact: Some(n) } + Self { + min: n, max: n, exact: Some(n), + alloc_interval: crate::preempt::DEFAULT_ALLOC_INTERVAL, + timeslice_cycles: crate::preempt::DEFAULT_TIMESLICE_CYCLES, + } } /// Bounded range. Thread count = clamp(available_parallelism, min, max). @@ -86,7 +92,28 @@ impl Config { if let Some(e) = exact { assert!(e >= 1, "exact must be ≥ 1"); } - Self { min, max, exact } + Self { + min, max, exact, + alloc_interval: crate::preempt::DEFAULT_ALLOC_INTERVAL, + timeslice_cycles: crate::preempt::DEFAULT_TIMESLICE_CYCLES, + } + } + + /// How many allocations (or `smarm::check!()` calls) between RDTSC checks. + /// Lower = more responsive preemption, higher = less overhead. + /// Default: 128. + pub fn alloc_interval(mut self, n: u32) -> Self { + assert!(n >= 1, "alloc_interval must be ≥ 1"); + self.alloc_interval = n; + self + } + + /// How many TSC cycles constitute one timeslice. + /// Default: 300_000 (≈ 100µs on a 3 GHz CPU). + pub fn timeslice_cycles(mut self, n: u64) -> Self { + assert!(n >= 1, "timeslice_cycles must be ≥ 1"); + self.timeslice_cycles = n; + self } /// The number of scheduler threads this config resolves to. @@ -106,7 +133,11 @@ impl Default for Config { let avail = thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); - Self { min: 1, max: avail, exact: None } + Self { + min: 1, max: avail, exact: None, + alloc_interval: crate::preempt::DEFAULT_ALLOC_INTERVAL, + timeslice_cycles: crate::preempt::DEFAULT_TIMESLICE_CYCLES, + } } } @@ -270,10 +301,13 @@ pub(crate) struct RuntimeInner { /// Global counters for RFC 000 primitives. pub(crate) io_parked: AtomicU32, pub(crate) sleeping: AtomicU32, + /// Preemption knobs, written into each scheduler thread's locals on startup. + pub(crate) alloc_interval: u32, + pub(crate) timeslice_cycles: u64, } impl RuntimeInner { - fn new(thread_count: usize) -> Arc { + fn new(thread_count: usize, alloc_interval: u32, timeslice_cycles: u64) -> Arc { let stats = (0..thread_count).map(|_| SchedulerStats::new()).collect(); Arc::new(Self { shared: Mutex::new(SharedState::new()), @@ -281,6 +315,8 @@ impl RuntimeInner { stats, io_parked: AtomicU32::new(0), sleeping: AtomicU32::new(0), + alloc_interval, + timeslice_cycles, }) } @@ -295,18 +331,6 @@ impl RuntimeInner { crate::preempt::PREEMPTION_ENABLED.with(|c| c.set(prev)); result } - - /// Returns `None` when the mutex is poisoned. - /// Used in `unpark` / channel Drop which can fire after teardown. - pub(crate) fn try_with_shared(&self, f: impl FnOnce(&mut SharedState) -> R) -> Option { - let prev = crate::preempt::PREEMPTION_ENABLED.with(|c| c.replace(false)); - let result = match self.shared.lock() { - Ok(mut g) => Some(f(&mut g)), - Err(p) => Some(f(&mut p.into_inner())), - }; - crate::preempt::PREEMPTION_ENABLED.with(|c| c.set(prev)); - result - } } // --------------------------------------------------------------------------- @@ -322,7 +346,7 @@ pub struct Runtime { pub fn init(config: Config) -> Runtime { let n = config.resolved_thread_count(); Runtime { - inner: RuntimeInner::new(n), + inner: RuntimeInner::new(n, config.alloc_interval, config.timeslice_cycles), thread_count: n, } } @@ -526,6 +550,7 @@ fn finalize_actor(inner: &Arc, pid: Pid, outcome: Outcome) { // --------------------------------------------------------------------------- fn schedule_loop(inner: &Arc, slot: usize) { + crate::preempt::configure_preempt(inner.alloc_interval, inner.timeslice_cycles); let stats = &inner.stats[slot]; loop { diff --git a/src/scheduler.rs b/src/scheduler.rs index efde859..8c6a2d2 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -11,7 +11,7 @@ use crate::actor::current_pid; use crate::channel::Sender; use crate::pid::Pid; use crate::runtime::{ - self, RuntimeInner, YieldIntent, ROOT_PID, RUNTIME, + self, RuntimeInner, YieldIntent, RUNTIME, }; use crate::supervisor::Signal; use std::sync::Arc; diff --git a/tests/runtime.rs b/tests/runtime.rs index e4c7b32..86ccb47 100644 --- a/tests/runtime.rs +++ b/tests/runtime.rs @@ -15,10 +15,7 @@ //! - Panic on one scheduler thread doesn't kill others use smarm::{channel, runtime::{Config, Runtime}, spawn, yield_now, JoinHandle}; -use std::sync::{ - atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, - Arc, Barrier, -}; +use std::sync::{atomic::{AtomicBool, AtomicU64, Ordering}, Arc}; use std::time::Duration; use std::collections::HashSet;