Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,38 @@ echo request1.bin | dumbpipe connect <ticket> --custom-alpn utf8:/iroh-bytes/2 >

if request1.bin contained a valid request for the `/iroh-bytes/2` protocol, response1.bin will
now contain the response.

## Custom Relay Configuration

By default, dumbpipe uses iroh's automatic relay selection, which picks the fastest responding relay server from the n0 network. You can customize this behavior using the `--relay` option:

### Disable relays entirely

If you want to force direct connections only:

```bash
dumbpipe listen --relay disabled
dumbpipe connect <ticket> --relay disabled
```

This will only attempt direct peer-to-peer connections and won't fall back to relay servers.

### Use default relays (default behavior)

```bash
dumbpipe listen --relay default
dumbpipe connect <ticket> --relay default
```

This is the default behavior when no `--relay` option is specified.

### Use a custom relay server

If you're running your own relay server or want to use a specific one:

```bash
dumbpipe listen --relay https://your-relay-server.com
dumbpipe connect <ticket> --relay https://your-relay-server.com
```

**Note**: Both the listener and connector should use the same relay configuration for optimal connectivity.
81 changes: 72 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Command line arguments.
use clap::{Parser, Subcommand};
use dumbpipe::NodeTicket;
use iroh::{endpoint::Connecting, Endpoint, NodeAddr, SecretKey, Watcher};
use iroh::{endpoint::Connecting, Endpoint, NodeAddr, RelayMode, RelayUrl, SecretKey, Watcher};
use n0_snafu::{Result, ResultExt};
use std::{
fmt::{Display, Formatter},
io,
net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs},
str::FromStr,
Expand Down Expand Up @@ -122,6 +123,14 @@ pub struct CommonArgs {
#[clap(long)]
pub custom_alpn: Option<String>,

/// The relay URL to use as a home relay,
///
/// Can be set to "disabled" to disable relay servers and "custom"
/// to configure custom servers. The default is the n0 quickest responding
/// relay if the flag is not set.
#[clap(long, default_value_t = RelayModeOption::Default)]
pub relay: RelayModeOption,
Comment on lines +126 to +132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This help text needs a little tweaking I think.

You don't set this option to the literal value of "custom", but rather you give your relay url


/// The verbosity level. Repeat to increase verbosity.
#[clap(short = 'v', long, action = clap::ArgAction::Count)]
pub verbose: u8,
Expand All @@ -148,6 +157,49 @@ fn parse_alpn(alpn: &str) -> Result<Vec<u8>> {
})
}

/// Available command line options for configuring relays.
#[derive(Clone, Debug, PartialEq)]
pub enum RelayModeOption {
/// Disables relays altogether.
Disabled,
/// Uses the default relay servers.
Default,
/// Uses a single, custom relay server by URL.
Custom(RelayUrl),
}

impl FromStr for RelayModeOption {
type Err = iroh::RelayUrlParseError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"disabled" => Ok(Self::Disabled),
"default" => Ok(Self::Default),
_ => Ok(Self::Custom(RelayUrl::from_str(s)?)),
}
}
}

impl Display for RelayModeOption {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::Disabled => f.write_str("disabled"),
Self::Default => f.write_str("default"),
Self::Custom(url) => url.fmt(f),
}
}
}

impl From<RelayModeOption> for RelayMode {
fn from(value: RelayModeOption) -> Self {
match value {
RelayModeOption::Disabled => RelayMode::Disabled,
RelayModeOption::Default => RelayMode::Default,
RelayModeOption::Custom(url) => RelayMode::Custom(url.into()),
}
}
}

#[derive(Parser, Debug)]
pub struct ListenArgs {
/// Immediately close our sending side, indicating that we will not transmit any data
Expand Down Expand Up @@ -291,7 +343,10 @@ async fn create_endpoint(
common: &CommonArgs,
alpns: Vec<Vec<u8>>,
) -> Result<Endpoint> {
let mut builder = Endpoint::builder().secret_key(secret_key).alpns(alpns);
let mut builder = Endpoint::builder()
.secret_key(secret_key)
.alpns(alpns)
.relay_mode(common.relay.clone().into());
if let Some(addr) = common.ipv4_addr {
builder = builder.bind_addr_v4(addr);
}
Expand Down Expand Up @@ -345,7 +400,9 @@ async fn listen_stdio(args: ListenArgs) -> Result<()> {
let secret_key = get_or_create_secret()?;
let endpoint = create_endpoint(secret_key, &args.common, vec![args.common.alpn()?]).await?;
// wait for the endpoint to figure out its address before making a ticket
endpoint.home_relay().initialized().await;
if args.common.relay != RelayModeOption::Disabled {
endpoint.home_relay().initialized().await;
}
let node = endpoint.node_addr().initialized().await;
let mut short = node.clone();
let ticket = NodeTicket::new(node);
Expand Down Expand Up @@ -444,8 +501,9 @@ async fn connect_tcp(args: ConnectTcpArgs) -> Result<()> {
tracing::info!("tcp listening on {:?}", addrs);

// Wait for our own endpoint to be ready before trying to connect.
endpoint.home_relay().initialized().await;

if args.common.relay != RelayModeOption::Disabled {
endpoint.home_relay().initialized().await;
}
let tcp_listener = match tokio::net::TcpListener::bind(addrs.as_slice()).await {
Ok(tcp_listener) => tcp_listener,
Err(cause) => {
Expand Down Expand Up @@ -517,7 +575,9 @@ async fn listen_tcp(args: ListenTcpArgs) -> Result<()> {
let secret_key = get_or_create_secret()?;
let endpoint = create_endpoint(secret_key, &args.common, vec![args.common.alpn()?]).await?;
// wait for the endpoint to figure out its address before making a ticket
endpoint.home_relay().initialized().await;
if args.common.relay != RelayModeOption::Disabled {
endpoint.home_relay().initialized().await;
}
let node_addr = endpoint.node_addr().initialized().await;
let mut short = node_addr.clone();
let ticket = NodeTicket::new(node_addr);
Expand Down Expand Up @@ -599,7 +659,9 @@ async fn listen_unix(args: ListenUnixArgs) -> Result<()> {
let secret_key = get_or_create_secret()?;
let endpoint = create_endpoint(secret_key, &args.common, vec![args.common.alpn()?]).await?;
// wait for the endpoint to figure out its address before making a ticket
endpoint.home_relay().initialized().await;
if args.common.relay != RelayModeOption::Disabled {
endpoint.home_relay().initialized().await;
}
let node_addr = endpoint.node_addr().initialized().await;
let mut short = node_addr.clone();
let ticket = NodeTicket::new(node_addr);
Expand Down Expand Up @@ -714,8 +776,9 @@ async fn connect_unix(args: ConnectUnixArgs) -> Result<()> {
tracing::info!("unix listening on {:?}", socket_path);

// Wait for our own endpoint to be ready before trying to connect.
endpoint.home_relay().initialized().await;

if args.common.relay != RelayModeOption::Disabled {
endpoint.home_relay().initialized().await;
}
// Remove existing socket file if it exists
if let Err(e) = tokio::fs::remove_file(&socket_path).await {
if e.kind() != io::ErrorKind::NotFound {
Expand Down
81 changes: 81 additions & 0 deletions tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,84 @@ mod unix_socket_tests {
connect_stderr_thread.join().ok();
}
}

#[test]
#[ignore = "flaky"]
fn connect_listen_relay_disabled() {
let listen_to_connect = b"hello from listen";
let connect_to_listen = b"hello from connect";
let mut listen = duct::cmd(dumbpipe_bin(), ["listen", "--relay", "disabled"])
.env_remove("RUST_LOG")
.stdin_bytes(listen_to_connect)
.stderr_to_stdout()
.reader()
.unwrap();
let header = read_ascii_lines(3, &mut listen).unwrap();
let header = String::from_utf8(header).unwrap();
let ticket = header.split_ascii_whitespace().last().unwrap();
let ticket = NodeTicket::from_str(ticket).unwrap();

let connect = duct::cmd(
dumbpipe_bin(),
["connect", &ticket.to_string(), "--relay", "disabled"],
)
.env_remove("RUST_LOG")
.stdin_bytes(connect_to_listen)
.stderr_null()
.stdout_capture()
.run()
.unwrap();

assert!(connect.status.success());
assert_eq!(&connect.stdout, listen_to_connect);

let mut listen_stdout = Vec::new();
listen.read_to_end(&mut listen_stdout).unwrap();
assert_eq!(&listen_stdout, connect_to_listen);
}

#[test]
#[ignore = "flaky"]
fn connect_listen_relay_default() {
let listen_to_connect = b"hello from listen";
let connect_to_listen = b"hello from connect";
let mut listen = duct::cmd(dumbpipe_bin(), ["listen", "--relay", "default"])
.env_remove("RUST_LOG")
.stdin_bytes(listen_to_connect)
.stderr_to_stdout()
.reader()
.unwrap();
let header = read_ascii_lines(3, &mut listen).unwrap();
let header = String::from_utf8(header).unwrap();
let ticket = header.split_ascii_whitespace().last().unwrap();
let ticket = NodeTicket::from_str(ticket).unwrap();

let connect = duct::cmd(
dumbpipe_bin(),
["connect", &ticket.to_string(), "--relay", "default"],
)
.env_remove("RUST_LOG")
.stdin_bytes(connect_to_listen)
.stderr_null()
.stdout_capture()
.run()
.unwrap();

assert!(connect.status.success());
assert_eq!(&connect.stdout, listen_to_connect);

let mut listen_stdout = Vec::new();
listen.read_to_end(&mut listen_stdout).unwrap();
assert_eq!(&listen_stdout, connect_to_listen);
}

#[test]
fn relay_option_invalid() {
let output = duct::cmd(dumbpipe_bin(), ["listen", "--relay", "invalid-relay-url"])
.env_remove("RUST_LOG")
.stderr_capture()
.stdout_capture()
.run();

assert!(output.is_err() || !output.unwrap().status.success());
}