//! Tests for epoll-based fd readiness primitives: `wait_readable`, //! `wait_writable`, and the `read`/`write` sugar on top of them. //! //! Pipes are the convenient test target: cheap to create, easy to drive, //! and we already use `libc::pipe2` internally. Each pipe is one direction //! and respects `O_NONBLOCK` if we ask for it. use smarm::{run, spawn, wait_readable, wait_writable, yield_now}; use std::os::fd::RawFd; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::time::Duration; // --------------------------------------------------------------------------- // Pipe helper // --------------------------------------------------------------------------- struct Pipe { read: RawFd, write: RawFd, } impl Pipe { fn new() -> Self { let mut fds: [libc::c_int; 2] = [0; 2]; let r = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) }; assert_eq!(r, 0, "pipe2 failed"); Pipe { read: fds[0], write: fds[1], } } } impl Drop for Pipe { fn drop(&mut self) { unsafe { libc::close(self.read); libc::close(self.write); } } } fn raw_write(fd: RawFd, buf: &[u8]) -> isize { unsafe { libc::write(fd, buf.as_ptr() as *const _, buf.len()) } } fn raw_read(fd: RawFd, buf: &mut [u8]) -> isize { unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) } } // --------------------------------------------------------------------------- // wait_readable parks until data arrives, then libc::read succeeds. // --------------------------------------------------------------------------- #[test] fn wait_readable_blocks_until_data_arrives_then_read_succeeds() { let captured: Arc>> = Arc::new(StdMutex::new(Vec::new())); let cap = captured.clone(); let p = Arc::new(Pipe::new()); let p_reader = p.clone(); let p_writer = p.clone(); run(move || { let reader = spawn(move || { // Initially the pipe is empty; this parks. wait_readable(p_reader.read).expect("wait_readable failed"); // Now data should be readable. let mut buf = [0u8; 16]; let n = raw_read(p_reader.read, &mut buf); assert!(n > 0, "read returned {}", n); cap.lock().unwrap().extend_from_slice(&buf[..n as usize]); }); let writer = spawn(move || { // Yield so the reader gets to park first. yield_now(); yield_now(); // Sleep a touch so the reader is definitely waiting in epoll. smarm::sleep(Duration::from_millis(5)); let n = raw_write(p_writer.write, b"hello"); assert_eq!(n, 5); }); reader.join().unwrap(); writer.join().unwrap(); }); assert_eq!(*captured.lock().unwrap(), b"hello"); } // --------------------------------------------------------------------------- // The smarm::scheduler::read sugar — wait_readable + libc::read in one call. // --------------------------------------------------------------------------- #[test] fn read_sugar_returns_bytes_from_pipe() { let captured: Arc>> = Arc::new(StdMutex::new(Vec::new())); let cap = captured.clone(); let p = Arc::new(Pipe::new()); let p_reader = p.clone(); let p_writer = p.clone(); run(move || { let reader = spawn(move || { let mut buf = [0u8; 16]; let n = smarm::scheduler::read(p_reader.read, &mut buf) .expect("smarm::scheduler::read failed"); cap.lock().unwrap().extend_from_slice(&buf[..n]); }); let writer = spawn(move || { yield_now(); smarm::sleep(Duration::from_millis(5)); let _ = raw_write(p_writer.write, b"world"); }); reader.join().unwrap(); writer.join().unwrap(); }); assert_eq!(*captured.lock().unwrap(), b"world"); } // --------------------------------------------------------------------------- // wait_writable + write — though pipes are almost always writable; the // useful test here is that the call doesn't hang on a writable fd. // --------------------------------------------------------------------------- #[test] fn write_sugar_sends_bytes_to_pipe() { let counter = Arc::new(AtomicU32::new(0)); let c = counter.clone(); let p = Arc::new(Pipe::new()); let p_writer = p.clone(); let p_reader = p.clone(); run(move || { let writer = spawn(move || { // Pipe is empty + has buffer space, so this returns immediately // after wait_writable wakes (which happens fast because the // kernel marks an empty pipe as immediately writable). let n = smarm::scheduler::write(p_writer.write, b"smarm") .expect("write failed"); assert_eq!(n, 5); c.fetch_add(1, Ordering::SeqCst); }); let reader = spawn(move || { // Give the writer time. smarm::sleep(Duration::from_millis(10)); let mut buf = [0u8; 16]; let n = raw_read(p_reader.read, &mut buf); assert_eq!(n, 5); assert_eq!(&buf[..5], b"smarm"); }); writer.join().unwrap(); reader.join().unwrap(); }); assert_eq!(counter.load(Ordering::SeqCst), 1); } // --------------------------------------------------------------------------- // While an actor is parked on wait_readable, other actors keep running. // --------------------------------------------------------------------------- #[test] fn other_actors_run_while_one_is_parked_on_wait_readable() { let log: Arc>> = Arc::new(StdMutex::new(Vec::new())); let la = log.clone(); let lb = log.clone(); let p = Arc::new(Pipe::new()); let p_a = p.clone(); let p_b = p.clone(); run(move || { let a = spawn(move || { la.lock().unwrap().push(b'A'); wait_readable(p_a.read).unwrap(); la.lock().unwrap().push(b'a'); }); let b = spawn(move || { // A starts parking on the empty pipe; B should be free to do // its work in the meantime. for _ in 0..3 { yield_now(); lb.lock().unwrap().push(b'B'); } // Now wake A. let _ = raw_write(p_b.write, b"x"); }); a.join().unwrap(); b.join().unwrap(); }); let v = log.lock().unwrap(); // A goes first ('A'), then B makes progress (multiple 'B's) while A is // parked, then A wakes and finishes ('a'). let pos_big_a = v.iter().position(|&c| c == b'A').unwrap(); let pos_lit_a = v.iter().position(|&c| c == b'a').unwrap(); let big_b_count = v.iter().filter(|&&c| c == b'B').count(); assert_eq!(big_b_count, 3, "B should have made 3 steps: {:?}", *v); assert!(pos_big_a < pos_lit_a, "A pre-park before A post-park: {:?}", *v); // At least the last B step should be before A resumes. let last_big_b = v.iter().rposition(|&c| c == b'B').unwrap(); assert!(last_big_b < pos_lit_a, "B should finish before A resumes: {:?}", *v); } // --------------------------------------------------------------------------- // Two-way pipe ping-pong via wait_readable. // --------------------------------------------------------------------------- #[test] fn ping_pong_between_two_pipes_completes() { // a_to_b: actor A writes, actor B reads. // b_to_a: actor B writes, actor A reads. let a_to_b = Arc::new(Pipe::new()); let b_to_a = Arc::new(Pipe::new()); let counter = Arc::new(AtomicU32::new(0)); let ca = counter.clone(); let cb = counter.clone(); let a_to_b_a = a_to_b.clone(); let a_to_b_b = a_to_b.clone(); let b_to_a_a = b_to_a.clone(); let b_to_a_b = b_to_a.clone(); run(move || { let a = spawn(move || { for _ in 0..5 { let _ = raw_write(a_to_b_a.write, b"x"); wait_readable(b_to_a_a.read).unwrap(); let mut buf = [0u8; 4]; let _ = raw_read(b_to_a_a.read, &mut buf); ca.fetch_add(1, Ordering::SeqCst); } }); let b = spawn(move || { for _ in 0..5 { wait_readable(a_to_b_b.read).unwrap(); let mut buf = [0u8; 4]; let _ = raw_read(a_to_b_b.read, &mut buf); let _ = raw_write(b_to_a_b.write, b"y"); cb.fetch_add(1, Ordering::SeqCst); } }); a.join().unwrap(); b.join().unwrap(); }); // Both sides did 5 rounds; counter is incremented by both, so total = 10. assert_eq!(counter.load(Ordering::SeqCst), 10); } // --------------------------------------------------------------------------- // Same fd reused across calls — DEL+ADD cycle works. // --------------------------------------------------------------------------- #[test] fn same_fd_can_be_waited_on_repeatedly() { let p = Arc::new(Pipe::new()); let p_r = p.clone(); let p_w = p.clone(); let counter = Arc::new(AtomicU32::new(0)); let c = counter.clone(); run(move || { let reader = spawn(move || { for _ in 0..4 { wait_readable(p_r.read).unwrap(); let mut buf = [0u8; 4]; let n = raw_read(p_r.read, &mut buf); assert!(n > 0); c.fetch_add(1, Ordering::SeqCst); } }); let writer = spawn(move || { for _ in 0..4 { yield_now(); smarm::sleep(Duration::from_millis(2)); let _ = raw_write(p_w.write, b"z"); } }); reader.join().unwrap(); writer.join().unwrap(); }); assert_eq!(counter.load(Ordering::SeqCst), 4); } // --------------------------------------------------------------------------- // Sanity that wait_writable on an already-writable pipe returns promptly. // --------------------------------------------------------------------------- #[test] fn wait_writable_on_empty_pipe_returns_quickly() { let p = Arc::new(Pipe::new()); let p_w = p.clone(); let start = std::time::Instant::now(); run(move || { wait_writable(p_w.write).unwrap(); }); let elapsed = start.elapsed(); assert!( elapsed < Duration::from_millis(200), "wait_writable should be fast on a writable fd, took {:?}", elapsed ); }