From eb93aad0f988367e00cb0cfe1f8cc7d0c44ebd7a Mon Sep 17 00:00:00 2001 From: dan Date: Thu, 24 Jul 2025 14:00:58 +0300 Subject: [PATCH 1/2] fix(harness): add fixes for browser benches --- crates/harness/runner/src/executor.rs | 126 +++++++++++++++++++------- crates/harness/runner/src/lib.rs | 83 ++++++++++------- crates/harness/runner/src/rpc.rs | 14 +++ crates/harness/static/executor.js | 2 +- 4 files changed, 159 insertions(+), 66 deletions(-) diff --git a/crates/harness/runner/src/executor.rs b/crates/harness/runner/src/executor.rs index b8ed84b8a9..7642e8aff8 100644 --- a/crates/harness/runner/src/executor.rs +++ b/crates/harness/runner/src/executor.rs @@ -8,6 +8,7 @@ use chromiumoxide::{ network::{EnableParams, SetCacheDisabledParams}, page::ReloadParams, }, + handler::HandlerConfig, }; use futures::StreamExt; use harness_core::{ @@ -73,7 +74,7 @@ impl Executor { self.ns.name(), "env", format!("CONFIG={}", serde_json::to_string(&self.config)?), - executor_path + executor_path, ) .stdout_capture() .stderr_capture() @@ -96,7 +97,6 @@ impl Executor { Id::Zero => self.config.network().rpc_0, Id::One => self.config.network().rpc_1, }; - let (wasm_addr, wasm_port) = self.config.network().wasm; // Create a temporary directory for the browser profile. let tmp = duct::cmd!("mktemp", "-d").read()?; @@ -126,8 +126,16 @@ impl Executor { const TIMEOUT: usize = 10000; const DELAY: usize = 100; let mut retries = 0; + let mut config = HandlerConfig::default(); + // Bump the timeout for long-running benches. + config.request_timeout = Duration::from_secs(120); + let (browser, mut handler) = loop { - match Browser::connect(format!("http://{}:{}", rpc_addr.0, PORT_BROWSER)).await + match Browser::connect_with_config( + format!("http://{}:{}", rpc_addr.0, PORT_BROWSER), + config.clone(), + ) + .await { Ok(browser) => break browser, Err(e) => { @@ -143,39 +151,20 @@ impl Executor { tokio::spawn(async move { while let Some(res) = handler.next().await { if let Err(e) = res { - eprintln!("chromium error: {e:?}"); + if e.to_string() + == "data did not match any variant of untagged enum Message" + { + // Do not log this error. It appears to be + // caused by a bug upstream. + // https://github.com/mattsse/chromiumoxide/issues/167 + continue; + } + eprintln!("chromium error: {:?}", e); } } }); - let page = browser - .new_page(&format!("http://{wasm_addr}:{wasm_port}/index.html")) - .await?; - - page.execute(EnableParams::builder().build()).await?; - page.execute(SetCacheDisabledParams { - cache_disabled: true, - }) - .await?; - page.execute(ReloadParams::builder().ignore_cache(true).build()) - .await?; - page.wait_for_navigation().await?; - page.bring_to_front().await?; - page.evaluate(format!( - r#" - (async () => {{ - const config = JSON.parse('{config}'); - console.log("initializing executor", config); - await window.executor.init(config); - console.log("executor initialized"); - return; - }})(); - "#, - config = serde_json::to_string(&self.config)? - )) - .await?; - - let rpc = Rpc::new_browser(page); + let rpc = self.new_browser_rpc(&browser).await?; self.state = State::Started { process, @@ -267,6 +256,79 @@ impl Executor { Ok(()) } } + + /// Reloads the RPC server associated with this browser executor. + pub async fn reload_browser_rpc(&mut self) -> Result<()> { + if !self.is_browser() { + return Err(anyhow!("executor target is not browser")); + } + let State::Started { + process, + rpc, + browser, + } = self.state.take() + else { + return Err(anyhow!("executor is not in the started state")); + }; + + rpc.shutdown().await?; + + let browser = browser.expect("browser is set for browser target"); + + let rpc = self.new_browser_rpc(&browser).await?; + + self.state = State::Started { + process, + rpc, + browser: Some(browser), + }; + + Ok(()) + } + + /// Creates a new RPC server for this browser executor. + async fn new_browser_rpc(&self, browser: &Browser) -> Result { + if !self.is_browser() { + return Err(anyhow!("executor target is not browser")); + } + + let (wasm_addr, wasm_port) = self.config.network().wasm; + + let page = browser + .new_page(&format!("http://{}:{}/index.html", wasm_addr, wasm_port)) + .await?; + + page.execute(EnableParams::builder().build()).await?; + page.execute(SetCacheDisabledParams { + cache_disabled: true, + }) + .await?; + page.execute(ReloadParams::builder().ignore_cache(true).build()) + .await?; + page.wait_for_navigation().await?; + page.bring_to_front().await?; + page.evaluate(format!( + r#" + (async () => {{ + const config = JSON.parse('{config}'); + console.log("initializing executor", config); + await window.executor.init(config); + console.log("executor initialized"); + return; + }})(); + "#, + config = serde_json::to_string(&self.config)? + )) + .await?; + + let rpc = Rpc::new_browser(page); + + Ok(rpc) + } + + pub fn is_browser(&self) -> bool { + self.target == Target::Browser + } } impl Drop for Executor { diff --git a/crates/harness/runner/src/lib.rs b/crates/harness/runner/src/lib.rs index 10358c70d7..c1b801874e 100644 --- a/crates/harness/runner/src/lib.rs +++ b/crates/harness/runner/src/lib.rs @@ -206,46 +206,63 @@ pub async fn main() -> Result<()> { let output_file = std::fs::File::create(output)?; let mut writer = WriterBuilder::new().from_writer(output_file); - let mut benches = Vec::new(); - if !skip_warmup { - benches.extend(vec![WARM_UP_BENCH; 3]); - } - benches.extend(items.to_benches(samples, samples_override)); + let benches = items.to_benches(samples, samples_override); runner.start_services().await?; runner.exec_p.start().await?; runner.exec_v.start().await?; - for config in benches { - runner - .network - .set_proto_config(config.bandwidth, config.protocol_latency.div_ceil(2))?; - runner - .network - .set_app_config(config.bandwidth, config.app_latency.div_ceil(2))?; - - // Wait for the network to stabilize - tokio::time::sleep(Duration::from_millis(100)).await; - - let (output, _) = tokio::try_join!( - runner.exec_p.bench(BenchCmd { - config: config.clone(), - role: Role::Prover, - }), - runner.exec_v.bench(BenchCmd { - config: config.clone(), - role: Role::Verifier, - }) - )?; - - let BenchOutput::Prover { metrics } = output else { - panic!("expected prover output"); - }; + // Process benches in chunks. + let chunk_size = if runner.exec_p.is_browser() { + 20 + } else { + // Native benches are processed in one go.. + benches.len() + }; - let measurement = Measurement::new(config, metrics); + for (idx, bench_chunk) in benches.chunks(chunk_size).enumerate() { + let mut benches = Vec::with_capacity(benches.len()); + if !skip_warmup { + benches.extend(vec![WARM_UP_BENCH; 3]); + } + benches.extend_from_slice(bench_chunk); - writer.serialize(measurement)?; - writer.flush()?; + if idx > 0 && runner.exec_p.is_browser() { + // Reloading the page periodically to avoid an OOM panic. + // TODO: need to resolve this memory leak. + runner.exec_p.reload_browser_rpc().await?; + } + for config in benches { + runner + .network + .set_proto_config(config.bandwidth, config.protocol_latency.div_ceil(2))?; + runner + .network + .set_app_config(config.bandwidth, config.app_latency.div_ceil(2))?; + + // Wait for the network to stabilize + tokio::time::sleep(Duration::from_millis(100)).await; + + let (output, _) = tokio::try_join!( + runner.exec_p.bench(BenchCmd { + config: config.clone(), + role: Role::Prover, + }), + runner.exec_v.bench(BenchCmd { + config: config.clone(), + role: Role::Verifier, + }) + )?; + + let BenchOutput::Prover { metrics } = output else { + panic!("expected prover output"); + }; + + let measurement = Measurement::new(config, metrics); + + writer.serialize(measurement)?; + writer.flush()?; + } } } Command::Serve {} => { diff --git a/crates/harness/runner/src/rpc.rs b/crates/harness/runner/src/rpc.rs index 08609fccc6..ada9c401bf 100644 --- a/crates/harness/runner/src/rpc.rs +++ b/crates/harness/runner/src/rpc.rs @@ -102,6 +102,20 @@ impl Rpc { Err(e) => Err(e), }) } + + pub async fn shutdown(self) -> Result<()> { + match self.0 { + Inner::Native { mut io } => { + io.close().await.map_err(|e| RpcError::new(e.to_string()))? + } + Inner::Browser { page } => page + .close() + .await + .map_err(|e| RpcError::new(e.to_string()))?, + }; + + Ok(()) + } } async fn browser_cmd(page: &Page, cmd: Cmd) -> io::Result> { diff --git a/crates/harness/static/executor.js b/crates/harness/static/executor.js index bbc0c8d4d2..a44009ba26 100644 --- a/crates/harness/static/executor.js +++ b/crates/harness/static/executor.js @@ -10,7 +10,7 @@ class Executor { await initWasm(); console.log("wasm loaded"); console.log("initializing wasm"); - await wasm.initialize(undefined, navigator.hardwareConcurrency); + await wasm.initialize({ no_logging: true }, navigator.hardwareConcurrency); console.log("wasm initialized"); console.log("initializing executor"); this.executor = new wasm.WasmExecutor(config); From 3ebe12a9c53acd3b6a6ecbced51c2bfdb837d8a0 Mon Sep 17 00:00:00 2001 From: Hendrik Eeckhaut Date: Fri, 25 Jul 2025 15:15:08 +0200 Subject: [PATCH 2/2] clippy --- crates/harness/runner/src/executor.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/harness/runner/src/executor.rs b/crates/harness/runner/src/executor.rs index 7642e8aff8..591d49a31d 100644 --- a/crates/harness/runner/src/executor.rs +++ b/crates/harness/runner/src/executor.rs @@ -126,9 +126,11 @@ impl Executor { const TIMEOUT: usize = 10000; const DELAY: usize = 100; let mut retries = 0; - let mut config = HandlerConfig::default(); - // Bump the timeout for long-running benches. - config.request_timeout = Duration::from_secs(120); + let config = HandlerConfig { + // Bump the timeout for long-running benches. + request_timeout: Duration::from_secs(120), + ..Default::default() + }; let (browser, mut handler) = loop { match Browser::connect_with_config( @@ -159,7 +161,7 @@ impl Executor { // https://github.com/mattsse/chromiumoxide/issues/167 continue; } - eprintln!("chromium error: {:?}", e); + eprintln!("chromium error: {e:?}"); } } }); @@ -289,13 +291,13 @@ impl Executor { /// Creates a new RPC server for this browser executor. async fn new_browser_rpc(&self, browser: &Browser) -> Result { if !self.is_browser() { - return Err(anyhow!("executor target is not browser")); + return Err(anyhow!("executor target is not a browser")); } let (wasm_addr, wasm_port) = self.config.network().wasm; let page = browser - .new_page(&format!("http://{}:{}/index.html", wasm_addr, wasm_port)) + .new_page(&format!("http://{wasm_addr}:{wasm_port}/index.html")) .await?; page.execute(EnableParams::builder().build()).await?;