//! Benchmarks for the multi-scheduler runtime. //! //! Three workloads, three runtimes: //! - smarm single-thread (exact = 1) //! - smarm multi-thread (exact = available_parallelism) //! - tokio current_thread (single-thread baseline) //! - tokio multi-thread (the parallel comparison) //! //! Workloads: //! 1. Fan-out / fan-in compute (primes) — CPU-bound, tests parallelism //! 2. Ping-pong — message-passing overhead, park/unpark cost //! 3. Spawn throughput — cost of spawn + join per actor use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; // --------------------------------------------------------------------------- // Shared helpers // --------------------------------------------------------------------------- fn available_threads() -> usize { std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1) } fn print_header(title: &str) { println!("\n{}", "=".repeat(80)); println!(" {title}"); println!("{}", "=".repeat(80)); println!( "{:>22} | {:>12} | {:>10} | {:>10} | {:>10}", "runtime", "result", "median µs", "min µs", "max µs" ); println!("{}", "-".repeat(80)); } fn run_n (u64, u128)>(name: &str, n: u32, mut f: F) { let mut times = Vec::new(); let mut last = 0u64; for _ in 0..n { let (v, t) = f(); times.push(t); last = v; } times.sort_unstable(); let median = times[times.len() / 2]; let min = *times.iter().min().unwrap(); let max = *times.iter().max().unwrap(); println!( "{:>22} | {:>12} | {:>10} | {:>10} | {:>10}", name, last, median, min, max ); } const ITERS: u32 = 7; // --------------------------------------------------------------------------- // Workload 1: fan-out / fan-in primes // --------------------------------------------------------------------------- const PRIME_N: u64 = 400_000; const WORKERS: u64 = 64; fn is_prime(n: u64) -> bool { if n < 2 { return false; } if n < 4 { return true; } if n % 2 == 0 { return false; } let mut i = 3u64; while i * i <= n { if n % i == 0 { return false; } i += 2; } true } fn count_primes(lo: u64, hi: u64) -> u64 { (lo..hi).filter(|&n| is_prime(n)).count() as u64 } fn primes_slice(w: u64) -> (u64, u64) { let per = PRIME_N / WORKERS; let lo = w * per; let hi = if w + 1 == WORKERS { PRIME_N } else { lo + per }; (lo, hi) } fn bench_primes_smarm(threads: usize) -> (u64, u128) { let total = Arc::new(AtomicU64::new(0)); let t2 = total.clone(); let start = Instant::now(); smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(move || { let mut handles = Vec::new(); for w in 0..WORKERS { let (lo, hi) = primes_slice(w); let tc = t2.clone(); handles.push(smarm::spawn(move || { tc.fetch_add(count_primes(lo, hi), Ordering::Relaxed); })); } for h in handles { h.join().unwrap(); } }); (total.load(Ordering::Relaxed), start.elapsed().as_micros()) } fn bench_primes_tokio_current() -> (u64, u128) { let total = Arc::new(AtomicU64::new(0)); let t2 = total.clone(); let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); let start = Instant::now(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { let mut handles = Vec::new(); for w in 0..WORKERS { let (lo, hi) = primes_slice(w); let tc = t2.clone(); handles.push(tokio::task::spawn_local(async move { tc.fetch_add(count_primes(lo, hi), Ordering::Relaxed); })); } for h in handles { let _ = h.await; } }); (total.load(Ordering::Relaxed), start.elapsed().as_micros()) } fn bench_primes_tokio_multi() -> (u64, u128) { let total = Arc::new(AtomicU64::new(0)); let t2 = total.clone(); let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(available_threads()) .build() .unwrap(); let start = Instant::now(); rt.block_on(async move { let mut handles = Vec::new(); for w in 0..WORKERS { let (lo, hi) = primes_slice(w); let tc = t2.clone(); handles.push(tokio::spawn(async move { tc.fetch_add(count_primes(lo, hi), Ordering::Relaxed); })); } for h in handles { let _ = h.await; } }); (total.load(Ordering::Relaxed), start.elapsed().as_micros()) } fn bench_primes_baseline() -> (u64, u128) { let start = Instant::now(); let total: u64 = (0..WORKERS).map(|w| { let (lo, hi) = primes_slice(w); count_primes(lo, hi) }).sum(); (total, start.elapsed().as_micros()) } // --------------------------------------------------------------------------- // Workload 2: channel ping-pong // --------------------------------------------------------------------------- const PING_ROUNDS: u64 = 10_000; fn bench_pingpong_smarm(threads: usize) -> (u64, u128) { let start = Instant::now(); smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(|| { let (tx_a, rx_a) = smarm::channel::(); let (tx_b, rx_b) = smarm::channel::(); let ha = smarm::spawn(move || { tx_a.send(0).unwrap(); loop { let v = rx_b.recv().unwrap(); if v >= PING_ROUNDS { break; } tx_a.send(v + 1).unwrap(); } }); let hb = smarm::spawn(move || { loop { let v = rx_a.recv().unwrap(); tx_b.send(v + 1).unwrap(); if v + 1 >= PING_ROUNDS { break; } } }); ha.join().unwrap(); hb.join().unwrap(); }); (PING_ROUNDS, start.elapsed().as_micros()) } fn bench_pingpong_tokio_current() -> (u64, u128) { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); let start = Instant::now(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { let (tx_a, mut rx_a) = tokio::sync::mpsc::unbounded_channel::(); let (tx_b, mut rx_b) = tokio::sync::mpsc::unbounded_channel::(); let ha = tokio::task::spawn_local(async move { tx_a.send(0).unwrap(); loop { let v = rx_b.recv().await.unwrap(); if v >= PING_ROUNDS { break; } tx_a.send(v + 1).unwrap(); } }); let hb = tokio::task::spawn_local(async move { loop { let v = rx_a.recv().await.unwrap(); tx_b.send(v + 1).unwrap(); if v + 1 >= PING_ROUNDS { break; } } }); let _ = ha.await; let _ = hb.await; }); (PING_ROUNDS, start.elapsed().as_micros()) } fn bench_pingpong_tokio_multi() -> (u64, u128) { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) // ping-pong only needs 2 threads .enable_all() .build() .unwrap(); let start = Instant::now(); rt.block_on(async move { let (tx_a, mut rx_a) = tokio::sync::mpsc::unbounded_channel::(); let (tx_b, mut rx_b) = tokio::sync::mpsc::unbounded_channel::(); let ha = tokio::spawn(async move { tx_a.send(0).unwrap(); loop { let v = rx_b.recv().await.unwrap(); if v >= PING_ROUNDS { break; } tx_a.send(v + 1).unwrap(); } }); let hb = tokio::spawn(async move { loop { let v = rx_a.recv().await.unwrap(); tx_b.send(v + 1).unwrap(); if v + 1 >= PING_ROUNDS { break; } } }); let _ = ha.await; let _ = hb.await; }); (PING_ROUNDS, start.elapsed().as_micros()) } // --------------------------------------------------------------------------- // Workload 3: spawn throughput // --------------------------------------------------------------------------- const SPAWN_COUNT: u64 = 1_000; fn bench_spawn_smarm(threads: usize) -> (u64, u128) { let counter = Arc::new(AtomicU64::new(0)); let c = counter.clone(); let start = Instant::now(); smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(move || { let mut handles = Vec::new(); for _ in 0..SPAWN_COUNT { let cc = c.clone(); handles.push(smarm::spawn(move || { cc.fetch_add(1, Ordering::Relaxed); })); } for h in handles { h.join().unwrap(); } }); (counter.load(Ordering::Relaxed), start.elapsed().as_micros()) } fn bench_spawn_tokio_current() -> (u64, u128) { let counter = Arc::new(AtomicU64::new(0)); let c = counter.clone(); let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); let start = Instant::now(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { let mut handles = Vec::new(); for _ in 0..SPAWN_COUNT { let cc = c.clone(); handles.push(tokio::task::spawn_local(async move { cc.fetch_add(1, Ordering::Relaxed); })); } for h in handles { let _ = h.await; } }); (counter.load(Ordering::Relaxed), start.elapsed().as_micros()) } fn bench_spawn_tokio_multi() -> (u64, u128) { let counter = Arc::new(AtomicU64::new(0)); let c = counter.clone(); let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(available_threads()) .build() .unwrap(); let start = Instant::now(); rt.block_on(async move { let mut handles = Vec::new(); for _ in 0..SPAWN_COUNT { let cc = c.clone(); handles.push(tokio::spawn(async move { cc.fetch_add(1, Ordering::Relaxed); })); } for h in handles { let _ = h.await; } }); (counter.load(Ordering::Relaxed), start.elapsed().as_micros()) } // --------------------------------------------------------------------------- // main // --------------------------------------------------------------------------- fn main() { let n = available_threads(); println!("smarm multi-scheduler benchmarks"); println!("available parallelism: {n} threads"); println!("PRIME_N={PRIME_N}, WORKERS={WORKERS}, PING_ROUNDS={PING_ROUNDS}, SPAWN_COUNT={SPAWN_COUNT}"); // ---- Primes ---- print_header(&format!("Fan-out/fan-in: count primes in [2, {PRIME_N}) across {WORKERS} workers")); run_n("baseline (serial)", ITERS, bench_primes_baseline); run_n("smarm single-thread", ITERS, || bench_primes_smarm(1)); run_n(&format!("smarm {n}-thread"), ITERS, || bench_primes_smarm(n)); run_n("tokio current_thread", ITERS, bench_primes_tokio_current); run_n("tokio multi-thread", ITERS, bench_primes_tokio_multi); // ---- Ping-pong ---- print_header(&format!("Ping-pong: {PING_ROUNDS} round-trips between two actors")); run_n("smarm single-thread", ITERS, || bench_pingpong_smarm(1)); run_n(&format!("smarm {n}-thread"), ITERS, || bench_pingpong_smarm(n)); run_n("tokio current_thread", ITERS, bench_pingpong_tokio_current); run_n("tokio multi-thread", ITERS, bench_pingpong_tokio_multi); // ---- Spawn throughput ---- print_header(&format!("Spawn throughput: {SPAWN_COUNT} actors spawned and joined")); run_n("smarm single-thread", ITERS, || bench_spawn_smarm(1)); run_n(&format!("smarm {n}-thread"), ITERS, || bench_spawn_smarm(n)); run_n("tokio current_thread", ITERS, bench_spawn_tokio_current); run_n("tokio multi-thread", ITERS, bench_spawn_tokio_multi); }