Files
smarm/benches/general.rs
Bench 3da6ffaa77 benches: expose preemption knobs + sweep runner
Config API changes (src/preempt.rs, src/runtime.rs):
- preempt: promote ALLOC_INTERVAL and TIMESLICE_CYCLES from bare consts to
  DEFAULT_ALLOC_INTERVAL / DEFAULT_TIMESLICE_CYCLES; store active values in
  thread-locals set on each actor resume so multiple runtimes can use
  different settings concurrently.
- runtime: add alloc_interval / timeslice_cycles fields to Config; add
  Config::alloc_interval(n) and Config::timeslice_cycles(c) builder methods;
  thread the values through RuntimeInner to the reset_timeslice() call in
  schedule_loop.

Bench changes:
- Add bench_cfg(threads) helper to general/tokio_favored/smarm_favored that
  wraps Config::exact and reads SMARM_ALLOC_INTERVAL / SMARM_TIMESLICE_CYCLES
  env vars, so the sweep script can vary knobs without recompiling.

Sweep tooling (benches/sweep.py):
- 'run':     run the 3-file bench suite once; --save-baseline persists JSON
- 'regress': compare current run against baseline.json, exit 1 on any bench
             that regresses >10% vs stored medians
- 'sweep':   run the full SWEEP_GRID (10 points), print comparison table,
             optional --save-csv; binaries pre-built so no recompile per point

Sweep results (10-point grid, 1-CPU sandbox):
- The preemption knobs have very little effect on this single-CPU machine.
  Most benches move <5% across the entire grid.
- Longer timeslices (tc=600k, tc=1200k) reliably hurt spawn_storm_busy
  (+11 to +15%) and catch_unwind_panics (+10 to +12%) because actors hold the
  scheduler mutex longer per timeslice, stalling the storm of joinable tasks.
- Shorter timeslices (tc=150k) give a small improvement on many_timers
  (-3 to -4%) and are a wash everywhere else.
- yield_in_hot_loop and uncontended_channel are essentially flat across all
  knobs — both are scheduling-dominated and call yield_now explicitly, so
  the RDTSC-driven preemption path is irrelevant.
- Conclusion: the knobs matter primarily under contention (multi-core).
  Re-run sweep on a multi-core machine before drawing tuning conclusions.
2026-05-25 13:04:58 +00:00

443 lines
16 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! General benchmarks — workloads where neither runtime has a structural
//! advantage. Both should be competitive; large gaps here indicate a real
//! difference in per-task or per-yield overhead.
//!
//! Workloads:
//! 1. chained_spawn — task N spawns N+1, depth 1000. Spawn+exit overhead in
//! a serial chain. Adapted from tokio's bench of the same
//! name.
//! 2. yield_many — 200 actors × 1000 yields. Pure scheduling throughput
//! with no allocation, no IO. Adapted from tokio.
//! 3. fan_out_compute — count primes in [2, 400_000) across 64 workers. Same
//! shape as multi_scheduler::primes but lives here for
//! completeness.
//! 4. ping_pong_oneshot — N rounds of (spawn pair, send oneshot, await).
//! Closer to a request/response workload than channel
//! ping-pong.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
// ---------------------------------------------------------------------------
// Shared harness
// ---------------------------------------------------------------------------
/// Timed iterations per benchmark row (one extra warmup run is discarded).
const ITERS: u32 = 15;

/// Thread count used for the "N-thread" / multi-thread variants.
///
/// Falls back to a single thread when the platform cannot report its
/// available parallelism.
fn available_threads() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
/// Prints the banner and column headings for one benchmark table.
///
/// Column widths match the rows printed by `run_n`.
fn print_header(title: &str) {
    let rule = |ch: &str| ch.repeat(80);
    println!("\n{}", rule("="));
    println!(" {title}");
    println!("{}", rule("="));
    println!(
        "{:>26} | {:>12} | {:>10} | {:>10} | {:>10}",
        "runtime", "result", "median µs", "min µs", "max µs"
    );
    println!("{}", rule("-"));
}
/// Runs `f` once as an untimed warmup, then `n` timed iterations, and prints
/// one table row for `name`.
///
/// `f` returns `(result, elapsed_µs)`. The printed `result` is the value from
/// the final timed iteration. Median/min/max are computed over the `n` timings;
/// for an even `n` the "median" is the upper of the two middle samples.
///
/// # Panics
/// Panics if `n == 0`, because there would be no timings to summarise.
fn run_n<F: FnMut() -> (u64, u128)>(name: &str, n: u32, mut f: F) {
    // Fail fast with a clear message instead of an index-out-of-bounds
    // on an empty `times` after a pointless warmup run.
    assert!(n > 0, "run_n({}): need at least one timed iteration", name);

    // One warmup iteration, discarded.
    let _ = f();

    // u32 -> usize is lossless on every target with >= 32-bit pointers.
    let mut times = Vec::with_capacity(n as usize);
    let mut last = 0u64;
    for _ in 0..n {
        let (v, t) = f();
        times.push(t);
        last = v;
    }

    times.sort_unstable();
    // Sorted, so the extremes are simply the two ends.
    let min = times[0];
    let max = times[times.len() - 1];
    let median = times[times.len() / 2];
    println!(
        "{:>26} | {:>12} | {:>10} | {:>10} | {:>10}",
        name, last, median, min, max
    );
}
// ---------------------------------------------------------------------------
// 1. chained_spawn — depth 1000
// ---------------------------------------------------------------------------
const CHAIN_DEPTH: u64 = 1_000;

/// chained_spawn on smarm with `threads` scheduler threads.
///
/// Each link bumps the shared counter, spawns the next link and exits. The
/// final link (depth 0) signals completion over a channel that the root
/// actor waits on. Returns `(links_run, elapsed_µs)`; `links_run` should
/// equal `CHAIN_DEPTH`.
///
/// NOTE(review): `start` is taken before `runtime::init`, so this figure
/// includes smarm runtime construction (and presumably whatever teardown
/// `run` performs). The tokio variants build their runtime before `start`.
/// Keep this in mind when comparing columns.
fn bench_chained_smarm(threads: usize) -> (u64, u128) {
    let counter = Arc::new(AtomicU64::new(0));
    let c2 = counter.clone();
    let start = Instant::now();
    smarm::runtime::init(bench_cfg(threads)).run(move || {
        // Fire-and-forget chain, matching tokio's bench shape: each link
        // spawns the next link and exits immediately; depth 0 signals done
        // via a channel. Crucially this does *not* nest joins on the
        // spawner's stack — important because smarm actor stacks are a
        // fixed 64 KiB.
        let (tx, rx) = smarm::channel::<()>();
        fn iter(c: Arc<AtomicU64>, tx: smarm::Sender<()>, n: u64) {
            if n == 0 {
                tx.send(()).unwrap();
            } else {
                // Move the Arc straight into the next link, as the tokio
                // variants do. Cloning it (twice) per link would charge
                // smarm extra refcount traffic the tokio side doesn't pay.
                smarm::spawn(move || {
                    c.fetch_add(1, Ordering::Relaxed);
                    iter(c, tx, n - 1);
                });
                // Caller exits; JoinHandle dropped, no parking.
            }
        }
        iter(c2, tx, CHAIN_DEPTH);
        rx.recv().unwrap();
    });
    (counter.load(Ordering::Relaxed), start.elapsed().as_micros())
}
/// chained_spawn on a tokio current-thread runtime driven through a
/// `LocalSet`, so every link is a `spawn_local` task on one OS thread.
///
/// Returns `(links_run, elapsed_µs)`. The runtime is built before the timer
/// starts; the `LocalSet` is created inside the timed window.
fn bench_chained_tokio_current() -> (u64, u128) {
    let links = Arc::new(AtomicU64::new(0));
    let links_for_chain = Arc::clone(&links);
    let runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();
    let t0 = Instant::now();
    let set = tokio::task::LocalSet::new();
    set.block_on(&runtime, async move {
        // One link: count ourselves and hand off to the next, or fire the
        // completion signal once the chain is exhausted.
        fn link(
            hits: Arc<AtomicU64>,
            finished: tokio::sync::oneshot::Sender<()>,
            remaining: u64,
        ) {
            if remaining > 0 {
                tokio::task::spawn_local(async move {
                    hits.fetch_add(1, Ordering::Relaxed);
                    link(hits, finished, remaining - 1);
                });
            } else {
                let _ = finished.send(());
            }
        }

        // Same oneshot completion signal tokio's own chained_spawn bench uses.
        let (finished_tx, finished_rx) = tokio::sync::oneshot::channel();
        link(links_for_chain, finished_tx, CHAIN_DEPTH);
        let _ = finished_rx.await;
    });
    (links.load(Ordering::Relaxed), t0.elapsed().as_micros())
}
/// chained_spawn on a tokio multi-thread runtime with one worker per
/// available thread; each link is a plain `tokio::spawn` task.
///
/// Returns `(links_run, elapsed_µs)`. The runtime is built before the timer
/// starts.
fn bench_chained_tokio_multi() -> (u64, u128) {
    let links = Arc::new(AtomicU64::new(0));
    let links_for_chain = Arc::clone(&links);
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(available_threads())
        .build()
        .unwrap();
    let t0 = Instant::now();
    runtime.block_on(async move {
        // One link: count ourselves and hand off to the next, or fire the
        // completion signal once the chain is exhausted.
        fn link(
            hits: Arc<AtomicU64>,
            finished: tokio::sync::oneshot::Sender<()>,
            remaining: u64,
        ) {
            if remaining > 0 {
                tokio::spawn(async move {
                    hits.fetch_add(1, Ordering::Relaxed);
                    link(hits, finished, remaining - 1);
                });
            } else {
                let _ = finished.send(());
            }
        }

        let (finished_tx, finished_rx) = tokio::sync::oneshot::channel();
        link(links_for_chain, finished_tx, CHAIN_DEPTH);
        let _ = finished_rx.await;
    });
    (links.load(Ordering::Relaxed), t0.elapsed().as_micros())
}
// ---------------------------------------------------------------------------
// 2. yield_many — 200 actors × 1000 yields
// ---------------------------------------------------------------------------
const YIELD_TASKS: u64 = 200;
const YIELD_ROUNDS: u64 = 1_000;

/// yield_many on smarm with `threads` scheduler threads.
///
/// Spawns `YIELD_TASKS` actors, each calling `smarm::yield_now`
/// `YIELD_ROUNDS` times. The root actor then joins them all in spawn order.
/// Returns `(total_yields, elapsed_µs)`; the timer starts before
/// `runtime::init`.
fn bench_yield_smarm(threads: usize) -> (u64, u128) {
    let t0 = Instant::now();
    smarm::runtime::init(bench_cfg(threads)).run(|| {
        // Spawn every worker first (collect is eager), then join.
        let workers: Vec<_> = (0..YIELD_TASKS)
            .map(|_| {
                smarm::spawn(|| {
                    (0..YIELD_ROUNDS).for_each(|_| {
                        smarm::yield_now();
                    });
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
    });
    (YIELD_TASKS * YIELD_ROUNDS, t0.elapsed().as_micros())
}
/// yield_many on a tokio current-thread runtime: `YIELD_TASKS` local tasks,
/// each yielding `YIELD_ROUNDS` times, awaited in spawn order.
/// Returns `(total_yields, elapsed_µs)`.
fn bench_yield_tokio_current() -> (u64, u128) {
    let runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();
    let t0 = Instant::now();
    let set = tokio::task::LocalSet::new();
    set.block_on(&runtime, async move {
        let tasks: Vec<_> = (0..YIELD_TASKS)
            .map(|_| {
                tokio::task::spawn_local(async move {
                    for _ in 0..YIELD_ROUNDS {
                        tokio::task::yield_now().await;
                    }
                })
            })
            .collect();
        for task in tasks {
            let _ = task.await;
        }
    });
    (YIELD_TASKS * YIELD_ROUNDS, t0.elapsed().as_micros())
}
/// yield_many on a tokio multi-thread runtime with one worker per available
/// thread: `YIELD_TASKS` spawned tasks, each yielding `YIELD_ROUNDS` times.
/// Returns `(total_yields, elapsed_µs)`.
fn bench_yield_tokio_multi() -> (u64, u128) {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(available_threads())
        .build()
        .unwrap();
    let t0 = Instant::now();
    runtime.block_on(async move {
        let tasks: Vec<_> = (0..YIELD_TASKS)
            .map(|_| {
                tokio::spawn(async move {
                    for _ in 0..YIELD_ROUNDS {
                        tokio::task::yield_now().await;
                    }
                })
            })
            .collect();
        for task in tasks {
            let _ = task.await;
        }
    });
    (YIELD_TASKS * YIELD_ROUNDS, t0.elapsed().as_micros())
}
// ---------------------------------------------------------------------------
// 3. fan_out_compute — primes, same shape as multi_scheduler::primes
// ---------------------------------------------------------------------------
const PRIME_N: u64 = 400_000;
const PRIME_WORKERS: u64 = 64;

/// Trial-division primality test over odd divisors up to √n.
///
/// Deliberately naive: this is the CPU-bound kernel for fan_out_compute.
fn is_prime(n: u64) -> bool {
    match n {
        0 | 1 => false,
        2 | 3 => true,
        _ if n % 2 == 0 => false,
        _ => {
            let mut d = 3u64;
            while d * d <= n {
                if n % d == 0 {
                    return false;
                }
                d += 2;
            }
            true
        }
    }
}

/// Number of primes in the half-open range `[lo, hi)`.
fn count_primes(lo: u64, hi: u64) -> u64 {
    let mut found = 0u64;
    for candidate in lo..hi {
        if is_prime(candidate) {
            found += 1;
        }
    }
    found
}

/// Half-open `[lo, hi)` slice of `[0, PRIME_N)` assigned to worker `w`.
///
/// Each worker gets `PRIME_N / PRIME_WORKERS` numbers. The last worker also
/// absorbs the remainder, so for `w` in `0..PRIME_WORKERS` the slices
/// exactly tile `[0, PRIME_N)`.
fn primes_slice(w: u64) -> (u64, u64) {
    let width = PRIME_N / PRIME_WORKERS;
    let lo = width * w;
    let is_last = w + 1 == PRIME_WORKERS;
    (lo, if is_last { PRIME_N } else { lo + width })
}
/// fan_out_compute on smarm with `threads` scheduler threads.
///
/// Spawns one actor per `primes_slice(w)`; each adds its prime count into a
/// shared total. The root actor joins them all.
/// Returns `(primes_found, elapsed_µs)`; the timer starts before
/// `runtime::init`.
fn bench_primes_smarm(threads: usize) -> (u64, u128) {
    let found = Arc::new(AtomicU64::new(0));
    let found_in_rt = Arc::clone(&found);
    let t0 = Instant::now();
    smarm::runtime::init(bench_cfg(threads)).run(move || {
        let workers: Vec<_> = (0..PRIME_WORKERS)
            .map(|w| {
                let (lo, hi) = primes_slice(w);
                let sink = Arc::clone(&found_in_rt);
                smarm::spawn(move || {
                    sink.fetch_add(count_primes(lo, hi), Ordering::Relaxed);
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
    });
    (found.load(Ordering::Relaxed), t0.elapsed().as_micros())
}
/// fan_out_compute on a tokio current-thread runtime: every slice runs as a
/// `spawn_local` task on the single runtime thread.
/// Returns `(primes_found, elapsed_µs)`.
fn bench_primes_tokio_current() -> (u64, u128) {
    let found = Arc::new(AtomicU64::new(0));
    let found_in_rt = Arc::clone(&found);
    let runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();
    let t0 = Instant::now();
    let set = tokio::task::LocalSet::new();
    set.block_on(&runtime, async move {
        let tasks: Vec<_> = (0..PRIME_WORKERS)
            .map(|w| {
                let (lo, hi) = primes_slice(w);
                let sink = Arc::clone(&found_in_rt);
                tokio::task::spawn_local(async move {
                    sink.fetch_add(count_primes(lo, hi), Ordering::Relaxed);
                })
            })
            .collect();
        for task in tasks {
            let _ = task.await;
        }
    });
    (found.load(Ordering::Relaxed), t0.elapsed().as_micros())
}
/// fan_out_compute on a tokio multi-thread runtime with one worker per
/// available thread. Each slice is a `tokio::spawn` task; the prime counting
/// runs directly on the worker thread.
/// Returns `(primes_found, elapsed_µs)`.
fn bench_primes_tokio_multi() -> (u64, u128) {
    let found = Arc::new(AtomicU64::new(0));
    let found_in_rt = Arc::clone(&found);
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(available_threads())
        .build()
        .unwrap();
    let t0 = Instant::now();
    runtime.block_on(async move {
        let tasks: Vec<_> = (0..PRIME_WORKERS)
            .map(|w| {
                let (lo, hi) = primes_slice(w);
                let sink = Arc::clone(&found_in_rt);
                tokio::spawn(async move {
                    sink.fetch_add(count_primes(lo, hi), Ordering::Relaxed);
                })
            })
            .collect();
        for task in tasks {
            let _ = task.await;
        }
    });
    (found.load(Ordering::Relaxed), t0.elapsed().as_micros())
}
// ---------------------------------------------------------------------------
// 4. ping_pong_oneshot — 1000 rounds of spawn-pair-await
// ---------------------------------------------------------------------------
const PP_ROUNDS: u64 = 1_000;

/// ping_pong_oneshot on smarm with `threads` scheduler threads.
///
/// Each round spawns a responder (waits for ping, replies pong) and then an
/// initiator (sends ping, waits for pong). The root actor joins the
/// initiator, then the responder. smarm has no oneshot, so each direction
/// uses a fresh `channel::<()>` per round.
/// Returns `(rounds, elapsed_µs)`; the timer starts before `runtime::init`.
fn bench_pp_smarm(threads: usize) -> (u64, u128) {
    let t0 = Instant::now();
    smarm::runtime::init(bench_cfg(threads)).run(|| {
        (0..PP_ROUNDS).for_each(|_| {
            let (ping_tx, ping_rx) = smarm::channel::<()>();
            let (pong_tx, pong_rx) = smarm::channel::<()>();
            let responder = smarm::spawn(move || {
                ping_rx.recv().unwrap();
                pong_tx.send(()).unwrap();
            });
            let initiator = smarm::spawn(move || {
                ping_tx.send(()).unwrap();
                pong_rx.recv().unwrap();
            });
            initiator.join().unwrap();
            responder.join().unwrap();
        });
    });
    (PP_ROUNDS, t0.elapsed().as_micros())
}
/// ping_pong_oneshot on a tokio current-thread runtime via `LocalSet`.
///
/// Each round spawns a responder and an initiator as local tasks, linked by
/// two oneshots (ping, pong), and awaits the initiator and then the
/// responder. Returns `(rounds, elapsed_µs)`.
fn bench_pp_tokio_current() -> (u64, u128) {
    let runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();
    let t0 = Instant::now();
    let set = tokio::task::LocalSet::new();
    set.block_on(&runtime, async move {
        for _round in 0..PP_ROUNDS {
            let (ping_tx, ping_rx) = tokio::sync::oneshot::channel::<()>();
            let (pong_tx, pong_rx) = tokio::sync::oneshot::channel::<()>();
            let responder = tokio::task::spawn_local(async move {
                ping_rx.await.unwrap();
                pong_tx.send(()).unwrap();
            });
            let initiator = tokio::task::spawn_local(async move {
                ping_tx.send(()).unwrap();
                pong_rx.await.unwrap();
            });
            let _ = initiator.await;
            let _ = responder.await;
        }
    });
    (PP_ROUNDS, t0.elapsed().as_micros())
}
/// ping_pong_oneshot on a tokio multi-thread runtime with one worker per
/// available thread.
///
/// Each round spawns a responder and an initiator linked by two oneshots
/// (ping, pong), then awaits the initiator and then the responder.
/// Returns `(rounds, elapsed_µs)`.
fn bench_pp_tokio_multi() -> (u64, u128) {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(available_threads())
        .build()
        .unwrap();
    let t0 = Instant::now();
    runtime.block_on(async move {
        for _round in 0..PP_ROUNDS {
            let (ping_tx, ping_rx) = tokio::sync::oneshot::channel::<()>();
            let (pong_tx, pong_rx) = tokio::sync::oneshot::channel::<()>();
            let responder = tokio::spawn(async move {
                ping_rx.await.unwrap();
                pong_tx.send(()).unwrap();
            });
            let initiator = tokio::spawn(async move {
                ping_tx.send(()).unwrap();
                pong_rx.await.unwrap();
            });
            let _ = initiator.await;
            let _ = responder.await;
        }
    });
    (PP_ROUNDS, t0.elapsed().as_micros())
}
// ---------------------------------------------------------------------------
// main (preceded by the bench_cfg knob helper it uses for every smarm run)
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// Knob helper — reads SMARM_ALLOC_INTERVAL / SMARM_TIMESLICE_CYCLES env vars
// so the sweep script can override the preemption knobs without recompiling.
// ---------------------------------------------------------------------------
fn bench_cfg(threads: usize) -> smarm::runtime::Config {
let mut cfg = smarm::runtime::Config::exact(threads);
if let Ok(v) = std::env::var("SMARM_ALLOC_INTERVAL") {
if let Ok(n) = v.parse::<u32>() { cfg = cfg.alloc_interval(n); }
}
if let Ok(v) = std::env::var("SMARM_TIMESLICE_CYCLES") {
if let Ok(n) = v.parse::<u64>() { cfg = cfg.timeslice_cycles(n); }
}
cfg
}
fn main() {
let n = available_threads();
println!("smarm general benchmarks");
println!("available parallelism: {n} threads");
println!("ITERS={ITERS} (+1 warmup, discarded)");
println!(
"CHAIN_DEPTH={CHAIN_DEPTH}, YIELD_TASKS={YIELD_TASKS}×{YIELD_ROUNDS}, \
PRIME_N={PRIME_N}/{PRIME_WORKERS} workers, PP_ROUNDS={PP_ROUNDS}"
);
// ---- 1. chained_spawn ----
print_header(&format!("chained_spawn: depth {CHAIN_DEPTH}"));
run_n("smarm 1-thread", ITERS, || bench_chained_smarm(1));
run_n(&format!("smarm {n}-thread"), ITERS, || bench_chained_smarm(n));
run_n("tokio current_thread", ITERS, bench_chained_tokio_current);
run_n("tokio multi-thread", ITERS, bench_chained_tokio_multi);
// ---- 2. yield_many ----
print_header(&format!("yield_many: {YIELD_TASKS} tasks × {YIELD_ROUNDS} yields"));
run_n("smarm 1-thread", ITERS, || bench_yield_smarm(1));
run_n(&format!("smarm {n}-thread"), ITERS, || bench_yield_smarm(n));
run_n("tokio current_thread", ITERS, bench_yield_tokio_current);
run_n("tokio multi-thread", ITERS, bench_yield_tokio_multi);
// ---- 3. fan_out_compute ----
print_header(&format!("fan_out_compute: primes in [2, {PRIME_N}) across {PRIME_WORKERS}"));
run_n("smarm 1-thread", ITERS, || bench_primes_smarm(1));
run_n(&format!("smarm {n}-thread"), ITERS, || bench_primes_smarm(n));
run_n("tokio current_thread", ITERS, bench_primes_tokio_current);
run_n("tokio multi-thread", ITERS, bench_primes_tokio_multi);
// ---- 4. ping_pong_oneshot ----
print_header(&format!("ping_pong_oneshot: {PP_ROUNDS} rounds"));
run_n("smarm 1-thread", ITERS, || bench_pp_smarm(1));
run_n(&format!("smarm {n}-thread"), ITERS, || bench_pp_smarm(n));
run_n("tokio current_thread", ITERS, bench_pp_tokio_current);
run_n("tokio multi-thread", ITERS, bench_pp_tokio_multi);
}