//! Benchmarks where tokio's design has a structural advantage.
//!
//! These exist to *measure* the cost of smarm's design choices, not to flatter
//! either runtime. Expect tokio to win these; the value is in knowing by how
//! much, and in catching regressions where the gap widens.
//!
//! Workloads:
//!   5. spawn_storm_busy     — keep N workers busy with yielding tasks, then
//!                             spawn 10k zero-work tasks and join. Adapted from
//!                             tokio's `spawn_many_remote_busy1`. Tokio's
//!                             work-stealing deques + per-worker LIFO slot
//!                             should beat smarm's single global `Mutex<>`
//!                             run queue.
//!   6. mpsc_contention      — 32 producer actors, 1 consumer, 10k messages
//!                             each. Tokio's mpsc is lock-free on the hot path;
//!                             smarm's channel is `Arc<Mutex<…>>` per channel
//!                             *and* takes the runtime mutex on each unpark.
//!   7. many_timers          — 10k actors each sleep for a pseudo-random short
//!                             duration (1–10 ms), all wake within a tight
//!                             window. Tokio's per-worker sharded timer wheel
//!                             vs smarm's single shared min-heap (and single
//!                             drain-lock winner).
//!   8. multi_thread_scaling — primes again, but sweep thread count 1, 2, 4,
//!                             available_parallelism(). Smarm's mutex ceiling
//!                             should show up as soon as scheduling overhead
//!                             is non-trivial relative to per-actor work.
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; // --------------------------------------------------------------------------- // Shared harness // --------------------------------------------------------------------------- const ITERS: u32 = 15; fn available_threads() -> usize { std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1) } fn print_header(title: &str) { println!("\n{}", "=".repeat(80)); println!(" {title}"); println!("{}", "=".repeat(80)); println!( "{:>26} | {:>12} | {:>10} | {:>10} | {:>10}", "runtime", "result", "median µs", "min µs", "max µs" ); println!("{}", "-".repeat(80)); } fn run_n (u64, u128)>(name: &str, n: u32, mut f: F) { let mut times = Vec::new(); let mut last = 0u64; let _ = f(); // warmup for _ in 0..n { let (v, t) = f(); times.push(t); last = v; } times.sort_unstable(); let median = times[times.len() / 2]; let min = *times.iter().min().unwrap(); let max = *times.iter().max().unwrap(); println!( "{:>26} | {:>12} | {:>10} | {:>10} | {:>10}", name, last, median, min, max ); } // --------------------------------------------------------------------------- // 5. spawn_storm_busy — workers loaded, then storm of zero-work spawns // --------------------------------------------------------------------------- const STORM_BACKGROUND: u64 = 8; // number of background "busy" actors const STORM_SPAWN: u64 = 10_000; // zero-work spawns to time fn bench_storm_smarm(threads: usize) -> (u64, u128) { let counter = Arc::new(AtomicU64::new(0)); let stop = Arc::new(AtomicBool::new(false)); let c2 = counter.clone(); let s2 = stop.clone(); let start = Instant::now(); smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(move || { // Background actors: yield in a tight loop until told to stop. 
let mut bg_handles = Vec::new(); for _ in 0..STORM_BACKGROUND { let s = s2.clone(); bg_handles.push(smarm::spawn(move || { while !s.load(Ordering::Relaxed) { smarm::yield_now(); } })); } // Storm: spawn 10k zero-work actors and join them all. let mut handles = Vec::new(); for _ in 0..STORM_SPAWN { let cc = c2.clone(); handles.push(smarm::spawn(move || { cc.fetch_add(1, Ordering::Relaxed); })); } for h in handles { h.join().unwrap(); } // Tear down background. s2.store(true, Ordering::Relaxed); for h in bg_handles { h.join().unwrap(); } }); (counter.load(Ordering::Relaxed), start.elapsed().as_micros()) } fn bench_storm_tokio_current() -> (u64, u128) { let counter = Arc::new(AtomicU64::new(0)); let stop = Arc::new(AtomicBool::new(false)); let c2 = counter.clone(); let s2 = stop.clone(); let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); let start = Instant::now(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { let mut bg_handles = Vec::new(); for _ in 0..STORM_BACKGROUND { let s = s2.clone(); bg_handles.push(tokio::task::spawn_local(async move { while !s.load(Ordering::Relaxed) { tokio::task::yield_now().await; } })); } let mut handles = Vec::new(); for _ in 0..STORM_SPAWN { let cc = c2.clone(); handles.push(tokio::task::spawn_local(async move { cc.fetch_add(1, Ordering::Relaxed); })); } for h in handles { let _ = h.await; } s2.store(true, Ordering::Relaxed); for h in bg_handles { let _ = h.await; } }); (counter.load(Ordering::Relaxed), start.elapsed().as_micros()) } fn bench_storm_tokio_multi() -> (u64, u128) { let counter = Arc::new(AtomicU64::new(0)); let stop = Arc::new(AtomicBool::new(false)); let c2 = counter.clone(); let s2 = stop.clone(); let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(available_threads()) .build() .unwrap(); let start = Instant::now(); rt.block_on(async move { let mut bg_handles = Vec::new(); for _ in 0..STORM_BACKGROUND { let s = s2.clone(); 
bg_handles.push(tokio::spawn(async move { while !s.load(Ordering::Relaxed) { tokio::task::yield_now().await; } })); } let mut handles = Vec::new(); for _ in 0..STORM_SPAWN { let cc = c2.clone(); handles.push(tokio::spawn(async move { cc.fetch_add(1, Ordering::Relaxed); })); } for h in handles { let _ = h.await; } s2.store(true, Ordering::Relaxed); for h in bg_handles { let _ = h.await; } }); (counter.load(Ordering::Relaxed), start.elapsed().as_micros()) } // --------------------------------------------------------------------------- // 6. mpsc_contention — 32 producers × 10k msgs into 1 consumer // --------------------------------------------------------------------------- const MPSC_PRODUCERS: u64 = 32; const MPSC_PER_PRODUCER: u64 = 10_000; fn bench_mpsc_smarm(threads: usize) -> (u64, u128) { let start = Instant::now(); smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(|| { let (tx, rx) = smarm::channel::(); let mut prod_handles = Vec::new(); for p in 0..MPSC_PRODUCERS { let tx = tx.clone(); prod_handles.push(smarm::spawn(move || { for i in 0..MPSC_PER_PRODUCER { tx.send(p * MPSC_PER_PRODUCER + i).unwrap(); } })); } drop(tx); // close once producers drop let consumer = smarm::spawn(move || { let mut count = 0u64; while let Ok(_) = rx.recv() { count += 1; } let _ = count; // discard; run() closure must return () }); for h in prod_handles { h.join().unwrap(); } let _ = consumer.join().unwrap(); }); (MPSC_PRODUCERS * MPSC_PER_PRODUCER, start.elapsed().as_micros()) } fn bench_mpsc_tokio_current() -> (u64, u128) { let rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); let start = Instant::now(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); let mut prod_handles = Vec::new(); for p in 0..MPSC_PRODUCERS { let tx = tx.clone(); prod_handles.push(tokio::task::spawn_local(async move { for i in 0..MPSC_PER_PRODUCER { tx.send(p * MPSC_PER_PRODUCER + 
i).unwrap(); } })); } drop(tx); let consumer = tokio::task::spawn_local(async move { let mut count = 0u64; while let Some(_) = rx.recv().await { count += 1; } count }); for h in prod_handles { let _ = h.await; } let _ = consumer.await; }); (MPSC_PRODUCERS * MPSC_PER_PRODUCER, start.elapsed().as_micros()) } fn bench_mpsc_tokio_multi() -> (u64, u128) { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(available_threads()) .build() .unwrap(); let start = Instant::now(); rt.block_on(async move { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::(); let mut prod_handles = Vec::new(); for p in 0..MPSC_PRODUCERS { let tx = tx.clone(); prod_handles.push(tokio::spawn(async move { for i in 0..MPSC_PER_PRODUCER { tx.send(p * MPSC_PER_PRODUCER + i).unwrap(); } })); } drop(tx); let consumer = tokio::spawn(async move { let mut count = 0u64; while let Some(_) = rx.recv().await { count += 1; } count }); for h in prod_handles { let _ = h.await; } let _ = consumer.await; }); (MPSC_PRODUCERS * MPSC_PER_PRODUCER, start.elapsed().as_micros()) } // --------------------------------------------------------------------------- // 7. many_timers — 10k sleeping actors waking in a tight window // --------------------------------------------------------------------------- const TIMER_ACTORS: u64 = 10_000; const TIMER_MIN_MS: u64 = 1; const TIMER_MAX_MS: u64 = 10; // Deterministic per-actor delay so iterations are comparable. 
fn timer_delay_ms(i: u64) -> u64 { TIMER_MIN_MS + (i * 2654435761u64 >> 32) % (TIMER_MAX_MS - TIMER_MIN_MS + 1) } fn bench_timers_smarm(threads: usize) -> (u64, u128) { let start = Instant::now(); smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(|| { let mut handles = Vec::new(); for i in 0..TIMER_ACTORS { let ms = timer_delay_ms(i); handles.push(smarm::spawn(move || { smarm::sleep(Duration::from_millis(ms)); })); } for h in handles { h.join().unwrap(); } }); (TIMER_ACTORS, start.elapsed().as_micros()) } fn bench_timers_tokio_current() -> (u64, u128) { let rt = tokio::runtime::Builder::new_current_thread() .enable_time() .build() .unwrap(); let start = Instant::now(); let local = tokio::task::LocalSet::new(); local.block_on(&rt, async move { let mut handles = Vec::new(); for i in 0..TIMER_ACTORS { let ms = timer_delay_ms(i); handles.push(tokio::task::spawn_local(async move { tokio::time::sleep(Duration::from_millis(ms)).await; })); } for h in handles { let _ = h.await; } }); (TIMER_ACTORS, start.elapsed().as_micros()) } fn bench_timers_tokio_multi() -> (u64, u128) { let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(available_threads()) .enable_time() .build() .unwrap(); let start = Instant::now(); rt.block_on(async move { let mut handles = Vec::new(); for i in 0..TIMER_ACTORS { let ms = timer_delay_ms(i); handles.push(tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(ms)).await; })); } for h in handles { let _ = h.await; } }); (TIMER_ACTORS, start.elapsed().as_micros()) } // --------------------------------------------------------------------------- // 8. 
multi_thread_scaling — primes, sweep thread count // --------------------------------------------------------------------------- const SCALING_N: u64 = 400_000; const SCALING_WORKERS: u64 = 64; fn is_prime(n: u64) -> bool { if n < 2 { return false; } if n < 4 { return true; } if n % 2 == 0 { return false; } let mut i = 3u64; while i * i <= n { if n % i == 0 { return false; } i += 2; } true } fn count_primes(lo: u64, hi: u64) -> u64 { (lo..hi).filter(|&n| is_prime(n)).count() as u64 } fn scaling_slice(w: u64) -> (u64, u64) { let per = SCALING_N / SCALING_WORKERS; let lo = w * per; let hi = if w + 1 == SCALING_WORKERS { SCALING_N } else { lo + per }; (lo, hi) } fn bench_scaling_smarm(threads: usize) -> (u64, u128) { let total = Arc::new(AtomicU64::new(0)); let t2 = total.clone(); let start = Instant::now(); smarm::runtime::init(smarm::runtime::Config::exact(threads)).run(move || { let mut handles = Vec::new(); for w in 0..SCALING_WORKERS { let (lo, hi) = scaling_slice(w); let tc = t2.clone(); handles.push(smarm::spawn(move || { tc.fetch_add(count_primes(lo, hi), Ordering::Relaxed); })); } for h in handles { h.join().unwrap(); } }); (total.load(Ordering::Relaxed), start.elapsed().as_micros()) } fn bench_scaling_tokio_multi(threads: usize) -> (u64, u128) { let total = Arc::new(AtomicU64::new(0)); let t2 = total.clone(); let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(threads) .build() .unwrap(); let start = Instant::now(); rt.block_on(async move { let mut handles = Vec::new(); for w in 0..SCALING_WORKERS { let (lo, hi) = scaling_slice(w); let tc = t2.clone(); handles.push(tokio::spawn(async move { tc.fetch_add(count_primes(lo, hi), Ordering::Relaxed); })); } for h in handles { let _ = h.await; } }); (total.load(Ordering::Relaxed), start.elapsed().as_micros()) } // --------------------------------------------------------------------------- // main // --------------------------------------------------------------------------- fn main() { let n = 
available_threads(); println!("smarm tokio-favored benchmarks"); println!("available parallelism: {n} threads"); println!("ITERS={ITERS} (+1 warmup, discarded)"); println!( "STORM_BACKGROUND={STORM_BACKGROUND}, STORM_SPAWN={STORM_SPAWN}, \ MPSC={MPSC_PRODUCERS}×{MPSC_PER_PRODUCER}, \ TIMER_ACTORS={TIMER_ACTORS} ({TIMER_MIN_MS}–{TIMER_MAX_MS} ms), \ SCALING_N={SCALING_N}/{SCALING_WORKERS}" ); // ---- 5. spawn_storm_busy ---- print_header(&format!( "spawn_storm_busy: {STORM_BACKGROUND} bg yielders + {STORM_SPAWN} zero-work spawns" )); run_n("smarm 1-thread", ITERS, || bench_storm_smarm(1)); run_n(&format!("smarm {n}-thread"), ITERS, || bench_storm_smarm(n)); run_n("tokio current_thread", ITERS, bench_storm_tokio_current); run_n("tokio multi-thread", ITERS, bench_storm_tokio_multi); // ---- 6. mpsc_contention ---- print_header(&format!( "mpsc_contention: {MPSC_PRODUCERS} producers × {MPSC_PER_PRODUCER} msgs → 1 consumer" )); run_n("smarm 1-thread", ITERS, || bench_mpsc_smarm(1)); run_n(&format!("smarm {n}-thread"), ITERS, || bench_mpsc_smarm(n)); run_n("tokio current_thread", ITERS, bench_mpsc_tokio_current); run_n("tokio multi-thread", ITERS, bench_mpsc_tokio_multi); // ---- 7. many_timers ---- print_header(&format!( "many_timers: {TIMER_ACTORS} actors sleeping {TIMER_MIN_MS}–{TIMER_MAX_MS} ms" )); run_n("smarm 1-thread", ITERS, || bench_timers_smarm(1)); run_n(&format!("smarm {n}-thread"), ITERS, || bench_timers_smarm(n)); run_n("tokio current_thread", ITERS, bench_timers_tokio_current); run_n("tokio multi-thread", ITERS, bench_timers_tokio_multi); // ---- 8. 
multi_thread_scaling ---- print_header(&format!( "multi_thread_scaling: primes in [2, {SCALING_N}) across {SCALING_WORKERS} workers" )); let sweep: Vec = { let mut v = vec![1usize, 2, 4]; if n > 4 && !v.contains(&n) { v.push(n); } v.into_iter().filter(|t| *t <= n).collect() }; for t in &sweep { run_n(&format!("smarm {t}-thread"), ITERS, || bench_scaling_smarm(*t)); } for t in &sweep { run_n(&format!("tokio multi {t}-thread"), ITERS, || bench_scaling_tokio_multi(*t)); } }