Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 95 additions & 31 deletions crates/harness/runner/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use chromiumoxide::{
network::{EnableParams, SetCacheDisabledParams},
page::ReloadParams,
},
handler::HandlerConfig,
};
use futures::StreamExt;
use harness_core::{
Expand Down Expand Up @@ -73,7 +74,7 @@ impl Executor {
self.ns.name(),
"env",
format!("CONFIG={}", serde_json::to_string(&self.config)?),
executor_path
executor_path,
)
.stdout_capture()
.stderr_capture()
Expand All @@ -96,7 +97,6 @@ impl Executor {
Id::Zero => self.config.network().rpc_0,
Id::One => self.config.network().rpc_1,
};
let (wasm_addr, wasm_port) = self.config.network().wasm;

// Create a temporary directory for the browser profile.
let tmp = duct::cmd!("mktemp", "-d").read()?;
Expand Down Expand Up @@ -126,8 +126,18 @@ impl Executor {
const TIMEOUT: usize = 10000;
const DELAY: usize = 100;
let mut retries = 0;
let config = HandlerConfig {
// Bump the timeout for long-running benches.
request_timeout: Duration::from_secs(120),
..Default::default()
};

let (browser, mut handler) = loop {
match Browser::connect(format!("http://{}:{}", rpc_addr.0, PORT_BROWSER)).await
match Browser::connect_with_config(
format!("http://{}:{}", rpc_addr.0, PORT_BROWSER),
config.clone(),
)
.await
{
Ok(browser) => break browser,
Err(e) => {
Expand All @@ -143,39 +153,20 @@ impl Executor {
tokio::spawn(async move {
while let Some(res) = handler.next().await {
if let Err(e) = res {
if e.to_string()
== "data did not match any variant of untagged enum Message"
{
// Do not log this error. It appears to be
// caused by a bug upstream.
// https://github.com/mattsse/chromiumoxide/issues/167
continue;
}
eprintln!("chromium error: {e:?}");
}
}
});

let page = browser
.new_page(&format!("http://{wasm_addr}:{wasm_port}/index.html"))
.await?;

page.execute(EnableParams::builder().build()).await?;
page.execute(SetCacheDisabledParams {
cache_disabled: true,
})
.await?;
page.execute(ReloadParams::builder().ignore_cache(true).build())
.await?;
page.wait_for_navigation().await?;
page.bring_to_front().await?;
page.evaluate(format!(
r#"
(async () => {{
const config = JSON.parse('{config}');
console.log("initializing executor", config);
await window.executor.init(config);
console.log("executor initialized");
return;
}})();
"#,
config = serde_json::to_string(&self.config)?
))
.await?;

let rpc = Rpc::new_browser(page);
let rpc = self.new_browser_rpc(&browser).await?;

self.state = State::Started {
process,
Expand Down Expand Up @@ -267,6 +258,79 @@ impl Executor {
Ok(())
}
}

/// Reloads the RPC server associated with this browser executor.
pub async fn reload_browser_rpc(&mut self) -> Result<()> {
if !self.is_browser() {
return Err(anyhow!("executor target is not browser"));
}
let State::Started {
process,
rpc,
browser,
} = self.state.take()
else {
return Err(anyhow!("executor is not in the started state"));
};

rpc.shutdown().await?;

let browser = browser.expect("browser is set for browser target");

let rpc = self.new_browser_rpc(&browser).await?;

self.state = State::Started {
process,
rpc,
browser: Some(browser),
};

Ok(())
}

/// Creates a new RPC server for this browser executor.
async fn new_browser_rpc(&self, browser: &Browser) -> Result<Rpc> {
if !self.is_browser() {
return Err(anyhow!("executor target is not a browser"));
}

let (wasm_addr, wasm_port) = self.config.network().wasm;

let page = browser
.new_page(&format!("http://{wasm_addr}:{wasm_port}/index.html"))
.await?;

page.execute(EnableParams::builder().build()).await?;
page.execute(SetCacheDisabledParams {
cache_disabled: true,
})
.await?;
page.execute(ReloadParams::builder().ignore_cache(true).build())
.await?;
page.wait_for_navigation().await?;
page.bring_to_front().await?;
page.evaluate(format!(
r#"
(async () => {{
const config = JSON.parse('{config}');
console.log("initializing executor", config);
await window.executor.init(config);
console.log("executor initialized");
return;
}})();
"#,
config = serde_json::to_string(&self.config)?
))
.await?;

let rpc = Rpc::new_browser(page);

Ok(rpc)
}

pub fn is_browser(&self) -> bool {
self.target == Target::Browser
}
}

impl Drop for Executor {
Expand Down
83 changes: 50 additions & 33 deletions crates/harness/runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,46 +206,63 @@ pub async fn main() -> Result<()> {
let output_file = std::fs::File::create(output)?;
let mut writer = WriterBuilder::new().from_writer(output_file);

let mut benches = Vec::new();
if !skip_warmup {
benches.extend(vec![WARM_UP_BENCH; 3]);
}
benches.extend(items.to_benches(samples, samples_override));
let benches = items.to_benches(samples, samples_override);

runner.start_services().await?;
runner.exec_p.start().await?;
runner.exec_v.start().await?;

for config in benches {
runner
.network
.set_proto_config(config.bandwidth, config.protocol_latency.div_ceil(2))?;
runner
.network
.set_app_config(config.bandwidth, config.app_latency.div_ceil(2))?;

// Wait for the network to stabilize
tokio::time::sleep(Duration::from_millis(100)).await;

let (output, _) = tokio::try_join!(
runner.exec_p.bench(BenchCmd {
config: config.clone(),
role: Role::Prover,
}),
runner.exec_v.bench(BenchCmd {
config: config.clone(),
role: Role::Verifier,
})
)?;

let BenchOutput::Prover { metrics } = output else {
panic!("expected prover output");
};
// Process benches in chunks.
let chunk_size = if runner.exec_p.is_browser() {
20
} else {
// Native benches are processed in one go..
benches.len()
};

let measurement = Measurement::new(config, metrics);
for (idx, bench_chunk) in benches.chunks(chunk_size).enumerate() {
let mut benches = Vec::with_capacity(benches.len());
if !skip_warmup {
benches.extend(vec![WARM_UP_BENCH; 3]);
}
benches.extend_from_slice(bench_chunk);

writer.serialize(measurement)?;
writer.flush()?;
if idx > 0 && runner.exec_p.is_browser() {
// Reloading the page periodically to avoid an OOM panic.
// TODO: need to resolve this memory leak.
runner.exec_p.reload_browser_rpc().await?;
}
for config in benches {
runner
.network
.set_proto_config(config.bandwidth, config.protocol_latency.div_ceil(2))?;
runner
.network
.set_app_config(config.bandwidth, config.app_latency.div_ceil(2))?;

// Wait for the network to stabilize
tokio::time::sleep(Duration::from_millis(100)).await;

let (output, _) = tokio::try_join!(
runner.exec_p.bench(BenchCmd {
config: config.clone(),
role: Role::Prover,
}),
runner.exec_v.bench(BenchCmd {
config: config.clone(),
role: Role::Verifier,
})
)?;

let BenchOutput::Prover { metrics } = output else {
panic!("expected prover output");
};

let measurement = Measurement::new(config, metrics);

writer.serialize(measurement)?;
writer.flush()?;
}
}
}
Command::Serve {} => {
Expand Down
14 changes: 14 additions & 0 deletions crates/harness/runner/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,20 @@ impl Rpc {
Err(e) => Err(e),
})
}

pub async fn shutdown(self) -> Result<()> {
match self.0 {
Inner::Native { mut io } => {
io.close().await.map_err(|e| RpcError::new(e.to_string()))?
}
Inner::Browser { page } => page
.close()
.await
.map_err(|e| RpcError::new(e.to_string()))?,
};

Ok(())
}
}

async fn browser_cmd(page: &Page, cmd: Cmd) -> io::Result<Result<CmdOutput>> {
Expand Down
2 changes: 1 addition & 1 deletion crates/harness/static/executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Executor {
await initWasm();
console.log("wasm loaded");
console.log("initializing wasm");
await wasm.initialize(undefined, navigator.hardwareConcurrency);
await wasm.initialize({ no_logging: true }, navigator.hardwareConcurrency);
console.log("wasm initialized");
console.log("initializing executor");
this.executor = new wasm.WasmExecutor(config);
Expand Down
Loading