diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..4286486 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,237 @@ +//! Off-scheduler blocking work. +//! +//! `block_on_io(closure)` runs `closure` on a dedicated worker OS thread, +//! parks the calling actor in the meantime, and returns the closure's +//! value when it completes. Lets actors call into blocking C libraries, +//! synchronous file IO, or anything else that would otherwise stall the +//! scheduler thread. +//! +//! Architecture +//! ============ +//! Per `run()`: +//! - one worker OS thread, started by `run()` and joined at shutdown; +//! - a request channel (`mpsc::Sender`) from scheduler → worker; +//! - a completion queue (`Mutex>`) worker → scheduler; +//! - a wake pipe: when the worker pushes a completion it writes one byte +//! to the pipe; the scheduler polls the pipe (with timeout) when it +//! would otherwise be idle. +//! +//! For v0.2 the worker is a single thread, so concurrent `block_on_io` +//! calls are serialised. v0.3 can replace it with a thread pool behind +//! the same request channel. +//! +//! Panic handling +//! ============== +//! The worker runs the closure inside `catch_unwind` and ships either the +//! return value or the panic payload back to the scheduler. `block_on_io` +//! resumes the panic on the calling actor's stack, so the actor's +//! supervisor sees a real `Signal::Panic` as if the work had run inline. + +use crate::pid::Pid; +use std::any::Any; +use std::collections::VecDeque; +use std::io; +use std::os::fd::RawFd; +use std::panic; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle as OsJoinHandle; + +// --------------------------------------------------------------------------- +// Wire types +// --------------------------------------------------------------------------- + +/// What the worker stores while computing a result. `Ok` is the closure's +/// return value (boxed as `Any`); `Err` is the panic payload. +pub type IoResult = Result, Box>; + +struct Request { + pid: Pid, + /// The work to perform. Returns the wire-form result directly. + work: Box IoResult + Send>, +} + +struct Completion { + pid: Pid, + result: IoResult, +} + +// --------------------------------------------------------------------------- +// IoThread — created per `run()`, owned by `SchedulerState`. +// --------------------------------------------------------------------------- + +pub struct IoThread { + /// Channel into the worker. + tx: mpsc::Sender, + /// Shared completion queue. The worker pushes; the scheduler drains. + completions: Arc>>, + /// Pipe used as a one-bit wakeup. `wake_read` is what the scheduler + /// polls; `wake_write` is what the worker writes to. + wake_read: RawFd, + wake_write: RawFd, + /// Worker thread handle, joined on shutdown. + worker: Option>, + /// Number of requests in-flight (sent but not yet drained as a + /// completion). Used by the scheduler's idle path to decide whether + /// to wait on the pipe or exit. + pub outstanding: u32, +} + +impl IoThread { + pub fn start() -> io::Result { + let (wake_read, wake_write) = make_pipe()?; + let (tx, rx) = mpsc::channel::(); + let completions: Arc>> = + Arc::new(Mutex::new(VecDeque::new())); + + let comps_worker = completions.clone(); + let worker = std::thread::Builder::new() + .name("smarm-io".into()) + .spawn(move || worker_loop(rx, comps_worker, wake_write))?; + + Ok(Self { + tx, + completions, + wake_read, + wake_write, + worker: Some(worker), + outstanding: 0, + }) + } + + /// Hand a request to the worker. Increments `outstanding`. + pub fn submit(&mut self, pid: Pid, work: Box IoResult + Send>) { + self.outstanding += 1; + // Send can only fail if the worker has hung up, which only happens + // on shutdown. submit during shutdown is a bug. + self.tx + .send(Request { pid, work }) + .expect("io worker hung up unexpectedly"); + } + + /// Drain every available completion. Caller is responsible for + /// decrementing `outstanding` and routing the results. + pub fn drain_completions(&mut self) -> Vec<(Pid, IoResult)> { + let mut q = self.completions.lock().unwrap(); + let mut out = Vec::with_capacity(q.len()); + while let Some(c) = q.pop_front() { + out.push((c.pid, c.result)); + } + out + } + + pub fn wake_fd(&self) -> RawFd { + self.wake_read + } +} + +impl Drop for IoThread { + fn drop(&mut self) { + // Hang up the request channel; the worker will exit its loop. + // We must drop `tx` before joining. Take it out by moving. + // mpsc::Sender doesn't have explicit `disconnect`; dropping it + // (after this scope) causes the receiver to return Err. + // + // Trick: replace self.tx with a fresh dead one so we can drop it. + let (dead_tx, _) = mpsc::channel::(); + let real_tx = std::mem::replace(&mut self.tx, dead_tx); + drop(real_tx); + + if let Some(h) = self.worker.take() { + // Best-effort join. If the worker panicked, ignore. + let _ = h.join(); + } + + // Close the pipe. + unsafe { + libc::close(self.wake_read); + libc::close(self.wake_write); + } + } +} + +// --------------------------------------------------------------------------- +// Worker loop +// --------------------------------------------------------------------------- + +fn worker_loop( + rx: mpsc::Receiver, + completions: Arc>>, + wake_write: RawFd, +) { + while let Ok(Request { pid, work }) = rx.recv() { + let result: IoResult = match panic::catch_unwind(panic::AssertUnwindSafe(work)) { + Ok(r) => r, + Err(payload) => Err(payload), + }; + completions.lock().unwrap().push_back(Completion { pid, result }); + // Write one byte to the pipe to wake the scheduler. If the pipe + // buffer is full (scheduler isn't draining), the write may return + // EAGAIN — we'll ignore it because there's already an outstanding + // wakeup that hasn't been consumed yet. + let buf: [u8; 1] = [0]; + unsafe { + // EINTR is the only retryable case worth handling. + loop { + let n = libc::write(wake_write, buf.as_ptr() as *const _, 1); + if n < 0 { + let e = *libc::__errno_location(); + if e == libc::EINTR { continue; } + } + break; + } + } + } +} + +// --------------------------------------------------------------------------- +// Pipe helpers +// --------------------------------------------------------------------------- + +fn make_pipe() -> io::Result<(RawFd, RawFd)> { + let mut fds: [libc::c_int; 2] = [0; 2]; + // O_CLOEXEC so children don't inherit, O_NONBLOCK on the read side + // so the scheduler's drain can `read` without blocking. + let r = unsafe { + libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) + }; + if r != 0 { + return Err(io::Error::last_os_error()); + } + Ok((fds[0], fds[1])) +} + +/// Drain pending bytes from the wake pipe. The scheduler calls this after +/// a `poll` wakeup so the next idle call sees an empty pipe. +pub fn drain_wake_pipe(fd: RawFd) { + let mut buf = [0u8; 64]; + loop { + let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) }; + if n <= 0 { + // EAGAIN (would block) or EOF — done. + break; + } + } +} + +/// Block on `fd` for up to `timeout`, returning when either there's data +/// to read or the timeout elapses. `None` for `timeout` means wait forever. +pub fn poll_wake(fd: RawFd, timeout: Option) { + let timeout_ms: libc::c_int = match timeout { + None => -1, + Some(d) => { + // Cap at i32::MAX milliseconds; poll's argument is c_int. + let ms = d.as_millis(); + if ms > i32::MAX as u128 { i32::MAX } else { ms as i32 } + } + }; + let mut pfd = libc::pollfd { fd, events: libc::POLLIN, revents: 0 }; + loop { + let r = unsafe { libc::poll(&mut pfd as *mut _, 1, timeout_ms) }; + if r < 0 { + let e = unsafe { *libc::__errno_location() }; + if e == libc::EINTR { continue; } + } + break; + } +} diff --git a/src/lib.rs b/src/lib.rs index 17639e1..33896c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ pub mod channel; pub mod scheduler; pub mod supervisor; pub mod timer; +pub mod io; // --------------------------------------------------------------------------- // Global allocator @@ -37,5 +38,7 @@ static ALLOCATOR: preempt::PreemptingAllocator = preempt::PreemptingAllocator; pub use channel::{channel, Receiver, RecvError, Sender}; pub use pid::Pid; -pub use scheduler::{run, self_pid, sleep, spawn, spawn_under, yield_now, JoinError, JoinHandle}; +pub use scheduler::{ + block_on_io, run, self_pid, sleep, spawn, spawn_under, yield_now, JoinError, JoinHandle, +}; pub use supervisor::Signal; diff --git a/src/scheduler.rs b/src/scheduler.rs index 7ffd391..3bd0213 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -73,6 +73,10 @@ struct Slot { /// Number of `JoinHandle`s still outstanding for this actor. The slot /// is reclaimed only when the actor is done AND outstanding_handles == 0. outstanding_handles: u32, + /// One-shot mailbox for the result of an in-flight `block_on_io` call. + /// The scheduler writes it on completion; `block_on_io` reads it on + /// resume. + pending_io_result: Option, } impl Slot { @@ -85,6 +89,7 @@ impl Slot { outcome: None, supervisor_channel: None, outstanding_handles: 0, + pending_io_result: None, } } } @@ -102,6 +107,8 @@ struct SchedulerState { root_pid: Option, /// Pending sleep timers. Min-heap keyed by deadline. timers: crate::timer::Timers, + /// IO worker thread. `None` outside `run()`. + io: Option, } impl SchedulerState { @@ -112,6 +119,7 @@ impl SchedulerState { run_queue: VecDeque::new(), root_pid: None, timers: crate::timer::Timers::new(), + io: None, } } @@ -251,6 +259,7 @@ fn reclaim_slot(s: &mut SchedulerState, pid: Pid) { slot.supervisor_channel = None; slot.state = State::Done; // semantically vacant; allocator checks free_list slot.outstanding_handles = 0; + slot.pending_io_result = None; s.free_list.push(idx); } @@ -284,6 +293,7 @@ pub fn spawn_under(supervisor: Pid, f: impl FnOnce() + Send + 'static) -> JoinHa slot.outcome = None; slot.waiters.clear(); slot.supervisor_channel = None; + slot.pending_io_result = None; s.run_queue.push_back(pid); pid }); @@ -344,6 +354,53 @@ pub fn sleep(duration: std::time::Duration) { park_current(); } +/// Run `f` on the IO worker thread, park the current actor while it runs, +/// and return `f`'s value when it completes. Panics inside `f` propagate +/// to the calling actor. +/// +/// Use this for blocking calls that would otherwise stall the scheduler — +/// synchronous file IO, blocking C FFI, libpq, etc. +pub fn block_on_io(f: F) -> T +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let me = current_pid().expect("block_on_io() called outside an actor"); + + // Box the user closure into the wire-form result-shaped closure that + // the worker expects. The worker also wraps in catch_unwind, but doing + // it here too would let us downcast `T` only when the closure didn't + // panic. We let the worker handle catch_unwind so the boxing here + // stays straightforward. + let work: Box crate::io::IoResult + Send> = Box::new(move || { + let v: T = f(); + Ok(Box::new(v) as Box) + }); + + with_sched(|s| { + let io = s.io.as_mut().expect("io thread not started"); + io.submit(me, work); + }); + + park_current(); + + // On resume, our slot has a result waiting. + let result = with_sched(|s| { + s.slot_mut(me) + .expect("block_on_io: own slot vanished") + .pending_io_result + .take() + .expect("block_on_io: resumed without a result") + }); + + match result { + Ok(any) => *any + .downcast::() + .expect("block_on_io: result type mismatch — should be unreachable"), + Err(payload) => std::panic::resume_unwind(payload), + } +} + /// Wake a parked actor. If the actor isn't parked (already runnable or done) /// this is a no-op — that's important; channel and join can both fire /// spurious unparks under some orderings and we want them to be cheap. @@ -405,6 +462,7 @@ pub fn run(initial: F) { assert!(c.borrow().is_none(), "smarm::run() called recursively"); let mut state = SchedulerState::new(); state.root_pid = Some(ROOT_PID); + state.io = Some(crate::io::IoThread::start().expect("failed to start io thread")); *c.borrow_mut() = Some(state); }); @@ -445,24 +503,76 @@ fn schedule_loop() { }); } - // 2. Pop a runnable actor. If none, sleep on the soonest timer or - // exit if there isn't one. + // 2. Drain IO completions: route each result to its slot and + // unpark the actor. Drain even when we have other runnables — + // it's cheap (a try_lock of the completion queue) and keeps + // pending_io_result freshness bounded. + let completions = with_sched(|s| { + s.io.as_mut().map(|io| io.drain_completions()).unwrap_or_default() + }); + for (pid, result) in completions { + with_sched(|s| { + if let Some(io) = s.io.as_mut() { + io.outstanding = io.outstanding.saturating_sub(1); + } + if let Some(slot) = s.slot_mut(pid) { + slot.pending_io_result = Some(result); + if matches!(slot.state, State::Parked) { + slot.state = State::Runnable; + s.run_queue.push_back(pid); + } + } + }); + } + + // 3. Pop a runnable actor. If none, decide whether to block on + // the wake pipe (for timers or IO) or exit (nothing pending). let pid = match with_sched(|s| s.run_queue.pop_front()) { Some(p) => p, None => { - let next = with_sched(|s| s.timers.peek_deadline()); - match next { - Some(deadline) => { + // Read out what we'd need to block on. We must take the + // wake fd separately because we can't hold an SCHED + // borrow across `poll_wake` — the IO thread will be + // trying to take the completions mutex, which is fine, + // but the scheduler thread itself mustn't hold SCHED + // borrowed across a blocking syscall. + let (next_deadline, io_outstanding, wake_fd) = with_sched(|s| { + let next = s.timers.peek_deadline(); + let (out, fd) = match s.io.as_ref() { + Some(io) => (io.outstanding, Some(io.wake_fd())), + None => (0, None), + }; + (next, out, fd) + }); + + match (next_deadline, io_outstanding, wake_fd) { + // Nothing pending — we're done. + (None, 0, _) | (None, _, None) => return, + // Timer pending, nothing else: poll with a deadline, + // or fall back to plain sleep if we somehow have no + // wake fd (shouldn't happen — io thread is always up + // during run()). + (Some(deadline), _, fd_opt) => { let now = std::time::Instant::now(); if deadline > now { - // No other thread can wake us; plain sleep is - // correct. When the IO thread lands in v0.2 - // this becomes a Condvar / pipe wakeup. - std::thread::sleep(deadline - now); + let timeout = deadline - now; + match fd_opt { + Some(fd) => { + crate::io::poll_wake(fd, Some(timeout)); + crate::io::drain_wake_pipe(fd); + } + None => std::thread::sleep(timeout), + } } continue; } - None => return, // no runnables, no timers — done. + // No timer, but IO outstanding: poll forever for the + // pipe wakeup. + (None, _, Some(fd)) => { + crate::io::poll_wake(fd, None); + crate::io::drain_wake_pipe(fd); + continue; + } } } }; diff --git a/tests/io.rs b/tests/io.rs new file mode 100644 index 0000000..820ee66 --- /dev/null +++ b/tests/io.rs @@ -0,0 +1,99 @@ +//! Tests for `block_on_io` — running a blocking closure on a worker OS +//! thread while the calling actor is parked. + +use smarm::{block_on_io, run, spawn, yield_now}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +#[test] +fn block_on_io_returns_the_closures_value() { + let captured: Arc>> = Arc::new(Mutex::new(None)); + let c = captured.clone(); + run(move || { + let v: u64 = block_on_io(|| { + // Burn a tiny bit of time so this actually crosses thread. + std::thread::sleep(Duration::from_millis(5)); + 42 + }); + *c.lock().unwrap() = Some(v); + }); + assert_eq!(*captured.lock().unwrap(), Some(42)); +} + +#[test] +fn other_actors_run_while_block_on_io_is_in_flight() { + // While actor A is parked in block_on_io, actor B should be able to + // make progress. + let order: Arc>> = Arc::new(Mutex::new(Vec::new())); + let oa = order.clone(); + let ob = order.clone(); + + run(move || { + let a = spawn(move || { + oa.lock().unwrap().push(1); // A starts first. + block_on_io(|| { + std::thread::sleep(Duration::from_millis(50)); + }); + oa.lock().unwrap().push(4); // A resumes last. + }); + let b = spawn(move || { + // Make sure A enters block_on_io first. + yield_now(); + ob.lock().unwrap().push(2); + yield_now(); + ob.lock().unwrap().push(3); + }); + a.join().unwrap(); + b.join().unwrap(); + }); + + // Required interleaving: 1 (A starts) before 2,3 (B runs while A + // is parked), and 4 (A resumes) after 2,3. + let v = order.lock().unwrap(); + assert_eq!(v[0], 1, "log: {:?}", *v); + assert_eq!(v[v.len() - 1], 4, "log: {:?}", *v); + let pos_2 = v.iter().position(|&x| x == 2).unwrap(); + let pos_3 = v.iter().position(|&x| x == 3).unwrap(); + let pos_4 = v.iter().position(|&x| x == 4).unwrap(); + assert!(pos_2 < pos_4, "B's first step ran after A resumed: {:?}", *v); + assert!(pos_3 < pos_4, "B's second step ran after A resumed: {:?}", *v); +} + +#[test] +fn many_concurrent_block_on_io_calls_all_complete() { + let counter = Arc::new(AtomicU32::new(0)); + let c = counter.clone(); + run(move || { + let mut handles = Vec::new(); + for _ in 0..10 { + let cc = c.clone(); + handles.push(spawn(move || { + let n: u32 = block_on_io(|| { + std::thread::sleep(Duration::from_millis(10)); + 1 + }); + cc.fetch_add(n, Ordering::SeqCst); + })); + } + for h in handles { h.join().unwrap(); } + }); + assert_eq!(counter.load(Ordering::SeqCst), 10); +} + +#[test] +fn block_on_io_panic_propagates_to_caller() { + let saw_err = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let s = saw_err.clone(); + run(move || { + let h = spawn(move || { + // The closure panics on the worker thread; that should + // resurface as a panic in this actor. + let _: () = block_on_io(|| panic!("boom on io thread")); + }); + if h.join().is_err() { + s.store(true, Ordering::SeqCst); + } + }); + assert!(saw_err.load(Ordering::SeqCst)); +}