chore: reset working tree (v0.5)
Temporary commit clearing working tree for v0.6 rebuild
This commit is contained in:
669
src/runtime.rs
669
src/runtime.rs
@@ -1,669 +0,0 @@
|
||||
//! Multi-scheduler runtime: configuration, initialisation, and the shared
|
||||
//! state that all scheduler OS threads operate against.
|
||||
//!
|
||||
//! # Architecture
|
||||
//!
|
||||
//! ```text
|
||||
//! init(Config) → Runtime (Arc<RuntimeInner>)
|
||||
//!
|
||||
//! RuntimeInner {
|
||||
//! shared: Mutex<SharedState> ← slot table, run queue, timers, IO
|
||||
//! stats: Vec<SchedulerStats> ← one per thread, lockless atomics (RFC 000)
|
||||
//! io_parked: AtomicU32 ← actors parked on IO
|
||||
//! sleeping: AtomicU32 ← actors parked on timer
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! `Runtime::run(f)` spawns N OS threads (one per `Config::resolved_thread_count()`),
|
||||
//! each running `schedule_loop`. It blocks until all scheduler threads exit,
|
||||
//! i.e. until the run queue is empty and nothing is pending.
|
||||
//!
|
||||
//! Each scheduler thread holds an `Arc<RuntimeInner>` clone. Per-thread
|
||||
//! identity is a small integer index, stored in a thread-local, used to index
|
||||
//! into `stats`.
|
||||
//!
|
||||
//! # Timer / IO drain (try-lock, one-winner)
|
||||
//!
|
||||
//! On each loop iteration every scheduler thread tries `try_lock()` on a
|
||||
//! separate `drain_lock: Mutex<()>`. The winner drains due timers and IO
|
||||
//! completions; losers skip and move straight to popping an actor from the
|
||||
//! run queue. This is the simplest correct approach; revisit if the drain
|
||||
//! becomes a measured bottleneck.
|
||||
|
||||
use crate::actor::{
|
||||
clear_current_pid, current_pid, is_actor_done, reset_actor_done,
|
||||
set_current_actor_box, set_current_pid, take_last_outcome, Actor, Outcome,
|
||||
};
|
||||
use crate::channel::Sender;
|
||||
use crate::context::{get_actor_sp, set_actor_sp, switch_to_actor};
|
||||
use crate::io::IoThread;
|
||||
use crate::pid::Pid;
|
||||
use crate::preempt::PREEMPTION_ENABLED;
|
||||
use crate::supervisor::Signal;
|
||||
use crate::timer::Timers;
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Config
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Runtime configuration.
|
||||
///
|
||||
/// ```
|
||||
/// use smarm::runtime::Config;
|
||||
///
|
||||
/// // Use all available CPUs (default):
|
||||
/// let c = Config::default();
|
||||
///
|
||||
/// // Exactly 4 scheduler threads:
|
||||
/// let c = Config::exact(4);
|
||||
///
|
||||
/// // Between 2 and 8, clamped to available parallelism:
|
||||
/// let c = Config::new(2, 8, None);
|
||||
/// ```
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Config {
|
||||
min: usize,
|
||||
max: usize,
|
||||
exact: Option<usize>,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
/// Exact thread count; takes precedence over min/max.
|
||||
pub fn exact(n: usize) -> Self {
|
||||
assert!(n >= 1, "scheduler thread count must be ≥ 1");
|
||||
Self { min: n, max: n, exact: Some(n) }
|
||||
}
|
||||
|
||||
/// Bounded range. Thread count = clamp(available_parallelism, min, max).
|
||||
pub fn new(min: usize, max: usize, exact: Option<usize>) -> Self {
|
||||
assert!(min >= 1, "min must be ≥ 1");
|
||||
assert!(max >= min, "max must be ≥ min");
|
||||
if let Some(e) = exact {
|
||||
assert!(e >= 1, "exact must be ≥ 1");
|
||||
}
|
||||
Self { min, max, exact }
|
||||
}
|
||||
|
||||
/// The number of scheduler threads this config resolves to.
|
||||
pub fn resolved_thread_count(&self) -> usize {
|
||||
if let Some(e) = self.exact {
|
||||
return e;
|
||||
}
|
||||
let avail = thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
avail.clamp(self.min, self.max)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
let avail = thread::available_parallelism()
|
||||
.map(|n| n.get())
|
||||
.unwrap_or(1);
|
||||
Self { min: 1, max: avail, exact: None }
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Per-thread stats (RFC 000 Layer 1 primitives)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Lockless per-scheduler-thread counters. Written only by the owning thread;
|
||||
/// readable from any thread (introspection actor, tests).
|
||||
pub struct SchedulerStats {
|
||||
/// PID index of the actor currently on-CPU, or `u32::MAX` when idle.
|
||||
pub current_pid_index: AtomicU32,
|
||||
/// Snapshot of run queue length maintained on every push/pop.
|
||||
pub run_queue_len: AtomicU64,
|
||||
}
|
||||
|
||||
impl SchedulerStats {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
current_pid_index: AtomicU32::new(u32::MAX),
|
||||
run_queue_len: AtomicU64::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Runtime stats snapshot (for tests / introspection)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub struct RuntimeStats {
|
||||
pub(crate) inner: Arc<RuntimeInner>,
|
||||
}
|
||||
|
||||
impl RuntimeStats {
|
||||
/// Sum of run queue lengths across all scheduler threads.
|
||||
pub fn total_run_queue_len(&self) -> u64 {
|
||||
self.inner.stats.iter()
|
||||
.map(|s| s.run_queue_len.load(Ordering::Relaxed))
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Number of scheduler threads.
|
||||
pub fn scheduler_count(&self) -> usize {
|
||||
self.inner.stats.len()
|
||||
}
|
||||
|
||||
/// Actors currently parked on IO.
|
||||
pub fn io_parked_count(&self) -> u32 {
|
||||
self.inner.io_parked.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Actors currently sleeping on a timer.
|
||||
pub fn sleeping_count(&self) -> u32 {
|
||||
self.inner.sleeping.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Shared state (behind Mutex<>)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub(crate) const ACTOR_STACK_SIZE: usize = 64 * 1024;
|
||||
|
||||
pub(crate) enum State { Runnable, Parked, Done }
|
||||
|
||||
struct Slot {
|
||||
generation: u32,
|
||||
actor: Option<Actor>,
|
||||
state: State,
|
||||
waiters: Vec<Pid>,
|
||||
outcome: Option<Outcome>,
|
||||
supervisor_channel: Option<Sender<Signal>>,
|
||||
outstanding_handles: u32,
|
||||
pending_io_result: Option<crate::io::IoResult>,
|
||||
}
|
||||
|
||||
impl Slot {
|
||||
fn vacant() -> Self {
|
||||
Self {
|
||||
generation: 0,
|
||||
actor: None,
|
||||
state: State::Done,
|
||||
waiters: Vec::new(),
|
||||
outcome: None,
|
||||
supervisor_channel: None,
|
||||
outstanding_handles: 0,
|
||||
pending_io_result: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) type Closure = Box<dyn FnOnce() + Send>;
|
||||
|
||||
pub(crate) struct SharedState {
|
||||
pub(crate) slots: Vec<Slot>,
|
||||
pub(crate) free_list: Vec<u32>,
|
||||
pub(crate) run_queue: VecDeque<Pid>,
|
||||
pub(crate) root_pid: Option<Pid>,
|
||||
pub(crate) timers: Timers,
|
||||
pub(crate) io: Option<IoThread>,
|
||||
/// Closures awaiting their first resume, keyed by Pid.
|
||||
pub(crate) pending_closures: Vec<(Pid, Closure)>,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
slots: Vec::new(),
|
||||
free_list: Vec::new(),
|
||||
run_queue: VecDeque::new(),
|
||||
root_pid: None,
|
||||
timers: Timers::new(),
|
||||
io: None,
|
||||
pending_closures: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn allocate_slot(&mut self) -> (u32, u32) {
|
||||
if let Some(idx) = self.free_list.pop() {
|
||||
let gen = self.slots[idx as usize].generation;
|
||||
(idx, gen)
|
||||
} else {
|
||||
let idx = self.slots.len() as u32;
|
||||
self.slots.push(Slot::vacant());
|
||||
(idx, 0)
|
||||
}
|
||||
}
|
||||
|
||||
fn slot(&self, pid: Pid) -> Option<&Slot> {
|
||||
let s = self.slots.get(pid.index() as usize)?;
|
||||
if s.generation == pid.generation() { Some(s) } else { None }
|
||||
}
|
||||
|
||||
fn slot_mut(&mut self, pid: Pid) -> Option<&mut Slot> {
|
||||
let s = self.slots.get_mut(pid.index() as usize)?;
|
||||
if s.generation == pid.generation() { Some(s) } else { None }
|
||||
}
|
||||
|
||||
fn pop_pending_closure(&mut self, pid: Pid) -> Option<Closure> {
|
||||
let pos = self.pending_closures.iter().position(|(p, _)| *p == pid)?;
|
||||
Some(self.pending_closures.swap_remove(pos).1)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// RuntimeInner — the shared core behind an Arc
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub(crate) struct RuntimeInner {
|
||||
pub(crate) shared: Mutex<SharedState>,
|
||||
/// Try-lock: exactly one scheduler thread drains timers/IO per iteration.
|
||||
drain_lock: Mutex<()>,
|
||||
/// Per-thread stats, indexed by scheduler thread slot (0..N).
|
||||
pub(crate) stats: Vec<SchedulerStats>,
|
||||
/// Global counters for RFC 000 primitives.
|
||||
pub(crate) io_parked: AtomicU32,
|
||||
pub(crate) sleeping: AtomicU32,
|
||||
}
|
||||
|
||||
impl RuntimeInner {
|
||||
fn new(thread_count: usize) -> Arc<Self> {
|
||||
let stats = (0..thread_count).map(|_| SchedulerStats::new()).collect();
|
||||
Arc::new(Self {
|
||||
shared: Mutex::new(SharedState::new()),
|
||||
drain_lock: Mutex::new(()),
|
||||
stats,
|
||||
io_parked: AtomicU32::new(0),
|
||||
sleeping: AtomicU32::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
fn with_shared<R>(&self, f: impl FnOnce(&mut SharedState) -> R) -> R {
|
||||
f(&mut self.shared.lock().unwrap())
|
||||
}
|
||||
|
||||
/// Returns `None` when the mutex is poisoned.
|
||||
/// Used in `unpark` / channel Drop which can fire after teardown.
|
||||
fn try_with_shared<R>(&self, f: impl FnOnce(&mut SharedState) -> R) -> Option<R> {
|
||||
match self.shared.lock() {
|
||||
Ok(mut g) => Some(f(&mut g)),
|
||||
Err(p) => Some(f(&mut p.into_inner())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Runtime — the public handle
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub struct Runtime {
|
||||
inner: Arc<RuntimeInner>,
|
||||
thread_count: usize,
|
||||
}
|
||||
|
||||
/// Initialise the runtime with the given config. Returns a reusable handle.
|
||||
pub fn init(config: Config) -> Runtime {
|
||||
let n = config.resolved_thread_count();
|
||||
Runtime {
|
||||
inner: RuntimeInner::new(n),
|
||||
thread_count: n,
|
||||
}
|
||||
}
|
||||
|
||||
impl Runtime {
|
||||
/// Run `f` as the initial actor, block until all actors finish.
|
||||
/// Can be called multiple times sequentially on the same `Runtime`.
|
||||
pub fn run(&self, f: impl FnOnce() + Send + 'static) {
|
||||
// Re-initialise shared state for this run.
|
||||
{
|
||||
let mut s = self.inner.shared.lock().unwrap();
|
||||
assert!(s.run_queue.is_empty(), "run() called while previous run still active");
|
||||
s.root_pid = Some(ROOT_PID);
|
||||
s.io = Some(IoThread::start().expect("failed to start IO thread"));
|
||||
}
|
||||
|
||||
// Spawn the initial actor through the public spawn path (which
|
||||
// requires a running runtime in the thread-local).
|
||||
RUNTIME.with(|r| *r.borrow_mut() = Some(self.inner.clone()));
|
||||
let initial_handle = crate::scheduler::spawn(f);
|
||||
|
||||
// Launch N-1 extra scheduler threads. The calling thread is thread 0.
|
||||
let mut os_threads = Vec::new();
|
||||
for slot in 1..self.thread_count {
|
||||
let inner = self.inner.clone();
|
||||
let t = thread::spawn(move || {
|
||||
RUNTIME.with(|r| *r.borrow_mut() = Some(inner.clone()));
|
||||
SCHED_SLOT.with(|s| s.set(slot));
|
||||
schedule_loop(&inner, slot);
|
||||
RUNTIME.with(|r| *r.borrow_mut() = None);
|
||||
});
|
||||
os_threads.push(t);
|
||||
}
|
||||
|
||||
// Thread 0 runs the loop on the calling thread.
|
||||
SCHED_SLOT.with(|s| s.set(0));
|
||||
schedule_loop(&self.inner, 0);
|
||||
|
||||
// Wait for all other scheduler threads.
|
||||
for t in os_threads {
|
||||
let _ = t.join();
|
||||
}
|
||||
|
||||
// Drop initial handle (decrements outstanding_handles count).
|
||||
drop(initial_handle);
|
||||
|
||||
// Tear down IO and clean up shared state for the next run() call.
|
||||
let mut s = self.inner.shared.lock().unwrap();
|
||||
drop(s.io.take()); // joins IO threads
|
||||
s.pending_closures.clear();
|
||||
// Reset per-thread stats.
|
||||
for stat in &self.inner.stats {
|
||||
stat.current_pid_index.store(u32::MAX, Ordering::Relaxed);
|
||||
stat.run_queue_len.store(0, Ordering::Relaxed);
|
||||
}
|
||||
self.inner.io_parked.store(0, Ordering::Relaxed);
|
||||
self.inner.sleeping.store(0, Ordering::Relaxed);
|
||||
|
||||
RUNTIME.with(|r| *r.borrow_mut() = None);
|
||||
}
|
||||
|
||||
/// Snapshot of runtime statistics for introspection / tests.
|
||||
pub fn stats(&self) -> RuntimeStats {
|
||||
RuntimeStats { inner: self.inner.clone() }
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Thread-locals
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
use std::cell::{Cell, RefCell};
|
||||
|
||||
thread_local! {
|
||||
/// The RuntimeInner for the current run(). Set by run() on the calling
|
||||
/// thread and by each spawned scheduler thread.
|
||||
pub(crate) static RUNTIME: RefCell<Option<Arc<RuntimeInner>>> =
|
||||
const { RefCell::new(None) };
|
||||
|
||||
/// This scheduler thread's index into RuntimeInner::stats.
|
||||
static SCHED_SLOT: Cell<usize> = const { Cell::new(0) };
|
||||
|
||||
/// What the actor wants when it yields back to the scheduler.
|
||||
static YIELD_INTENT: Cell<YieldIntent> = const { Cell::new(YieldIntent::Yield) };
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub(crate) enum YieldIntent { Yield, Park }
|
||||
|
||||
pub(crate) fn set_yield_intent(i: YieldIntent) {
|
||||
YIELD_INTENT.with(|c| c.set(i));
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Sentinel root PID
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub const ROOT_PID: Pid = Pid::new(u32::MAX, u32::MAX);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Slot reclamation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub(crate) fn reclaim_slot(s: &mut SharedState, pid: Pid) {
|
||||
let idx = pid.index();
|
||||
let slot = &mut s.slots[idx as usize];
|
||||
slot.generation = slot.generation.wrapping_add(1);
|
||||
slot.actor = None;
|
||||
slot.outcome = None;
|
||||
slot.waiters.clear();
|
||||
slot.supervisor_channel = None;
|
||||
slot.state = State::Done;
|
||||
slot.outstanding_handles = 0;
|
||||
slot.pending_io_result = None;
|
||||
s.free_list.push(idx);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// finalize_actor
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn finalize_actor(inner: &Arc<RuntimeInner>, pid: Pid, outcome: Outcome) {
|
||||
let (joiner_outcome, sup_signal) = match outcome {
|
||||
Outcome::Exit => (Outcome::Exit, Signal::Exit(pid)),
|
||||
Outcome::Panic(payload) => (
|
||||
Outcome::Panic(payload),
|
||||
Signal::Panic(pid, Box::new(()) as Box<dyn std::any::Any + Send>),
|
||||
),
|
||||
};
|
||||
|
||||
let (waiters, supervisor_pid) = inner.with_shared(|s| {
|
||||
let slot = s.slot_mut(pid).expect("finalize_actor: slot vanished");
|
||||
let sup = slot.actor.as_ref().map(|a| a.supervisor);
|
||||
slot.outcome = Some(joiner_outcome);
|
||||
slot.state = State::Done;
|
||||
slot.actor = None;
|
||||
(std::mem::take(&mut slot.waiters), sup)
|
||||
});
|
||||
|
||||
// Deliver to supervisor.
|
||||
if let Some(sup) = supervisor_pid {
|
||||
let sender = inner.with_shared(|s| {
|
||||
s.slot(sup).and_then(|slot| slot.supervisor_channel.clone())
|
||||
});
|
||||
if let Some(sender) = sender {
|
||||
let _ = sender.send(sup_signal);
|
||||
}
|
||||
}
|
||||
|
||||
// Unpark joiners.
|
||||
for joiner in waiters {
|
||||
crate::scheduler::unpark(joiner);
|
||||
}
|
||||
|
||||
// Reclaim if no outstanding handles.
|
||||
inner.with_shared(|s| {
|
||||
let reclaim = s.slot(pid).map(|slot| slot.outstanding_handles == 0).unwrap_or(false);
|
||||
if reclaim { reclaim_slot(s, pid); }
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// schedule_loop — runs on each scheduler OS thread
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn schedule_loop(inner: &Arc<RuntimeInner>, slot: usize) {
|
||||
let stats = &inner.stats[slot];
|
||||
|
||||
loop {
|
||||
// ----------------------------------------------------------------
|
||||
// 1. Try to win the drain lock (timers + IO). One winner per round;
|
||||
// losers skip immediately and proceed to step 2.
|
||||
// ----------------------------------------------------------------
|
||||
if let Ok(_drain_guard) = inner.drain_lock.try_lock() {
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
// Drain due timers.
|
||||
let due = inner.with_shared(|s| s.timers.pop_due(now));
|
||||
for entry in due {
|
||||
match entry.reason {
|
||||
crate::timer::Reason::Sleep => {
|
||||
inner.with_shared(|s| {
|
||||
if let Some(slot) = s.slot_mut(entry.pid) {
|
||||
if matches!(slot.state, State::Parked) {
|
||||
slot.state = State::Runnable;
|
||||
s.run_queue.push_back(entry.pid);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
crate::timer::Reason::WaitTimeout { target, wait_seq } => {
|
||||
// Runs outside with_shared — the callback may call unpark.
|
||||
target.on_timeout(entry.pid, wait_seq);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Drain IO completions.
|
||||
let completions = inner.with_shared(|s| {
|
||||
s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default()
|
||||
});
|
||||
for completion in completions {
|
||||
match completion {
|
||||
crate::io::Completion::Blocking { pid, result } => {
|
||||
inner.with_shared(|s| {
|
||||
if let Some(io) = s.io.as_mut() {
|
||||
io.outstanding = io.outstanding.saturating_sub(1);
|
||||
}
|
||||
if let Some(slot) = s.slot_mut(pid) {
|
||||
slot.pending_io_result = Some(result);
|
||||
if matches!(slot.state, State::Parked) {
|
||||
slot.state = State::Runnable;
|
||||
s.run_queue.push_back(pid);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
crate::io::Completion::FdReady { fd, events: _ } => {
|
||||
inner.with_shared(|s| {
|
||||
let parked_pid = s.io.as_mut().and_then(|io| {
|
||||
let pid = io.waiters.remove(&fd);
|
||||
io.epoll_deregister(fd);
|
||||
pid
|
||||
});
|
||||
if let Some(pid) = parked_pid {
|
||||
if let Some(slot) = s.slot_mut(pid) {
|
||||
if matches!(slot.state, State::Parked) {
|
||||
slot.state = State::Runnable;
|
||||
s.run_queue.push_back(pid);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
} // drain_guard drops here
|
||||
|
||||
// ----------------------------------------------------------------
|
||||
// 2. Pop a runnable actor from the shared queue.
|
||||
// ----------------------------------------------------------------
|
||||
let pid = match inner.with_shared(|s| {
|
||||
let len = s.run_queue.len() as u64;
|
||||
stats.run_queue_len.store(len, Ordering::Relaxed);
|
||||
s.run_queue.pop_front()
|
||||
}) {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
// Nothing runnable. Check whether we should wait or exit.
|
||||
let (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) =
|
||||
inner.with_shared(|s| {
|
||||
let next = s.timers.peek_deadline();
|
||||
let (out, fd) = match s.io.as_ref() {
|
||||
Some(io) => (
|
||||
io.outstanding + io.waiters.len() as u32,
|
||||
Some(io.wake_fd()),
|
||||
),
|
||||
None => (0, None),
|
||||
};
|
||||
// Count actors that are not Done (Runnable or Parked).
|
||||
let live = s.slots.iter().filter(|slot| {
|
||||
slot.actor.is_some()
|
||||
}).count();
|
||||
(next, out, fd, s.run_queue.is_empty(), live)
|
||||
});
|
||||
|
||||
match (next_deadline, io_outstanding, wake_fd, queue_empty, live_actors) {
|
||||
// Queue is now non-empty (another thread added work): retry.
|
||||
(_, _, _, false, _) => continue,
|
||||
// Truly idle — no timers, no IO, no live actors.
|
||||
(None, 0, _, true, 0) => return,
|
||||
// Live actors but queue empty: they must be parked on IO or
|
||||
// timers. Wait on the appropriate source.
|
||||
(Some(deadline), _, fd_opt, true, _) => {
|
||||
let now = std::time::Instant::now();
|
||||
if deadline > now {
|
||||
let timeout = deadline - now;
|
||||
match fd_opt {
|
||||
Some(fd) => {
|
||||
crate::io::poll_wake(fd, Some(timeout));
|
||||
crate::io::drain_wake_pipe(fd);
|
||||
}
|
||||
None => thread::sleep(timeout),
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
(None, _, Some(fd), true, _) => {
|
||||
crate::io::poll_wake(fd, None);
|
||||
crate::io::drain_wake_pipe(fd);
|
||||
continue;
|
||||
}
|
||||
// Live actors, queue empty, no IO/timers: they're parked
|
||||
// waiting for each other (potential deadlock in user code),
|
||||
// or another thread is about to add work. Sleep briefly to
|
||||
// avoid hammering the shared mutex.
|
||||
_ => {
|
||||
thread::sleep(std::time::Duration::from_micros(100));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// ----------------------------------------------------------------
|
||||
// 3. Resume the actor.
|
||||
// ----------------------------------------------------------------
|
||||
let sp = match inner.with_shared(|s| {
|
||||
s.slot(pid).and_then(|slot| slot.actor.as_ref().map(|a| a.sp))
|
||||
}) {
|
||||
Some(sp) => sp,
|
||||
None => continue, // stale pid
|
||||
};
|
||||
|
||||
// First resume: move the closure into the trampoline's thread-local.
|
||||
if let Some(b) = inner.with_shared(|s| s.pop_pending_closure(pid)) {
|
||||
set_current_actor_box(b);
|
||||
}
|
||||
|
||||
// Update per-thread stats: record who's on-CPU.
|
||||
stats.current_pid_index.store(pid.index(), Ordering::Relaxed);
|
||||
|
||||
set_actor_sp(sp);
|
||||
set_current_pid(pid);
|
||||
reset_actor_done();
|
||||
YIELD_INTENT.with(|c| c.set(YieldIntent::Yield));
|
||||
crate::preempt::reset_timeslice();
|
||||
PREEMPTION_ENABLED.with(|c| c.set(true));
|
||||
|
||||
unsafe { switch_to_actor() };
|
||||
|
||||
PREEMPTION_ENABLED.with(|c| c.set(false));
|
||||
stats.current_pid_index.store(u32::MAX, Ordering::Relaxed);
|
||||
clear_current_pid();
|
||||
|
||||
let intent = YIELD_INTENT.with(|c| c.get());
|
||||
let new_sp = get_actor_sp();
|
||||
|
||||
if is_actor_done() {
|
||||
let outcome = take_last_outcome().unwrap_or(Outcome::Exit);
|
||||
finalize_actor(inner, pid, outcome);
|
||||
} else {
|
||||
inner.with_shared(|s| {
|
||||
if let Some(slot) = s.slot_mut(pid) {
|
||||
if let Some(actor) = slot.actor.as_mut() {
|
||||
actor.sp = new_sp;
|
||||
}
|
||||
match intent {
|
||||
YieldIntent::Yield => {
|
||||
slot.state = State::Runnable;
|
||||
s.run_queue.push_back(pid);
|
||||
}
|
||||
YieldIntent::Park => {
|
||||
slot.state = State::Parked;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user