//! Actor-aware mutex with mandatory timeout. //! //! `Mutex` parks the calling *green* thread on contention rather than //! blocking the OS thread. Every lock attempt is bounded by a timeout. //! //! Internals use `Arc>` so the type is genuinely //! `Send + Sync` and can be shared across scheduler threads. //! //! Fairness: FIFO. Poisoning: none. Reentrance: deadlock (caller bug). use crate::pid::Pid; use crate::scheduler; use crate::timer::{self, TimerTarget}; use std::collections::VecDeque; use std::sync::{Arc, Mutex as StdMutex}; use std::time::Duration; pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub struct LockTimeout; impl std::fmt::Display for LockTimeout { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "mutex lock timed out") } } impl std::error::Error for LockTimeout {} // --------------------------------------------------------------------------- // Internals // --------------------------------------------------------------------------- struct Wait { pid: Pid, seq: u64, } struct MutexState { holder: Option, waiters: VecDeque, next_seq: u64, default_timeout: Duration, } struct MutexCore { state: StdMutex, } impl MutexCore { fn new(default_timeout: Duration) -> Self { Self { state: StdMutex::new(MutexState { holder: None, waiters: VecDeque::new(), next_seq: 0, default_timeout, }), } } } impl TimerTarget for MutexCore { fn on_timeout(&self, pid: Pid, wait_seq: u64) { let unpark = { let mut st = self.state.lock().unwrap(); // Remove from waiters only if still there with matching seq. // If the lock was already granted (holder == Some(pid)), the // timer fired after the grant — treat as no-op; the actor // will see `is_holder == true` and return Ok. if st.holder == Some(pid) { return; } let pos = st.waiters.iter().position(|w| w.pid == pid && w.seq == wait_seq); if pos.is_some() { st.waiters.remove(pos.unwrap()); true } else { false } }; if unpark { scheduler::unpark(pid); } } } // --------------------------------------------------------------------------- // Public API // --------------------------------------------------------------------------- pub struct Mutex { core: Arc, /// Protected value. `None` while a guard is live; `Some` while free. value: Arc>>, } impl Mutex { pub fn new(value: T) -> Self { Self { core: Arc::new(MutexCore::new(DEFAULT_TIMEOUT)), value: Arc::new(StdMutex::new(Some(value))), } } pub fn set_default_timeout(&self, timeout: Duration) { self.core.state.lock().unwrap().default_timeout = timeout; } pub fn lock(&self) -> Result, LockTimeout> { let timeout = self.core.state.lock().unwrap().default_timeout; self.lock_timeout(timeout) } pub fn lock_timeout(&self, timeout: Duration) -> Result, LockTimeout> { // Outside the runtime (e.g. in tests, after run() returns) there is no // current actor PID. Fall back to a blocking std::sync::Mutex acquire. let Some(me) = crate::actor::current_pid() else { return self.lock_blocking(); }; // Fast path: nobody holds it. { let mut st = self.core.state.lock().unwrap(); if st.holder.is_none() { st.holder = Some(me); drop(st); let value = self.value.lock().unwrap().take() .expect("Mutex: value missing on free fast path"); return Ok(MutexGuard { mutex: self, value: Some(value) }); } } // Slow path: register as a waiter, set timeout, park. let _np = scheduler::NoPreempt::enter(); let seq = { let mut st = self.core.state.lock().unwrap(); let seq = st.next_seq; st.next_seq = st.next_seq.wrapping_add(1); st.waiters.push_back(Wait { pid: me, seq }); seq }; let target: Arc = self.core.clone(); let deadline = timer::deadline_from_now(timeout); scheduler::insert_wait_timer(deadline, me, target, seq); scheduler::park_current(); // Resumed. Are we the holder? let is_holder = self.core.state.lock().unwrap().holder == Some(me); if is_holder { let value = self.value.lock().unwrap().take() .expect("Mutex: value missing after grant"); Ok(MutexGuard { mutex: self, value: Some(value) }) } else { Err(LockTimeout) } } pub fn try_lock(&self) -> Option> { let me = crate::actor::current_pid()?; let mut st = self.core.state.lock().unwrap(); if st.holder.is_some() { return None; } st.holder = Some(me); drop(st); let value = self.value.lock().unwrap().take() .expect("Mutex: value missing on try_lock free path"); Some(MutexGuard { mutex: self, value: Some(value) }) } /// Blocking fallback used when called outside the smarm runtime. /// Spins on the internal std mutex; no actor parking, no timeout. fn lock_blocking(&self) -> Result, LockTimeout> { // We have no PID to register as holder, so we bypass the holder/waiter // tracking and just grab the value mutex directly. This is safe because // outside the runtime there are no green threads competing. let value = loop { let v = self.value.lock().unwrap().take(); if let Some(v) = v { break v; } std::thread::yield_now(); }; Ok(MutexGuard { mutex: self, value: Some(value) }) } } impl Clone for Mutex { fn clone(&self) -> Self { Self { core: self.core.clone(), value: self.value.clone() } } } // Genuinely Send + Sync now that internals are Arc>. unsafe impl Send for Mutex {} unsafe impl Sync for Mutex {} // --------------------------------------------------------------------------- // Guard // --------------------------------------------------------------------------- pub struct MutexGuard<'a, T> { mutex: &'a Mutex, value: Option, } impl std::ops::Deref for MutexGuard<'_, T> { type Target = T; fn deref(&self) -> &T { self.value.as_ref().expect("MutexGuard: value missing") } } impl std::ops::DerefMut for MutexGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { self.value.as_mut().expect("MutexGuard: value missing") } } impl std::fmt::Debug for MutexGuard<'_, T> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_tuple("MutexGuard") .field(self.value.as_ref().expect("MutexGuard: value missing")) .finish() } } impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { let v = self.value.take().expect("MutexGuard: double drop"); *self.mutex.value.lock().unwrap() = Some(v); let next_pid = { let mut st = self.mutex.core.state.lock().unwrap(); match st.waiters.pop_front() { Some(w) => { st.holder = Some(w.pid); Some(w.pid) } None => { st.holder = None; None } } }; if let Some(pid) = next_pid { scheduler::unpark(pid); } } }