//! Unbounded MPSC channels. //! //! Single-threaded scheduler: the inner state is `Rc>>`, //! not `Arc`. We hand-implement `Send` for `Sender` and //! `Receiver` when `T: Send`, on the basis that the only way two actor //! contexts touch the same channel is by being scheduled on the *same* OS //! thread (v0.1 has exactly one). When we add a second scheduler thread, //! this lie must be retired: replace `Rc` with `Arc` (or a //! lock-free queue) and remove the unsafe Send impls. //! //! Semantics: //! - Senders are clonable; the last sender drop closes the channel. //! - `Receiver::recv` on an empty open channel parks the receiver. //! - `Receiver::recv` on an empty closed channel returns `Err(RecvError)`. //! - `Sender::send` on an open channel always succeeds. //! - `Sender::send` on a closed channel (receiver dropped) returns //! `Err(SendError(value))`. //! - When a send pushes to a previously empty queue and a receiver is //! parked, the receiver is unparked. use crate::pid::Pid; use std::cell::RefCell; use std::collections::VecDeque; use std::rc::Rc; pub fn channel() -> (Sender, Receiver) { let inner = Rc::new(RefCell::new(Inner { queue: VecDeque::new(), parked_receiver: None, senders: 1, receiver_alive: true, })); (Sender { inner: inner.clone() }, Receiver { inner }) } struct Inner { queue: VecDeque, parked_receiver: Option, senders: usize, receiver_alive: bool, } pub struct Sender { inner: Rc>>, } pub struct Receiver { inner: Rc>>, } // SAFETY (v0.1 only): the scheduler is single-threaded. Sender/Receiver can // be captured into actor closures (which require Send), but they will only // ever be touched from one OS thread. When multi-threading lands, swap the // `Rc` for `Arc` and remove these. unsafe impl Send for Sender {} unsafe impl Send for Receiver {} #[derive(Debug, PartialEq, Eq)] pub struct SendError(pub T); #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub struct RecvError; impl std::fmt::Display for RecvError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "channel closed") } } impl std::error::Error for RecvError {} impl Clone for Sender { fn clone(&self) -> Self { self.inner.borrow_mut().senders += 1; Sender { inner: self.inner.clone() } } } impl Drop for Sender { fn drop(&mut self) { let unpark = { let mut g = self.inner.borrow_mut(); g.senders -= 1; if g.senders == 0 && g.queue.is_empty() { // Channel closed and drained. Wake the receiver so it can // see RecvError. g.parked_receiver.take() } else { None } }; if let Some(pid) = unpark { crate::scheduler::unpark(pid); } } } impl Drop for Receiver { fn drop(&mut self) { self.inner.borrow_mut().receiver_alive = false; } } impl Sender { pub fn send(&self, value: T) -> Result<(), SendError> { let unpark = { let mut g = self.inner.borrow_mut(); if !g.receiver_alive { return Err(SendError(value)); } g.queue.push_back(value); // If the receiver is parked, unpark it. g.parked_receiver.take() }; if let Some(pid) = unpark { crate::scheduler::unpark(pid); } Ok(()) } } impl Receiver { pub fn recv(&self) -> Result { loop { // Try to take a message. { let mut g = self.inner.borrow_mut(); if let Some(v) = g.queue.pop_front() { return Ok(v); } if g.senders == 0 { return Err(RecvError); } // Empty + open: register and park. let me = crate::actor::current_pid() .expect("recv() called outside an actor"); debug_assert!( g.parked_receiver.is_none(), "channel has more than one receiver" ); g.parked_receiver = Some(me); } // Release the borrow before parking — the unparker will need it. crate::scheduler::park_current(); // Loop: the message that woke us might already have been taken // (it can't, with one receiver, but the senders=0 path can fire // here too). } } /// Non-blocking. `Ok(Some(v))` if a message was available, `Ok(None)` if /// the channel is empty but open, `Err(RecvError)` if closed and drained. pub fn try_recv(&self) -> Result, RecvError> { let mut g = self.inner.borrow_mut(); if let Some(v) = g.queue.pop_front() { return Ok(Some(v)); } if g.senders == 0 { return Err(RecvError); } Ok(None) } }