diff --git a/Cargo.lock b/Cargo.lock index 206c733..4aa7b03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,9 +110,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.98" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" dependencies = [ "backtrace", ] diff --git a/README.md b/README.md index 5dd6db9..8ea6276 100644 --- a/README.md +++ b/README.md @@ -173,3 +173,38 @@ echo request1.bin | dumbpipe connect --custom-alpn utf8:/iroh-bytes/2 > if request1.bin contained a valid request for the `/iroh-bytes/2` protocol, response1.bin will now contain the response. + +## Custom Relay Configuration + +By default, dumbpipe uses iroh's automatic relay selection, which picks the fastest responding relay server from the n0 network. You can customize this behavior using the `--relay` option: + +### Disable relays entirely + +If you want to force direct connections only: + +```bash +dumbpipe listen --relay disabled +dumbpipe connect --relay disabled +``` + +This will only attempt direct peer-to-peer connections and won't fall back to relay servers. + +### Use default relays (default behavior) + +```bash +dumbpipe listen --relay default +dumbpipe connect --relay default +``` + +This is the default behavior when no `--relay` option is specified. + +### Use a custom relay server + +If you're running your own relay server or want to use a specific one: + +```bash +dumbpipe listen --relay https://your-relay-server.com +dumbpipe connect --relay https://your-relay-server.com +``` + +**Note**: Both the listener and connector should use the same relay configuration for optimal connectivity. diff --git a/src/main.rs b/src/main.rs index 9b7ada2..f75cc71 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ //! Command line arguments. use clap::{Parser, Subcommand}; use dumbpipe::NodeTicket; -use iroh::{endpoint::Connecting, Endpoint, NodeAddr, SecretKey, Watcher}; +use iroh::{endpoint::Connecting, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey, Watcher}; use n0_snafu::{Result, ResultExt}; use std::{ + fmt::{Display, Formatter}, io, net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, str::FromStr, @@ -122,6 +123,14 @@ pub struct CommonArgs { #[clap(long)] pub custom_alpn: Option, + /// The relay URL to use as a home relay, + /// + /// Can be set to "disabled" to disable relay servers and "custom" + /// to configure custom servers. The default is the n0 quickest responding + /// relay if the flag is not set. + #[clap(long, default_value_t = RelayModeOption::Default)] + pub relay: RelayModeOption, + /// The verbosity level. Repeat to increase verbosity. #[clap(short = 'v', long, action = clap::ArgAction::Count)] pub verbose: u8, @@ -148,6 +157,49 @@ fn parse_alpn(alpn: &str) -> Result> { }) } +/// Available command line options for configuring relays. +#[derive(Clone, Debug, PartialEq)] +pub enum RelayModeOption { + /// Disables relays altogether. + Disabled, + /// Uses the default relay servers. + Default, + /// Uses a single, custom relay server by URL. + Custom(RelayUrl), +} + +impl FromStr for RelayModeOption { + type Err = iroh::RelayUrlParseError; + + fn from_str(s: &str) -> Result { + match s { + "disabled" => Ok(Self::Disabled), + "default" => Ok(Self::Default), + _ => Ok(Self::Custom(RelayUrl::from_str(s)?)), + } + } +} + +impl Display for RelayModeOption { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Disabled => f.write_str("disabled"), + Self::Default => f.write_str("default"), + Self::Custom(url) => url.fmt(f), + } + } +} + +impl From for RelayMode { + fn from(value: RelayModeOption) -> Self { + match value { + RelayModeOption::Disabled => RelayMode::Disabled, + RelayModeOption::Default => RelayMode::Default, + RelayModeOption::Custom(url) => RelayMode::Custom(url.into()), + } + } +} + #[derive(Parser, Debug)] pub struct ListenArgs { /// Immediately close our sending side, indicating that we will not transmit any data @@ -291,7 +343,10 @@ async fn create_endpoint( common: &CommonArgs, alpns: Vec>, ) -> Result { - let mut builder = Endpoint::builder().secret_key(secret_key).alpns(alpns); + let mut builder = Endpoint::builder() + .secret_key(secret_key) + .alpns(alpns) + .relay_mode(common.relay.clone().into()); if let Some(addr) = common.ipv4_addr { builder = builder.bind_addr_v4(addr); } @@ -345,7 +400,9 @@ async fn listen_stdio(args: ListenArgs) -> Result<()> { let secret_key = get_or_create_secret()?; let endpoint = create_endpoint(secret_key, &args.common, vec![args.common.alpn()?]).await?; // wait for the endpoint to figure out its address before making a ticket - endpoint.home_relay().initialized().await; + if args.common.relay != RelayModeOption::Disabled { + endpoint.home_relay().initialized().await; + } let node = endpoint.node_addr().initialized().await; let mut short = node.clone(); let ticket = NodeTicket::new(node); @@ -444,8 +501,9 @@ async fn connect_tcp(args: ConnectTcpArgs) -> Result<()> { tracing::info!("tcp listening on {:?}", addrs); // Wait for our own endpoint to be ready before trying to connect. - endpoint.home_relay().initialized().await; - + if args.common.relay != RelayModeOption::Disabled { + endpoint.home_relay().initialized().await; + } let tcp_listener = match tokio::net::TcpListener::bind(addrs.as_slice()).await { Ok(tcp_listener) => tcp_listener, Err(cause) => { @@ -517,7 +575,9 @@ async fn listen_tcp(args: ListenTcpArgs) -> Result<()> { let secret_key = get_or_create_secret()?; let endpoint = create_endpoint(secret_key, &args.common, vec![args.common.alpn()?]).await?; // wait for the endpoint to figure out its address before making a ticket - endpoint.home_relay().initialized().await; + if args.common.relay != RelayModeOption::Disabled { + endpoint.home_relay().initialized().await; + } let node_addr = endpoint.node_addr().initialized().await; let mut short = node_addr.clone(); let ticket = NodeTicket::new(node_addr); @@ -599,7 +659,9 @@ async fn listen_unix(args: ListenUnixArgs) -> Result<()> { let secret_key = get_or_create_secret()?; let endpoint = create_endpoint(secret_key, &args.common, vec![args.common.alpn()?]).await?; // wait for the endpoint to figure out its address before making a ticket - endpoint.home_relay().initialized().await; + if args.common.relay != RelayModeOption::Disabled { + endpoint.home_relay().initialized().await; + } let node_addr = endpoint.node_addr().initialized().await; let mut short = node_addr.clone(); let ticket = NodeTicket::new(node_addr); @@ -714,8 +776,9 @@ async fn connect_unix(args: ConnectUnixArgs) -> Result<()> { tracing::info!("unix listening on {:?}", socket_path); // Wait for our own endpoint to be ready before trying to connect. - endpoint.home_relay().initialized().await; - + if args.common.relay != RelayModeOption::Disabled { + endpoint.home_relay().initialized().await; + } // Remove existing socket file if it exists if let Err(e) = tokio::fs::remove_file(&socket_path).await { if e.kind() != io::ErrorKind::NotFound { diff --git a/tests/cli.rs b/tests/cli.rs index 29fffb7..5e0fabb 100644 --- a/tests/cli.rs +++ b/tests/cli.rs @@ -488,3 +488,84 @@ mod unix_socket_tests { connect_stderr_thread.join().ok(); } } + +#[test] +#[ignore = "flaky"] +fn connect_listen_relay_disabled() { + let listen_to_connect = b"hello from listen"; + let connect_to_listen = b"hello from connect"; + let mut listen = duct::cmd(dumbpipe_bin(), ["listen", "--relay", "disabled"]) + .env_remove("RUST_LOG") + .stdin_bytes(listen_to_connect) + .stderr_to_stdout() + .reader() + .unwrap(); + let header = read_ascii_lines(3, &mut listen).unwrap(); + let header = String::from_utf8(header).unwrap(); + let ticket = header.split_ascii_whitespace().last().unwrap(); + let ticket = NodeTicket::from_str(ticket).unwrap(); + + let connect = duct::cmd( + dumbpipe_bin(), + ["connect", &ticket.to_string(), "--relay", "disabled"], + ) + .env_remove("RUST_LOG") + .stdin_bytes(connect_to_listen) + .stderr_null() + .stdout_capture() + .run() + .unwrap(); + + assert!(connect.status.success()); + assert_eq!(&connect.stdout, listen_to_connect); + + let mut listen_stdout = Vec::new(); + listen.read_to_end(&mut listen_stdout).unwrap(); + assert_eq!(&listen_stdout, connect_to_listen); +} + +#[test] +#[ignore = "flaky"] +fn connect_listen_relay_default() { + let listen_to_connect = b"hello from listen"; + let connect_to_listen = b"hello from connect"; + let mut listen = duct::cmd(dumbpipe_bin(), ["listen", "--relay", "default"]) + .env_remove("RUST_LOG") + .stdin_bytes(listen_to_connect) + .stderr_to_stdout() + .reader() + .unwrap(); + let header = read_ascii_lines(3, &mut listen).unwrap(); + let header = String::from_utf8(header).unwrap(); + let ticket = header.split_ascii_whitespace().last().unwrap(); + let ticket = NodeTicket::from_str(ticket).unwrap(); + + let connect = duct::cmd( + dumbpipe_bin(), + ["connect", &ticket.to_string(), "--relay", "default"], + ) + .env_remove("RUST_LOG") + .stdin_bytes(connect_to_listen) + .stderr_null() + .stdout_capture() + .run() + .unwrap(); + + assert!(connect.status.success()); + assert_eq!(&connect.stdout, listen_to_connect); + + let mut listen_stdout = Vec::new(); + listen.read_to_end(&mut listen_stdout).unwrap(); + assert_eq!(&listen_stdout, connect_to_listen); +} + +#[test] +fn relay_option_invalid() { + let output = duct::cmd(dumbpipe_bin(), ["listen", "--relay", "invalid-relay-url"]) + .env_remove("RUST_LOG") + .stderr_capture() + .stdout_capture() + .run(); + + assert!(output.is_err() || !output.unwrap().status.success()); +}