From 044387ca8753c41f358fbabe5a23edaa3349c384 Mon Sep 17 00:00:00 2001 From: unalkalkan Date: Mon, 17 Feb 2025 00:29:34 +0300 Subject: [PATCH 1/6] initial support for udp proxy via dumbpipe added. --- src/main.rs | 267 ++++++++++++++++++++++++++++++++++++++++++++++++- src/udpconn.rs | 192 +++++++++++++++++++++++++++++++++++ 2 files changed, 454 insertions(+), 5 deletions(-) create mode 100644 src/udpconn.rs diff --git a/src/main.rs b/src/main.rs index 68a2cf9..15beb37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,16 +6,15 @@ use iroh::{ endpoint::{get_remote_node_id, Connecting}, Endpoint, NodeAddr, SecretKey, }; +use quinn::Connection; use std::{ - io, - net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, - str::FromStr, + collections::HashMap, io, net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, str::FromStr, sync::Arc }; use tokio::{ - io::{AsyncRead, AsyncWrite, AsyncWriteExt}, - select, + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, net::UdpSocket, select }; use tokio_util::sync::CancellationToken; +mod udpconn; /// Create a dumb pipe between two machines, using an iroh magicsocket. /// @@ -54,6 +53,15 @@ pub enum Commands { /// connecting to a TCP socket for which you have to specify the host and port. ListenTcp(ListenTcpArgs), + /// Listen on a magicsocket and forward incoming connections to the specified + /// host and port. Every incoming bidi stream is forwarded to a new connection. + /// + /// Will print a node ticket on stderr that can be used to connect. + /// + /// As far as the magic socket is concerned, this is listening. But it is + /// connecting to a UDP socket for which you have to specify the host and port. + ListenUdp(ListenUdpArgs), + /// Connect to a magicsocket, open a bidi stream, and forward stdin/stdout. /// /// A node ticket is required to connect. @@ -67,6 +75,15 @@ pub enum Commands { /// As far as the magic socket is concerned, this is connecting. But it is /// listening on a TCP socket for which you have to specify the interface and port. ConnectTcp(ConnectTcpArgs), + + /// Connect to a magicsocket, open a bidi stream, and forward stdin/stdout + /// to it. + /// + /// A node ticket is required to connect. + /// + /// As far as the magic socket is concerned, this is connecting. But it is + /// listening on a UDP socket for which you have to specify the interface and port. + ConnectUdp(ConnectUdpArgs), } #[derive(Parser, Debug)] @@ -140,6 +157,15 @@ pub struct ListenTcpArgs { pub common: CommonArgs, } +#[derive(Parser, Debug)] +pub struct ListenUdpArgs { + #[clap(long)] + pub host: String, + + #[clap(flatten)] + pub common: CommonArgs, +} + #[derive(Parser, Debug)] pub struct ConnectTcpArgs { /// The addresses to listen on for incoming tcp connections. @@ -155,6 +181,21 @@ pub struct ConnectTcpArgs { pub common: CommonArgs, } +#[derive(Parser, Debug)] +pub struct ConnectUdpArgs { + /// The addresses to listen on for incoming udp connections. + /// + /// To listen on all network interfaces, use 0.0.0.0:12345 + #[clap(long)] + pub addr: String, + + /// The node to connect to + pub ticket: NodeTicket, + + #[clap(flatten)] + pub common: CommonArgs, +} + #[derive(Parser, Debug)] pub struct ConnectArgs { /// The node to connect to @@ -440,6 +481,126 @@ async fn connect_tcp(args: ConnectTcpArgs) -> anyhow::Result<()> { Ok(()) } +pub struct SplitUdpConn { + // TODO: Do we need to store this connection? + // Holding on to this for the future where we need to cleanup the resources. + connection: quinn::Connection, + send: quinn::SendStream, +} + +impl SplitUdpConn { + pub fn new(connection: quinn::Connection, send: quinn::SendStream) -> Self { + Self { + connection, + send + } + } +} + +// 1- Receives request message from socket +// 2- Forwards it to the quinn stream +// 3- Receives response message back from quinn stream +// 4- Forwards it back to the socket +async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { + let addrs = args + .addr + .to_socket_addrs() + .context(format!("invalid host string {}", args.addr))?; + let secret_key = get_or_create_secret()?; + let mut builder = Endpoint::builder().secret_key(secret_key).alpns(vec![]); + if let Some(addr) = args.common.magic_ipv4_addr { + builder = builder.bind_addr_v4(addr); + } + if let Some(addr) = args.common.magic_ipv6_addr { + builder = builder.bind_addr_v6(addr); + } + let endpoint = builder.bind().await.context("unable to bind magicsock")?; + tracing::info!("udp listening on {:?}", addrs); + let socket = Arc::new(UdpSocket::bind(addrs.as_slice()).await?); + + let node_addr = args.ticket.node_addr(); + let mut buf: Vec = vec![0u8; 65535]; + let mut conns = HashMap::::new(); + loop { + match socket.recv_from(&mut buf).await { + Ok((size, sock_addr)) => { + // Check if we already have a connection for this socket address + let connection = match conns.get_mut(&sock_addr) { + Some(conn) => conn, + None => { + // We need to finish the connection to be done or we should use something like promise because + // when the connection was getting established, it might receive another message. + let endpoint = endpoint.clone(); + let addr = node_addr.clone(); + let handshake = !args.common.is_custom_alpn(); + let alpn = args.common.alpn()?; + + let remote_node_id = addr.node_id; + tracing::info!("forwarding UDP to {}", remote_node_id); + + // connect to the node, try only once + let connection = endpoint + .connect(addr.clone(), &alpn) + .await + .context(format!("error connecting to {}", remote_node_id))?; + tracing::info!("connected to {}", remote_node_id); + + // open a bidi stream, try only once + let (mut send, recv) = connection + .open_bi() + .await + .context(format!("error opening bidi stream to {}", remote_node_id))?; + tracing::info!("opened bidi stream to {}", remote_node_id); + + // send the handshake unless we are using a custom alpn + if handshake { + send.write_all(&dumbpipe::HANDSHAKE).await?; + } + + let sock_send = socket.clone(); + // Spawn a task for listening the quinn connection, and forwarding the data to the UDP socket + tokio::spawn(async move { + // 3- Receives response message back from quinn stream + // 4- Forwards it back to the socket + if let Err(cause) = udpconn::handle_udp_accept(sock_addr, sock_send, recv ) + .await { + // log error at warn level + // + // we should know about it, but it's not fatal + tracing::warn!("error handling connection: {}", cause); + + // TODO: cleanup resources + } + }); + + // Create and store the split connection + let split_conn = SplitUdpConn::new(connection.clone(), send); + conns.insert(sock_addr, split_conn); + conns.get_mut(&sock_addr).expect("connection was just inserted") + } + }; + + tracing::info!("forward_udp_to_quinn: Received {} bytes from {}", size, sock_addr); + + // 1- Receives request message from socket + // 2- Forwards it to the quinn stream + if let Err(e) = connection.send.write_all(&buf[..size]).await { + tracing::error!("Error writing to Quinn stream: {}", e); + // TODO: Cleanup the resources on error. + // Remove the failed connection + // conns.remove(&sock_addr); + return Err(e.into()); + } + } + Err(e) => { + tracing::warn!("error receiving from UDP socket: {}", e); + break; + } + } + } + Ok(()) +} + /// Listen on a magicsocket and forward incoming connections to a tcp socket. async fn listen_tcp(args: ListenTcpArgs) -> anyhow::Result<()> { let addrs = match args.host.to_socket_addrs() { @@ -533,6 +694,100 @@ async fn listen_tcp(args: ListenTcpArgs) -> anyhow::Result<()> { Ok(()) } +/// Listen on a magicsocket and forward incoming connections to a udp socket. +async fn listen_udp(args: ListenUdpArgs) -> anyhow::Result<()> { + let addrs = match args.host.to_socket_addrs() { + Ok(addrs) => addrs.collect::>(), + Err(e) => anyhow::bail!("invalid host string {}: {}", args.host, e), + }; + let secret_key = get_or_create_secret()?; + let mut builder = Endpoint::builder() + .alpns(vec![args.common.alpn()?]) + .secret_key(secret_key); + if let Some(addr) = args.common.magic_ipv4_addr { + builder = builder.bind_addr_v4(addr); + } + if let Some(addr) = args.common.magic_ipv6_addr { + builder = builder.bind_addr_v6(addr); + } + let endpoint = builder.bind().await?; + // wait for the endpoint to figure out its address before making a ticket + endpoint.home_relay().initialized().await?; + let node_addr = endpoint.node_addr().await?; + let mut short = node_addr.clone(); + let ticket = NodeTicket::new(node_addr); + short.direct_addresses.clear(); + let short = NodeTicket::new(short); + + // print the ticket on stderr so it doesn't interfere with the data itself + // + // note that the tests rely on the ticket being the last thing printed + eprintln!("Forwarding incoming requests to '{}'.", args.host); + eprintln!("To connect, use e.g.:"); + eprintln!("dumbpipe connect-udp {ticket}"); + if args.common.verbose > 0 { + eprintln!("or:\ndumbpipe connect-udp {}", short); + } + tracing::info!("node id is {}", ticket.node_addr().node_id); + tracing::info!("derp url is {:?}", ticket.node_addr().relay_url); + + // handle a new incoming connection on the magic endpoint + async fn handle_magic_accept( + connecting: Connecting, + addrs: Vec, + handshake: bool, + ) -> anyhow::Result<()> { + let connection = connecting.await.context("error accepting connection")?; + let remote_node_id = get_remote_node_id(&connection)?; + tracing::info!("got connection from {}", remote_node_id); + let (s, mut r) = connection + .accept_bi() + .await + .context("error accepting stream")?; + tracing::info!("accepted bidi stream from {}", remote_node_id); + if handshake { + // read the handshake and verify it + let mut buf = [0u8; dumbpipe::HANDSHAKE.len()]; + r.read_exact(&mut buf).await?; + anyhow::ensure!(buf == dumbpipe::HANDSHAKE, "invalid handshake"); + } + + // 1- Receives request message from quinn stream + // 2- Forwards it to the (addrs) via UDP socket + // 3- Receives response message back from UDP socket + // 4- Forwards it back to the quinn stream + udpconn::handle_udp_listen(addrs.as_slice(), r, s).await?; + Ok(()) + } + + loop { + let incoming = select! { + incoming = endpoint.accept() => incoming, + _ = tokio::signal::ctrl_c() => { + eprintln!("got ctrl-c, exiting"); + break; + } + }; + let Some(incoming) = incoming else { + break; + }; + let Ok(connecting) = incoming.accept() else { + break; + }; + let addrs = addrs.clone(); + let handshake = !args.common.is_custom_alpn(); + tokio::spawn(async move { + if let Err(cause) = handle_magic_accept(connecting, addrs, handshake).await { + // log error at warn level + // + // we should know about it, but it's not fatal + tracing::warn!("error handling connection: {}", cause); + } + }); + } + Ok(()) +} + #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); @@ -540,8 +795,10 @@ async fn main() -> anyhow::Result<()> { let res = match args.command { Commands::Listen(args) => listen_stdio(args).await, Commands::ListenTcp(args) => listen_tcp(args).await, + Commands::ListenUdp(args) => listen_udp(args).await, Commands::Connect(args) => connect_stdio(args).await, Commands::ConnectTcp(args) => connect_tcp(args).await, + Commands::ConnectUdp(args) => connect_udp(args).await, }; match res { Ok(()) => std::process::exit(0), diff --git a/src/udpconn.rs b/src/udpconn.rs new file mode 100644 index 0000000..344ebcc --- /dev/null +++ b/src/udpconn.rs @@ -0,0 +1,192 @@ +use std::net::SocketAddr; +use tokio::net::UdpSocket; +use quinn::{RecvStream, SendStream}; +use anyhow::Result; +use tokio_util::sync::CancellationToken; + +use std::sync::Arc; + +pub(crate) async fn handle_udp_accept( + client_addr: SocketAddr, + udp_socket: Arc, + mut recv_stream: RecvStream, +) -> Result<()> { + // Create a cancellation token to coordinate shutdown + let token = CancellationToken::new(); + let token_quinn = token.clone(); + let token_ctrl_c = token.clone(); + + // Create buffer for receiving data + let udp_buf_size = 65535; // Maximum UDP packet size + let quinn_to_udp = { + let socket = udp_socket.clone(); + tokio::spawn(async move { + let mut buf = vec![0u8; udp_buf_size]; + loop { + // Check if we should stop + if token_quinn.is_cancelled() { + break; + } + + // Read from Quinn stream + match recv_stream.read(&mut buf).await { + Ok(Some(n)) => { + // Parse the prefixed message to get the address and the buff + // let (addr, buf) = read_prefixed_message(&buf[..n]).unwrap(); + tracing::info!("forward_udp_to_quinn: Received {} bytes from quinn stream.", n); + + // Forward to UDP peer + tracing::info!("Parsed packet from quinn stream. Forwarding to {}", client_addr); + if let Err(e) = socket.send_to(&buf[..n], client_addr).await { + eprintln!("Error sending to UDP: {}", e); + token_quinn.cancel(); + break; + } + } + Ok(None) => { + // Quinn stream ended normally + token_quinn.cancel(); + break; + } + Err(e) => { + eprintln!("Quinn receive error: {}", e); + token_quinn.cancel(); + break; + } + } + } + }) + }; + + // Handle Ctrl+C signal + let ctrl_c = tokio::spawn(async move { + if let Ok(()) = tokio::signal::ctrl_c().await { + token_ctrl_c.cancel(); + } + }); + + // Wait for any task to complete (or Ctrl+C) + tokio::select! { + // _ = udp_to_quinn => {}, + _ = quinn_to_udp => {}, + _ = ctrl_c => {}, + } + + Ok(()) +} + +// Every new connection is a new socket to the `connect udp` command +pub(crate) async fn handle_udp_listen( + peer_addrs: &[SocketAddr], + mut recv_stream: RecvStream, + mut send_stream: SendStream, +) -> Result<()> { + // Create a cancellation token to coordinate shutdown + let token = CancellationToken::new(); + let token_udp = token.clone(); + let token_quinn = token.clone(); + let token_ctrl_c = token.clone(); + + // Create a new socket for this connection, representing the client connected to UDP server at the other side. + // This socket will be used to send data to the actual server, receive response back and forward it to the conn. + let socket = Arc::new(UdpSocket::bind("0.0.0:0").await?); + + let udp_buf_size = 65535; // Maximum UDP packet size + let quinn_to_udp = { + let socket_send = socket.clone(); + let p_addr = peer_addrs.to_vec(); + tokio::spawn(async move { + let mut buf = vec![0u8; udp_buf_size]; + loop { + // Check if we should stop + if token_quinn.is_cancelled() { + tracing::info!("Token cancellation was requested. Ending QUIC to UDP task."); + break; + } + + // Read from Quinn stream + match recv_stream.read(&mut buf).await { + Ok(Some(n)) => { + tracing::info!("forward_quinn_to_udp: Received {} bytes from quinn stream.", n); + + // Forward to UDP peer + // tracing::info!("Forwarding packets to {:?}", peer_addrs); + for addr in p_addr.iter() { + if let Err(e) = socket_send.send_to(&buf[..n], addr).await { + eprintln!("Error sending to UDP: {}", e); + token_quinn.cancel(); + break; + } + } + } + Ok(None) => { + // Quinn stream ended normally + token_quinn.cancel(); + break; + } + Err(e) => { + eprintln!("Quinn receive error: {}", e); + token_quinn.cancel(); + break; + } + } + } + tracing::info!("Token cancellation was requested or error received. quinn connection task ended."); + }) + }; + + let udp_to_quinn = { + // Task for listening to the response to the UDP server + let socket_listen = socket.clone(); + tokio::spawn(async move { + let mut buf = vec![0u8; udp_buf_size]; + loop { + // Check if we should stop + if token_udp.is_cancelled() { + tracing::info!("Token cancellation was requested. Ending UDP to QUIC task."); + break; + } + + // Use timeout to periodically check cancellation + match tokio::time::timeout( + tokio::time::Duration::from_millis(100), + socket_listen.recv_from(&mut buf) + ).await { + Ok(Ok((n, _addr))) => { + tracing::info!("forward_quinn_to_udp: Received {} bytes from server", n); + + // Forward the buf back to the quinn stream + if let Err(e) = send_stream.write_all(&buf[..n]).await { + eprintln!("Error writing to Quinn stream: {}", e); + token_udp.cancel(); + break; + } + } + Ok(Err(e)) => { + eprintln!("UDP receive error: {}", e); + token_udp.cancel(); + break; + } + Err(_) => continue, // Timeout, check cancellation + } + } + tracing::info!("Token cancellation was requested or error received. UDP socket task ended."); + }) + }; + + // Handle Ctrl+C signal + let ctrl_c = tokio::spawn(async move { + if let Ok(()) = tokio::signal::ctrl_c().await { + token_ctrl_c.cancel(); + } + }); + + // Wait for any task to complete (or Ctrl+C) + tokio::select! { + _ = quinn_to_udp => {}, + _ = udp_to_quinn => {}, + _ = ctrl_c => {}, + } + + Ok(()) +} From b72a9f89797db347f82d6323bf5d000882ea329b Mon Sep 17 00:00:00 2001 From: unalkalkan Date: Tue, 18 Feb 2025 01:18:08 +0300 Subject: [PATCH 2/6] using connection datagram instead of quinn stream implemented. --- src/main.rs | 57 ++++++++---------------------- src/udpconn.rs | 94 +++++++++++++++++++++----------------------------- 2 files changed, 54 insertions(+), 97 deletions(-) diff --git a/src/main.rs b/src/main.rs index 15beb37..d0b5064 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ //! Command line arguments. use anyhow::Context; +use bytes::Bytes; use clap::{Parser, Subcommand}; use dumbpipe::NodeTicket; use iroh::{ @@ -481,22 +482,6 @@ async fn connect_tcp(args: ConnectTcpArgs) -> anyhow::Result<()> { Ok(()) } -pub struct SplitUdpConn { - // TODO: Do we need to store this connection? - // Holding on to this for the future where we need to cleanup the resources. - connection: quinn::Connection, - send: quinn::SendStream, -} - -impl SplitUdpConn { - pub fn new(connection: quinn::Connection, send: quinn::SendStream) -> Self { - Self { - connection, - send - } - } -} - // 1- Receives request message from socket // 2- Forwards it to the quinn stream // 3- Receives response message back from quinn stream @@ -520,7 +505,7 @@ async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { let node_addr = args.ticket.node_addr(); let mut buf: Vec = vec![0u8; 65535]; - let mut conns = HashMap::::new(); + let mut conns = HashMap::::new(); loop { match socket.recv_from(&mut buf).await { Ok((size, sock_addr)) => { @@ -545,25 +530,18 @@ async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { .context(format!("error connecting to {}", remote_node_id))?; tracing::info!("connected to {}", remote_node_id); - // open a bidi stream, try only once - let (mut send, recv) = connection - .open_bi() - .await - .context(format!("error opening bidi stream to {}", remote_node_id))?; - tracing::info!("opened bidi stream to {}", remote_node_id); - // send the handshake unless we are using a custom alpn if handshake { - send.write_all(&dumbpipe::HANDSHAKE).await?; + connection.send_datagram(Bytes::from_static(&dumbpipe::HANDSHAKE))?; } let sock_send = socket.clone(); + let conn_clone = connection.clone(); // Spawn a task for listening the quinn connection, and forwarding the data to the UDP socket tokio::spawn(async move { - // 3- Receives response message back from quinn stream + // 3- Receives response message back from connection datagram // 4- Forwards it back to the socket - if let Err(cause) = udpconn::handle_udp_accept(sock_addr, sock_send, recv ) - .await { + if let Err(cause) = udpconn::handle_udp_accept(sock_addr, sock_send, conn_clone).await { // log error at warn level // // we should know about it, but it's not fatal @@ -574,18 +552,17 @@ async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { }); // Create and store the split connection - let split_conn = SplitUdpConn::new(connection.clone(), send); - conns.insert(sock_addr, split_conn); + conns.insert(sock_addr, connection); conns.get_mut(&sock_addr).expect("connection was just inserted") } }; - tracing::info!("forward_udp_to_quinn: Received {} bytes from {}", size, sock_addr); + tracing::info!("connect_udp: Received {} bytes from {}", size, sock_addr); // 1- Receives request message from socket - // 2- Forwards it to the quinn stream - if let Err(e) = connection.send.write_all(&buf[..size]).await { - tracing::error!("Error writing to Quinn stream: {}", e); + // 2- Forwards it to the connection datagram + if let Err(e) = connection.send_datagram(Bytes::copy_from_slice(&buf[..size])) { // Bytes::copy_from_slice probably isn't the best way to do it. Investigate. + tracing::error!("Error writing to connection datagram: {}", e); // TODO: Cleanup the resources on error. // Remove the failed connection // conns.remove(&sock_addr); @@ -740,23 +717,17 @@ async fn listen_udp(args: ListenUdpArgs) -> anyhow::Result<()> { let connection = connecting.await.context("error accepting connection")?; let remote_node_id = get_remote_node_id(&connection)?; tracing::info!("got connection from {}", remote_node_id); - let (s, mut r) = connection - .accept_bi() - .await - .context("error accepting stream")?; - tracing::info!("accepted bidi stream from {}", remote_node_id); if handshake { // read the handshake and verify it - let mut buf = [0u8; dumbpipe::HANDSHAKE.len()]; - r.read_exact(&mut buf).await?; - anyhow::ensure!(buf == dumbpipe::HANDSHAKE, "invalid handshake"); + let bytes = connection.read_datagram().await?; + anyhow::ensure!(*bytes == dumbpipe::HANDSHAKE, "invalid handshake"); } // 1- Receives request message from quinn stream // 2- Forwards it to the (addrs) via UDP socket // 3- Receives response message back from UDP socket // 4- Forwards it back to the quinn stream - udpconn::handle_udp_listen(addrs.as_slice(), r, s).await?; + udpconn::handle_udp_listen(addrs.as_slice(), connection).await?; Ok(()) } diff --git a/src/udpconn.rs b/src/udpconn.rs index 344ebcc..58b4ef7 100644 --- a/src/udpconn.rs +++ b/src/udpconn.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; +use bytes::Bytes; use tokio::net::UdpSocket; -use quinn::{RecvStream, SendStream}; +use quinn::Connection; use anyhow::Result; use tokio_util::sync::CancellationToken; @@ -9,48 +10,39 @@ use std::sync::Arc; pub(crate) async fn handle_udp_accept( client_addr: SocketAddr, udp_socket: Arc, - mut recv_stream: RecvStream, + connection: Connection, ) -> Result<()> { // Create a cancellation token to coordinate shutdown let token = CancellationToken::new(); - let token_quinn = token.clone(); + let token_conn = token.clone(); let token_ctrl_c = token.clone(); // Create buffer for receiving data - let udp_buf_size = 65535; // Maximum UDP packet size - let quinn_to_udp = { + let connection_to_udp = { let socket = udp_socket.clone(); tokio::spawn(async move { - let mut buf = vec![0u8; udp_buf_size]; loop { // Check if we should stop - if token_quinn.is_cancelled() { + if token_conn.is_cancelled() { break; } - // Read from Quinn stream - match recv_stream.read(&mut buf).await { - Ok(Some(n)) => { - // Parse the prefixed message to get the address and the buff - // let (addr, buf) = read_prefixed_message(&buf[..n]).unwrap(); - tracing::info!("forward_udp_to_quinn: Received {} bytes from quinn stream.", n); - + // Read from connection datagram + match connection.read_datagram().await { + Ok(bytes) => { + let n = bytes.len(); // TODO: remove this line + tracing::info!("read datagram from connection with {} bytes. Forwarding to {}", n, client_addr); + // Forward to UDP peer - tracing::info!("Parsed packet from quinn stream. Forwarding to {}", client_addr); - if let Err(e) = socket.send_to(&buf[..n], client_addr).await { + if let Err(e) = socket.send_to(&bytes, client_addr).await { eprintln!("Error sending to UDP: {}", e); - token_quinn.cancel(); + token_conn.cancel(); break; } } - Ok(None) => { - // Quinn stream ended normally - token_quinn.cancel(); - break; - } Err(e) => { - eprintln!("Quinn receive error: {}", e); - token_quinn.cancel(); + eprintln!("Connection read_datagram error: {}", e); + token_conn.cancel(); break; } } @@ -67,8 +59,7 @@ pub(crate) async fn handle_udp_accept( // Wait for any task to complete (or Ctrl+C) tokio::select! { - // _ = udp_to_quinn => {}, - _ = quinn_to_udp => {}, + _ = connection_to_udp => {}, _ = ctrl_c => {}, } @@ -78,13 +69,12 @@ pub(crate) async fn handle_udp_accept( // Every new connection is a new socket to the `connect udp` command pub(crate) async fn handle_udp_listen( peer_addrs: &[SocketAddr], - mut recv_stream: RecvStream, - mut send_stream: SendStream, + connection: Connection, ) -> Result<()> { // Create a cancellation token to coordinate shutdown let token = CancellationToken::new(); let token_udp = token.clone(); - let token_quinn = token.clone(); + let token_conn = token.clone(); let token_ctrl_c = token.clone(); // Create a new socket for this connection, representing the client connected to UDP server at the other side. @@ -92,52 +82,48 @@ pub(crate) async fn handle_udp_listen( let socket = Arc::new(UdpSocket::bind("0.0.0:0").await?); let udp_buf_size = 65535; // Maximum UDP packet size - let quinn_to_udp = { + let conn_to_udp = { let socket_send = socket.clone(); let p_addr = peer_addrs.to_vec(); + let conn_clone = connection.clone(); tokio::spawn(async move { - let mut buf = vec![0u8; udp_buf_size]; loop { // Check if we should stop - if token_quinn.is_cancelled() { + if token_conn.is_cancelled() { tracing::info!("Token cancellation was requested. Ending QUIC to UDP task."); break; } - // Read from Quinn stream - match recv_stream.read(&mut buf).await { - Ok(Some(n)) => { - tracing::info!("forward_quinn_to_udp: Received {} bytes from quinn stream.", n); + // Read from connection datagram + match conn_clone.read_datagram().await { + Ok(bytes) => { + let n = bytes.len(); // TODO: remove this line + tracing::info!("conn_to_udp: Received {} bytes from datagram stream.", n); // Forward to UDP peer - // tracing::info!("Forwarding packets to {:?}", peer_addrs); for addr in p_addr.iter() { - if let Err(e) = socket_send.send_to(&buf[..n], addr).await { + if let Err(e) = socket_send.send_to(&bytes, addr).await { eprintln!("Error sending to UDP: {}", e); - token_quinn.cancel(); + token_conn.cancel(); break; } } } - Ok(None) => { - // Quinn stream ended normally - token_quinn.cancel(); - break; - } Err(e) => { - eprintln!("Quinn receive error: {}", e); - token_quinn.cancel(); + eprintln!("Connection read_datagram error: {}", e); + token_conn.cancel(); break; } } } - tracing::info!("Token cancellation was requested or error received. quinn connection task ended."); + tracing::info!("Token cancellation was requested or error received. connection datagram task ended."); }) }; - let udp_to_quinn = { + let udp_to_conn = { // Task for listening to the response to the UDP server let socket_listen = socket.clone(); + let conn_clone = connection.clone(); tokio::spawn(async move { let mut buf = vec![0u8; udp_buf_size]; loop { @@ -153,11 +139,11 @@ pub(crate) async fn handle_udp_listen( socket_listen.recv_from(&mut buf) ).await { Ok(Ok((n, _addr))) => { - tracing::info!("forward_quinn_to_udp: Received {} bytes from server", n); + tracing::info!("udp_to_conn: Received {} bytes from server", n); - // Forward the buf back to the quinn stream - if let Err(e) = send_stream.write_all(&buf[..n]).await { - eprintln!("Error writing to Quinn stream: {}", e); + // Forward the buf back to the connection datagram + if let Err(e) = conn_clone.send_datagram(Bytes::copy_from_slice(&buf[..n])) { + eprintln!("Error on connection send_datagram: {}", e); token_udp.cancel(); break; } @@ -183,8 +169,8 @@ pub(crate) async fn handle_udp_listen( // Wait for any task to complete (or Ctrl+C) tokio::select! { - _ = quinn_to_udp => {}, - _ = udp_to_quinn => {}, + _ = conn_to_udp => {}, + _ = udp_to_conn => {}, _ = ctrl_c => {}, } From 593b02222817c0c2aec062853dd8bf7936959423 Mon Sep 17 00:00:00 2001 From: unalkalkan Date: Tue, 18 Feb 2025 01:20:38 +0300 Subject: [PATCH 3/6] Bytes dependency added to dumbpipe since it's used directly now --- Cargo.lock | 5 +++-- Cargo.toml | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca9bbc9..d8e0621 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -286,9 +286,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" +checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" [[package]] name = "cc" @@ -732,6 +732,7 @@ name = "dumbpipe" version = "0.23.0" dependencies = [ "anyhow", + "bytes", "clap", "duct", "hex", diff --git a/Cargo.toml b/Cargo.toml index 8f77204..260e0da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ rust-version = "1.81" [dependencies] anyhow = "1.0.75" +bytes = "1.10.0" clap = { version = "4.4.10", features = ["derive"] } hex = "0.4.3" iroh = { version = "0.31", default-features = false } From e5f59e765e057b57cc358d0e59773c92fe6b7cb8 Mon Sep 17 00:00:00 2001 From: unalkalkan Date: Sat, 22 Feb 2025 01:20:40 +0300 Subject: [PATCH 4/6] changes for keeping track of connections and removing them using arc mutex. CTRL-C signal listening added to connect_udp. --- src/main.rs | 158 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 90 insertions(+), 68 deletions(-) diff --git a/src/main.rs b/src/main.rs index d0b5064..5768d34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::{ collections::HashMap, io, net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, str::FromStr, sync::Arc }; use tokio::{ - io::{AsyncRead, AsyncWrite, AsyncWriteExt}, net::UdpSocket, select + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, net::UdpSocket, select, signal }; use tokio_util::sync::CancellationToken; mod udpconn; @@ -56,9 +56,9 @@ pub enum Commands { /// Listen on a magicsocket and forward incoming connections to the specified /// host and port. Every incoming bidi stream is forwarded to a new connection. - /// + /// /// Will print a node ticket on stderr that can be used to connect. - /// + /// /// As far as the magic socket is concerned, this is listening. But it is /// connecting to a UDP socket for which you have to specify the host and port. ListenUdp(ListenUdpArgs), @@ -79,9 +79,9 @@ pub enum Commands { /// Connect to a magicsocket, open a bidi stream, and forward stdin/stdout /// to it. - /// + /// /// A node ticket is required to connect. - /// + /// /// As far as the magic socket is concerned, this is connecting. But it is /// listening on a UDP socket for which you have to specify the interface and port. ConnectUdp(ConnectUdpArgs), @@ -505,74 +505,96 @@ async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { let node_addr = args.ticket.node_addr(); let mut buf: Vec = vec![0u8; 65535]; - let mut conns = HashMap::::new(); + let conns = Arc::new(tokio::sync::Mutex::new( + HashMap::::new(), + )); loop { - match socket.recv_from(&mut buf).await { - Ok((size, sock_addr)) => { - // Check if we already have a connection for this socket address - let connection = match conns.get_mut(&sock_addr) { - Some(conn) => conn, - None => { - // We need to finish the connection to be done or we should use something like promise because - // when the connection was getting established, it might receive another message. - let endpoint = endpoint.clone(); - let addr = node_addr.clone(); - let handshake = !args.common.is_custom_alpn(); - let alpn = args.common.alpn()?; - - let remote_node_id = addr.node_id; - tracing::info!("forwarding UDP to {}", remote_node_id); - - // connect to the node, try only once - let connection = endpoint - .connect(addr.clone(), &alpn) - .await - .context(format!("error connecting to {}", remote_node_id))?; - tracing::info!("connected to {}", remote_node_id); - - // send the handshake unless we are using a custom alpn - if handshake { - connection.send_datagram(Bytes::from_static(&dumbpipe::HANDSHAKE))?; - } - - let sock_send = socket.clone(); - let conn_clone = connection.clone(); - // Spawn a task for listening the quinn connection, and forwarding the data to the UDP socket - tokio::spawn(async move { - // 3- Receives response message back from connection datagram - // 4- Forwards it back to the socket - if let Err(cause) = udpconn::handle_udp_accept(sock_addr, sock_send, conn_clone).await { - // log error at warn level - // - // we should know about it, but it's not fatal - tracing::warn!("error handling connection: {}", cause); - - // TODO: cleanup resources + tokio::select! { + _ = signal::ctrl_c() => { + tracing::info!("Received CTRL-C, shutting down..."); + break; + } + result = socket.recv_from(&mut buf) => { + match result { + Ok((size, sock_addr)) => { + // Check if we already have a connection for this socket address + let mut cnns = conns.lock().await; + let connection = match cnns.get_mut(&sock_addr) { + Some(conn) => conn, + None => { + // If we don't have a connection, drop the previous lock to create a new one later on + drop(cnns); + + // Create a new connection since this address is not in the hashmap + let endpoint = endpoint.clone(); + let addr = node_addr.clone(); + let handshake = !args.common.is_custom_alpn(); + let alpn = args.common.alpn()?; + + let remote_node_id = addr.node_id; + tracing::info!("creating a connection to be forwarding UDP to {}", remote_node_id); + + // connect to the node, try only once + let connection = endpoint + .connect(addr.clone(), &alpn) + .await + .context(format!("error connecting to {}", remote_node_id))?; + tracing::info!("connected to {}", remote_node_id); + + // send the handshake unless we are using a custom alpn + if handshake { + connection.send_datagram(Bytes::from_static(&dumbpipe::HANDSHAKE))?; + } + + let sock_send = socket.clone(); + let conn_clone = connection.clone(); + let conns_clone = conns.clone(); + // Spawn a task for listening the quinn connection, and forwarding the data to the UDP socket + tokio::spawn(async move { + tracing::info!("Spawned a accept task for {}", sock_addr); + // 3- Receives response message back from connection datagram + // 4- Forwards it back to the socket + if let Err(cause) = udpconn::handle_udp_accept(sock_addr, sock_send, conn_clone).await { + // log error at warn level + // + // we should know about it, but it's not fatal + tracing::warn!("error handling connection: {}", cause); + } + tracing::info!("Closing UDP connection for {}", sock_addr); + let mut cn = conns_clone.lock().await; + cn.remove(&sock_addr); + tracing::info!("Connection {} removed from hashmap.", sock_addr); + }); + + // Store the connection + let mut cn = conns.lock().await; + cn.insert(sock_addr, connection.clone()); + + tracing::info!("Connection stored for {}, returning.", sock_addr); + + // return + &mut connection.clone() } - }); - - // Create and store the split connection - conns.insert(sock_addr, connection); - conns.get_mut(&sock_addr).expect("connection was just inserted") + }; + + tracing::info!("connect_udp: Received {} bytes from {}", size, sock_addr); + + // 1- Receives request message from socket + // 2- Forwards it to the connection datagram + if let Err(e) = connection.send_datagram(Bytes::copy_from_slice(&buf[..size])) { // Bytes::copy_from_slice probably isn't the best way to do it. Investigate. + tracing::error!("Error writing to connection datagram: {}", e); + // TODO: Cleanup the resources on error. + // Remove the failed connection + // conns.remove(&sock_addr); + return Err(e.into()); + } + } + Err(e) => { + tracing::warn!("error receiving from UDP socket: {}", e); + break; } - }; - - tracing::info!("connect_udp: Received {} bytes from {}", size, sock_addr); - - // 1- Receives request message from socket - // 2- Forwards it to the connection datagram - if let Err(e) = connection.send_datagram(Bytes::copy_from_slice(&buf[..size])) { // Bytes::copy_from_slice probably isn't the best way to do it. Investigate. - tracing::error!("Error writing to connection datagram: {}", e); - // TODO: Cleanup the resources on error. - // Remove the failed connection - // conns.remove(&sock_addr); - return Err(e.into()); } } - Err(e) => { - tracing::warn!("error receiving from UDP socket: {}", e); - break; - } } } Ok(()) From f8aafecfcfb1a7792b7b512ae426498b253d061b Mon Sep 17 00:00:00 2001 From: unalkalkan Date: Sat, 22 Feb 2025 01:30:56 +0300 Subject: [PATCH 5/6] frequent debug tracing logs removed --- src/main.rs | 23 ++++++----------------- src/udpconn.rs | 37 ++++++++++++++++--------------------- 2 files changed, 22 insertions(+), 38 deletions(-) diff --git a/src/main.rs b/src/main.rs index 5768d34..9b9356d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -483,8 +483,8 @@ async fn connect_tcp(args: ConnectTcpArgs) -> anyhow::Result<()> { } // 1- Receives request message from socket -// 2- Forwards it to the quinn stream -// 3- Receives response message back from quinn stream +// 2- Forwards it to the connection datagram +// 3- Receives response message back from connection datagram // 4- Forwards it back to the socket async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { let addrs = args @@ -549,9 +549,8 @@ async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { let sock_send = socket.clone(); let conn_clone = connection.clone(); let conns_clone = conns.clone(); - // Spawn a task for listening the quinn connection, and forwarding the data to the UDP socket + // Spawn a task for listening the connection datagram, and forward the data to the UDP socket tokio::spawn(async move { - tracing::info!("Spawned a accept task for {}", sock_addr); // 3- Receives response message back from connection datagram // 4- Forwards it back to the socket if let Err(cause) = udpconn::handle_udp_accept(sock_addr, sock_send, conn_clone).await { @@ -560,32 +559,22 @@ async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { // we should know about it, but it's not fatal tracing::warn!("error handling connection: {}", cause); } - tracing::info!("Closing UDP connection for {}", sock_addr); + // Cleanup resources for this connection since it's `Connection` is closed or errored out let mut cn = conns_clone.lock().await; cn.remove(&sock_addr); - tracing::info!("Connection {} removed from hashmap.", sock_addr); }); - // Store the connection + // Store the connection and return let mut cn = conns.lock().await; cn.insert(sock_addr, connection.clone()); - - tracing::info!("Connection stored for {}, returning.", sock_addr); - - // return &mut connection.clone() } }; - tracing::info!("connect_udp: Received {} bytes from {}", size, sock_addr); - // 1- Receives request message from socket // 2- Forwards it to the connection datagram - if let Err(e) = connection.send_datagram(Bytes::copy_from_slice(&buf[..size])) { // Bytes::copy_from_slice probably isn't the best way to do it. Investigate. + if let Err(e) = connection.send_datagram(Bytes::copy_from_slice(&buf[..size])) { // Is Bytes::copy_from_slice most efficient way to do this?. Investigate. tracing::error!("Error writing to connection datagram: {}", e); - // TODO: Cleanup the resources on error. - // Remove the failed connection - // conns.remove(&sock_addr); return Err(e.into()); } } diff --git a/src/udpconn.rs b/src/udpconn.rs index 58b4ef7..7479d30 100644 --- a/src/udpconn.rs +++ b/src/udpconn.rs @@ -1,8 +1,8 @@ -use std::net::SocketAddr; +use anyhow::Result; use bytes::Bytes; -use tokio::net::UdpSocket; use quinn::Connection; -use anyhow::Result; +use std::net::SocketAddr; +use tokio::net::UdpSocket; use tokio_util::sync::CancellationToken; use std::sync::Arc; @@ -30,18 +30,15 @@ pub(crate) async fn handle_udp_accept( // Read from connection datagram match connection.read_datagram().await { Ok(bytes) => { - let n = bytes.len(); // TODO: remove this line - tracing::info!("read datagram from connection with {} bytes. Forwarding to {}", n, client_addr); - // Forward to UDP peer if let Err(e) = socket.send_to(&bytes, client_addr).await { - eprintln!("Error sending to UDP: {}", e); + tracing::error!("Error sending to UDP: {}", e); token_conn.cancel(); break; } } Err(e) => { - eprintln!("Connection read_datagram error: {}", e); + tracing::error!("Connection read_datagram error: {}", e); token_conn.cancel(); break; } @@ -97,20 +94,17 @@ pub(crate) async fn handle_udp_listen( // Read from connection datagram match conn_clone.read_datagram().await { Ok(bytes) => { - let n = bytes.len(); // TODO: remove this line - tracing::info!("conn_to_udp: Received {} bytes from datagram stream.", n); - // Forward to UDP peer for addr in p_addr.iter() { if let Err(e) = socket_send.send_to(&bytes, addr).await { - eprintln!("Error sending to UDP: {}", e); + tracing::error!("Error sending to UDP: {}", e); token_conn.cancel(); break; } } } Err(e) => { - eprintln!("Connection read_datagram error: {}", e); + tracing::error!("Connection read_datagram error: {}", e); token_conn.cancel(); break; } @@ -119,7 +113,7 @@ pub(crate) async fn handle_udp_listen( tracing::info!("Token cancellation was requested or error received. connection datagram task ended."); }) }; - + let udp_to_conn = { // Task for listening to the response to the UDP server let socket_listen = socket.clone(); @@ -136,20 +130,21 @@ pub(crate) async fn handle_udp_listen( // Use timeout to periodically check cancellation match tokio::time::timeout( tokio::time::Duration::from_millis(100), - socket_listen.recv_from(&mut buf) - ).await { + socket_listen.recv_from(&mut buf), + ) + .await + { Ok(Ok((n, _addr))) => { - tracing::info!("udp_to_conn: Received {} bytes from server", n); - // Forward the buf back to the connection datagram - if let Err(e) = conn_clone.send_datagram(Bytes::copy_from_slice(&buf[..n])) { - eprintln!("Error on connection send_datagram: {}", e); + if let Err(e) = conn_clone.send_datagram(Bytes::copy_from_slice(&buf[..n])) + { + tracing::error!("Error on connection send_datagram: {}", e); token_udp.cancel(); break; } } Ok(Err(e)) => { - eprintln!("UDP receive error: {}", e); + tracing::error!("UDP receive error: {}", e); token_udp.cancel(); break; } From 0e258994c0306c07d504ddda6ac675e3422b1d95 Mon Sep 17 00:00:00 2001 From: unalkalkan Date: Sat, 22 Feb 2025 01:41:05 +0300 Subject: [PATCH 6/6] udpconn file renamed to udp. UDP proxy related codes removed from main.rs to udp.rs --- src/main.rs | 207 +-------------------------- src/udp.rs | 377 +++++++++++++++++++++++++++++++++++++++++++++++++ src/udpconn.rs | 173 ----------------------- 3 files changed, 382 insertions(+), 375 deletions(-) create mode 100644 src/udp.rs delete mode 100644 src/udpconn.rs diff --git a/src/main.rs b/src/main.rs index 9b9356d..28a4041 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,19 @@ //! Command line arguments. use anyhow::Context; -use bytes::Bytes; use clap::{Parser, Subcommand}; use dumbpipe::NodeTicket; use iroh::{ endpoint::{get_remote_node_id, Connecting}, Endpoint, NodeAddr, SecretKey, }; -use quinn::Connection; use std::{ - collections::HashMap, io, net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, str::FromStr, sync::Arc + io, net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}, str::FromStr }; use tokio::{ - io::{AsyncRead, AsyncWrite, AsyncWriteExt}, net::UdpSocket, select, signal + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, select }; use tokio_util::sync::CancellationToken; -mod udpconn; +mod udp; /// Create a dumb pipe between two machines, using an iroh magicsocket. /// @@ -482,113 +480,6 @@ async fn connect_tcp(args: ConnectTcpArgs) -> anyhow::Result<()> { Ok(()) } -// 1- Receives request message from socket -// 2- Forwards it to the connection datagram -// 3- Receives response message back from connection datagram -// 4- Forwards it back to the socket -async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { - let addrs = args - .addr - .to_socket_addrs() - .context(format!("invalid host string {}", args.addr))?; - let secret_key = get_or_create_secret()?; - let mut builder = Endpoint::builder().secret_key(secret_key).alpns(vec![]); - if let Some(addr) = args.common.magic_ipv4_addr { - builder = builder.bind_addr_v4(addr); - } - if let Some(addr) = args.common.magic_ipv6_addr { - builder = builder.bind_addr_v6(addr); - } - let endpoint = builder.bind().await.context("unable to bind magicsock")?; - tracing::info!("udp listening on {:?}", addrs); - let socket = Arc::new(UdpSocket::bind(addrs.as_slice()).await?); - - let node_addr = args.ticket.node_addr(); - let mut buf: Vec = vec![0u8; 65535]; - let conns = Arc::new(tokio::sync::Mutex::new( - HashMap::::new(), - )); - loop { - tokio::select! { - _ = signal::ctrl_c() => { - tracing::info!("Received CTRL-C, shutting down..."); - break; - } - result = socket.recv_from(&mut buf) => { - match result { - Ok((size, sock_addr)) => { - // Check if we already have a connection for this socket address - let mut cnns = conns.lock().await; - let connection = match cnns.get_mut(&sock_addr) { - Some(conn) => conn, - None => { - // If we don't have a connection, drop the previous lock to create a new one later on - drop(cnns); - - // Create a new connection since this address is not in the hashmap - let endpoint = endpoint.clone(); - let addr = node_addr.clone(); - let handshake = !args.common.is_custom_alpn(); - let alpn = args.common.alpn()?; - - let remote_node_id = addr.node_id; - tracing::info!("creating a connection to be forwarding UDP to {}", remote_node_id); - - // connect to the node, try only once - let connection = endpoint - .connect(addr.clone(), &alpn) - .await - .context(format!("error connecting to {}", remote_node_id))?; - tracing::info!("connected to {}", remote_node_id); - - // send the handshake unless we are using a custom alpn - if handshake { - connection.send_datagram(Bytes::from_static(&dumbpipe::HANDSHAKE))?; - } - - let sock_send = socket.clone(); - let conn_clone = connection.clone(); - let conns_clone = conns.clone(); - // Spawn a task for listening the connection datagram, and forward the data to the UDP socket - tokio::spawn(async move { - // 3- Receives response message back from connection datagram - // 4- Forwards it back to the socket - if let Err(cause) = udpconn::handle_udp_accept(sock_addr, sock_send, conn_clone).await { - // log error at warn level - // - // we should know about it, but it's not fatal - tracing::warn!("error handling connection: {}", cause); - } - // Cleanup resources for this connection since it's `Connection` is closed or errored out - let mut cn = conns_clone.lock().await; - cn.remove(&sock_addr); - }); - - // Store the connection and return - let mut cn = conns.lock().await; - cn.insert(sock_addr, connection.clone()); - &mut connection.clone() - } - }; - - // 1- Receives request message from socket - // 2- Forwards it to the connection datagram - if let Err(e) = connection.send_datagram(Bytes::copy_from_slice(&buf[..size])) { // Is Bytes::copy_from_slice most efficient way to do this?. Investigate. - tracing::error!("Error writing to connection datagram: {}", e); - return Err(e.into()); - } - } - Err(e) => { - tracing::warn!("error receiving from UDP socket: {}", e); - break; - } - } - } - } - } - Ok(()) -} - /// Listen on a magicsocket and forward incoming connections to a tcp socket. async fn listen_tcp(args: ListenTcpArgs) -> anyhow::Result<()> { let addrs = match args.host.to_socket_addrs() { @@ -682,94 +573,6 @@ async fn listen_tcp(args: ListenTcpArgs) -> anyhow::Result<()> { Ok(()) } -/// Listen on a magicsocket and forward incoming connections to a udp socket. -async fn listen_udp(args: ListenUdpArgs) -> anyhow::Result<()> { - let addrs = match args.host.to_socket_addrs() { - Ok(addrs) => addrs.collect::>(), - Err(e) => anyhow::bail!("invalid host string {}: {}", args.host, e), - }; - let secret_key = get_or_create_secret()?; - let mut builder = Endpoint::builder() - .alpns(vec![args.common.alpn()?]) - .secret_key(secret_key); - if let Some(addr) = args.common.magic_ipv4_addr { - builder = builder.bind_addr_v4(addr); - } - if let Some(addr) = args.common.magic_ipv6_addr { - builder = builder.bind_addr_v6(addr); - } - let endpoint = builder.bind().await?; - // wait for the endpoint to figure out its address before making a ticket - endpoint.home_relay().initialized().await?; - let node_addr = endpoint.node_addr().await?; - let mut short = node_addr.clone(); - let ticket = NodeTicket::new(node_addr); - short.direct_addresses.clear(); - let short = NodeTicket::new(short); - - // print the ticket on stderr so it doesn't interfere with the data itself - // - // note that the tests rely on the ticket being the last thing printed - eprintln!("Forwarding incoming requests to '{}'.", args.host); - eprintln!("To connect, use e.g.:"); - eprintln!("dumbpipe connect-udp {ticket}"); - if args.common.verbose > 0 { - eprintln!("or:\ndumbpipe connect-udp {}", short); - } - tracing::info!("node id is {}", ticket.node_addr().node_id); - tracing::info!("derp url is {:?}", ticket.node_addr().relay_url); - - // handle a new incoming connection on the magic endpoint - async fn handle_magic_accept( - connecting: Connecting, - addrs: Vec, - handshake: bool, - ) -> anyhow::Result<()> { - let connection = connecting.await.context("error accepting connection")?; - let remote_node_id = get_remote_node_id(&connection)?; - tracing::info!("got connection from {}", remote_node_id); - if handshake { - // read the handshake and verify it - let bytes = connection.read_datagram().await?; - anyhow::ensure!(*bytes == dumbpipe::HANDSHAKE, "invalid handshake"); - } - - // 1- Receives request message from quinn stream - // 2- Forwards it to the (addrs) via UDP socket - // 3- Receives response message back from UDP socket - // 4- Forwards it back to the quinn stream - udpconn::handle_udp_listen(addrs.as_slice(), connection).await?; - Ok(()) - } - - loop { - let incoming = select! { - incoming = endpoint.accept() => incoming, - _ = tokio::signal::ctrl_c() => { - eprintln!("got ctrl-c, exiting"); - break; - } - }; - let Some(incoming) = incoming else { - break; - }; - let Ok(connecting) = incoming.accept() else { - break; - }; - let addrs = addrs.clone(); - let handshake = !args.common.is_custom_alpn(); - tokio::spawn(async move { - if let Err(cause) = handle_magic_accept(connecting, addrs, handshake).await { - // log error at warn level - // - // we should know about it, but it's not fatal - tracing::warn!("error handling connection: {}", cause); - } - }); - } - Ok(()) -} - #[tokio::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); @@ -777,10 +580,10 @@ async fn main() -> anyhow::Result<()> { let res = match args.command { Commands::Listen(args) => listen_stdio(args).await, Commands::ListenTcp(args) => listen_tcp(args).await, - Commands::ListenUdp(args) => listen_udp(args).await, + Commands::ListenUdp(args) => udp::listen_udp(args).await, Commands::Connect(args) => connect_stdio(args).await, Commands::ConnectTcp(args) => connect_tcp(args).await, - Commands::ConnectUdp(args) => connect_udp(args).await, + Commands::ConnectUdp(args) => udp::connect_udp(args).await, }; match res { Ok(()) => std::process::exit(0), diff --git a/src/udp.rs b/src/udp.rs new file mode 100644 index 0000000..9c55072 --- /dev/null +++ b/src/udp.rs @@ -0,0 +1,377 @@ +use anyhow::{Context, Result}; +use bytes::Bytes; +use dumbpipe::NodeTicket; +use iroh::{ + endpoint::{get_remote_node_id, Connecting}, + Endpoint, +}; +use quinn::Connection; +use std::{ + collections::HashMap, + net::{SocketAddr, ToSocketAddrs}, +}; +use tokio::{net::UdpSocket, select, signal}; +use tokio_util::sync::CancellationToken; + +use std::sync::Arc; + +use crate::{get_or_create_secret, ConnectUdpArgs, ListenUdpArgs}; + +// 1- Receives request message from socket +// 2- Forwards it to the connection datagram +// 3- Receives response message back from connection datagram +// 4- Forwards it back to the socket +pub async fn connect_udp(args: ConnectUdpArgs) -> anyhow::Result<()> { + let addrs = args + .addr + .to_socket_addrs() + .context(format!("invalid host string {}", args.addr))?; + let secret_key = get_or_create_secret()?; + let mut builder = Endpoint::builder().secret_key(secret_key).alpns(vec![]); + if let Some(addr) = args.common.magic_ipv4_addr { + builder = builder.bind_addr_v4(addr); + } + if let Some(addr) = args.common.magic_ipv6_addr { + builder = builder.bind_addr_v6(addr); + } + let endpoint = builder.bind().await.context("unable to bind magicsock")?; + tracing::info!("udp listening on {:?}", addrs); + let socket = Arc::new(UdpSocket::bind(addrs.as_slice()).await?); + + let node_addr = args.ticket.node_addr(); + let mut buf: Vec = vec![0u8; 65535]; + let conns = Arc::new(tokio::sync::Mutex::new( + HashMap::::new(), + )); + loop { + tokio::select! { + _ = signal::ctrl_c() => { + eprintln!("Received CTRL-C, shutting down..."); + break; + } + result = socket.recv_from(&mut buf) => { + match result { + Ok((size, sock_addr)) => { + // Check if we already have a connection for this socket address + let mut cnns = conns.lock().await; + let connection = match cnns.get_mut(&sock_addr) { + Some(conn) => conn, + None => { + // If we don't have a connection, drop the previous lock to create a new one later on + drop(cnns); + + // Create a new connection since this address is not in the hashmap + let endpoint = endpoint.clone(); + let addr = node_addr.clone(); + let handshake = !args.common.is_custom_alpn(); + let alpn = args.common.alpn()?; + + let remote_node_id = addr.node_id; + tracing::info!("creating a connection to be forwarding UDP to {}", remote_node_id); + + // connect to the node, try only once + let connection = endpoint + .connect(addr.clone(), &alpn) + .await + .context(format!("error connecting to {}", remote_node_id))?; + tracing::info!("connected to {}", remote_node_id); + + // send the handshake unless we are using a custom alpn + if handshake { + connection.send_datagram(Bytes::from_static(&dumbpipe::HANDSHAKE))?; + } + + let sock_send = socket.clone(); + let conn_clone = connection.clone(); + let conns_clone = conns.clone(); + // Spawn a task for listening the connection datagram, and forward the data to the UDP socket + tokio::spawn(async move { + // 3- Receives response message back from connection datagram + // 4- Forwards it back to the socket + if let Err(cause) = handle_udp_accept(sock_addr, sock_send, conn_clone).await { + // log error at warn level + // + // we should know about it, but it's not fatal + tracing::warn!("error handling connection: {}", cause); + } + // Cleanup resources for this connection since it's `Connection` is closed or errored out + let mut cn = conns_clone.lock().await; + cn.remove(&sock_addr); + }); + + // Store the connection and return + let mut cn = conns.lock().await; + cn.insert(sock_addr, connection.clone()); + &mut connection.clone() + } + }; + + // 1- Receives request message from socket + // 2- Forwards it to the connection datagram + if let Err(e) = connection.send_datagram(Bytes::copy_from_slice(&buf[..size])) { // Is Bytes::copy_from_slice most efficient way to do this?. Investigate. + tracing::error!("Error writing to connection datagram: {}", e); + return Err(e.into()); + } + } + Err(e) => { + tracing::warn!("error receiving from UDP socket: {}", e); + break; + } + } + } + } + } + Ok(()) +} + +/// Listen on a magicsocket and forward incoming connections to a udp socket. +pub async fn listen_udp(args: ListenUdpArgs) -> anyhow::Result<()> { + let addrs = match args.host.to_socket_addrs() { + Ok(addrs) => addrs.collect::>(), + Err(e) => anyhow::bail!("invalid host string {}: {}", args.host, e), + }; + let secret_key = get_or_create_secret()?; + let mut builder = Endpoint::builder() + .alpns(vec![args.common.alpn()?]) + .secret_key(secret_key); + if let Some(addr) = args.common.magic_ipv4_addr { + builder = builder.bind_addr_v4(addr); + } + if let Some(addr) = args.common.magic_ipv6_addr { + builder = builder.bind_addr_v6(addr); + } + let endpoint = builder.bind().await?; + // wait for the endpoint to figure out its address before making a ticket + endpoint.home_relay().initialized().await?; + let node_addr = endpoint.node_addr().await?; + let mut short = node_addr.clone(); + let ticket = NodeTicket::new(node_addr); + short.direct_addresses.clear(); + let short = NodeTicket::new(short); + + // print the ticket on stderr so it doesn't interfere with the data itself + // + // note that the tests rely on the ticket being the last thing printed + eprintln!("Forwarding incoming requests to '{}'.", args.host); + eprintln!("To connect, use e.g.:"); + eprintln!("dumbpipe connect-udp {ticket}"); + if args.common.verbose > 0 { + eprintln!("or:\ndumbpipe connect-udp {}", short); + } + tracing::info!("node id is {}", ticket.node_addr().node_id); + tracing::info!("derp url is {:?}", ticket.node_addr().relay_url); + + // handle a new incoming connection on the magic endpoint + async fn handle_magic_accept( + connecting: Connecting, + addrs: Vec, + handshake: bool, + ) -> anyhow::Result<()> { + let connection = connecting.await.context("error accepting connection")?; + let remote_node_id = get_remote_node_id(&connection)?; + tracing::info!("got connection from {}", remote_node_id); + if handshake { + // read the handshake and verify it + let bytes = connection.read_datagram().await?; + anyhow::ensure!(*bytes == dumbpipe::HANDSHAKE, "invalid handshake"); + } + + // 1- Receives request message from connection datagram + // 2- Forwards it to the (addrs) via UDP socket + // 3- Receives response message back from UDP socket + // 4- Forwards it back to the connection datagram + handle_udp_listen(addrs.as_slice(), connection).await?; + Ok(()) + } + + loop { + let incoming = select! { + incoming = endpoint.accept() => incoming, + _ = tokio::signal::ctrl_c() => { + eprintln!("got ctrl-c, exiting"); + break; + } + }; + let Some(incoming) = incoming else { + break; + }; + let Ok(connecting) = incoming.accept() else { + break; + }; + let addrs = addrs.clone(); + let handshake = !args.common.is_custom_alpn(); + tokio::spawn(async move { + if let Err(cause) = handle_magic_accept(connecting, addrs, handshake).await { + // log error at warn level + // + // we should know about it, but it's not fatal + tracing::warn!("error handling connection: {}", cause); + } + }); + } + Ok(()) +} + +async fn handle_udp_accept( + client_addr: SocketAddr, + udp_socket: Arc, + connection: Connection, +) -> Result<()> { + // Create a cancellation token to coordinate shutdown + let token = CancellationToken::new(); + let token_conn = token.clone(); + let token_ctrl_c = token.clone(); + + // Create buffer for receiving data + let connection_to_udp = { + let socket = udp_socket.clone(); + tokio::spawn(async move { + loop { + // Check if we should stop + if token_conn.is_cancelled() { + break; + } + + // Read from connection datagram + match connection.read_datagram().await { + Ok(bytes) => { + // Forward to UDP peer + if let Err(e) = socket.send_to(&bytes, client_addr).await { + tracing::error!("Error sending to UDP: {}", e); + token_conn.cancel(); + break; + } + } + Err(e) => { + tracing::error!("Connection read_datagram error: {}", e); + token_conn.cancel(); + break; + } + } + } + }) + }; + + // Handle Ctrl+C signal + let ctrl_c = tokio::spawn(async move { + if let Ok(()) = tokio::signal::ctrl_c().await { + token_ctrl_c.cancel(); + } + }); + + // Wait for any task to complete (or Ctrl+C) + tokio::select! { + _ = connection_to_udp => {}, + _ = ctrl_c => {}, + } + + Ok(()) +} + +// Every new connection is a new socket to the `connect udp` command +async fn handle_udp_listen(peer_addrs: &[SocketAddr], connection: Connection) -> Result<()> { + // Create a cancellation token to coordinate shutdown + let token = CancellationToken::new(); + let token_udp = token.clone(); + let token_conn = token.clone(); + let token_ctrl_c = token.clone(); + + // Create a new socket for this connection, representing the client connected to UDP server at the other side. + // This socket will be used to send data to the actual server, receive response back and forward it to the conn. + let socket = Arc::new(UdpSocket::bind("0.0.0:0").await?); + + let udp_buf_size = 65535; // Maximum UDP packet size + let conn_to_udp = { + let socket_send = socket.clone(); + let p_addr = peer_addrs.to_vec(); + let conn_clone = connection.clone(); + tokio::spawn(async move { + loop { + // Check if we should stop + if token_conn.is_cancelled() { + tracing::info!("Token cancellation was requested. Ending QUIC to UDP task."); + break; + } + + // Read from connection datagram + match conn_clone.read_datagram().await { + Ok(bytes) => { + // Forward to UDP peer + for addr in p_addr.iter() { + if let Err(e) = socket_send.send_to(&bytes, addr).await { + tracing::error!("Error sending to UDP: {}", e); + token_conn.cancel(); + break; + } + } + } + Err(e) => { + tracing::error!("Connection read_datagram error: {}", e); + token_conn.cancel(); + break; + } + } + } + tracing::info!("Token cancellation was requested or error received. connection datagram task ended."); + }) + }; + + let udp_to_conn = { + // Task for listening to the response to the UDP server + let socket_listen = socket.clone(); + let conn_clone = connection.clone(); + tokio::spawn(async move { + let mut buf = vec![0u8; udp_buf_size]; + loop { + // Check if we should stop + if token_udp.is_cancelled() { + tracing::info!("Token cancellation was requested. Ending UDP to QUIC task."); + break; + } + + // Use timeout to periodically check cancellation + match tokio::time::timeout( + tokio::time::Duration::from_millis(100), + socket_listen.recv_from(&mut buf), + ) + .await + { + Ok(Ok((n, _addr))) => { + // Forward the buf back to the connection datagram + if let Err(e) = conn_clone.send_datagram(Bytes::copy_from_slice(&buf[..n])) + { + tracing::error!("Error on connection send_datagram: {}", e); + token_udp.cancel(); + break; + } + } + Ok(Err(e)) => { + tracing::error!("UDP receive error: {}", e); + token_udp.cancel(); + break; + } + Err(_) => continue, // Timeout, check cancellation + } + } + tracing::info!( + "Token cancellation was requested or error received. UDP socket task ended." + ); + }) + }; + + // Handle Ctrl+C signal + let ctrl_c = tokio::spawn(async move { + if let Ok(()) = tokio::signal::ctrl_c().await { + token_ctrl_c.cancel(); + } + }); + + // Wait for any task to complete (or Ctrl+C) + tokio::select! { + _ = conn_to_udp => {}, + _ = udp_to_conn => {}, + _ = ctrl_c => {}, + } + + Ok(()) +} diff --git a/src/udpconn.rs b/src/udpconn.rs deleted file mode 100644 index 7479d30..0000000 --- a/src/udpconn.rs +++ /dev/null @@ -1,173 +0,0 @@ -use anyhow::Result; -use bytes::Bytes; -use quinn::Connection; -use std::net::SocketAddr; -use tokio::net::UdpSocket; -use tokio_util::sync::CancellationToken; - -use std::sync::Arc; - -pub(crate) async fn handle_udp_accept( - client_addr: SocketAddr, - udp_socket: Arc, - connection: Connection, -) -> Result<()> { - // Create a cancellation token to coordinate shutdown - let token = CancellationToken::new(); - let token_conn = token.clone(); - let token_ctrl_c = token.clone(); - - // Create buffer for receiving data - let connection_to_udp = { - let socket = udp_socket.clone(); - tokio::spawn(async move { - loop { - // Check if we should stop - if token_conn.is_cancelled() { - break; - } - - // Read from connection datagram - match connection.read_datagram().await { - Ok(bytes) => { - // Forward to UDP peer - if let Err(e) = socket.send_to(&bytes, client_addr).await { - tracing::error!("Error sending to UDP: {}", e); - token_conn.cancel(); - break; - } - } - Err(e) => { - tracing::error!("Connection read_datagram error: {}", e); - token_conn.cancel(); - break; - } - } - } - }) - }; - - // Handle Ctrl+C signal - let ctrl_c = tokio::spawn(async move { - if let Ok(()) = tokio::signal::ctrl_c().await { - token_ctrl_c.cancel(); - } - }); - - // Wait for any task to complete (or Ctrl+C) - tokio::select! { - _ = connection_to_udp => {}, - _ = ctrl_c => {}, - } - - Ok(()) -} - -// Every new connection is a new socket to the `connect udp` command -pub(crate) async fn handle_udp_listen( - peer_addrs: &[SocketAddr], - connection: Connection, -) -> Result<()> { - // Create a cancellation token to coordinate shutdown - let token = CancellationToken::new(); - let token_udp = token.clone(); - let token_conn = token.clone(); - let token_ctrl_c = token.clone(); - - // Create a new socket for this connection, representing the client connected to UDP server at the other side. - // This socket will be used to send data to the actual server, receive response back and forward it to the conn. - let socket = Arc::new(UdpSocket::bind("0.0.0:0").await?); - - let udp_buf_size = 65535; // Maximum UDP packet size - let conn_to_udp = { - let socket_send = socket.clone(); - let p_addr = peer_addrs.to_vec(); - let conn_clone = connection.clone(); - tokio::spawn(async move { - loop { - // Check if we should stop - if token_conn.is_cancelled() { - tracing::info!("Token cancellation was requested. Ending QUIC to UDP task."); - break; - } - - // Read from connection datagram - match conn_clone.read_datagram().await { - Ok(bytes) => { - // Forward to UDP peer - for addr in p_addr.iter() { - if let Err(e) = socket_send.send_to(&bytes, addr).await { - tracing::error!("Error sending to UDP: {}", e); - token_conn.cancel(); - break; - } - } - } - Err(e) => { - tracing::error!("Connection read_datagram error: {}", e); - token_conn.cancel(); - break; - } - } - } - tracing::info!("Token cancellation was requested or error received. connection datagram task ended."); - }) - }; - - let udp_to_conn = { - // Task for listening to the response to the UDP server - let socket_listen = socket.clone(); - let conn_clone = connection.clone(); - tokio::spawn(async move { - let mut buf = vec![0u8; udp_buf_size]; - loop { - // Check if we should stop - if token_udp.is_cancelled() { - tracing::info!("Token cancellation was requested. Ending UDP to QUIC task."); - break; - } - - // Use timeout to periodically check cancellation - match tokio::time::timeout( - tokio::time::Duration::from_millis(100), - socket_listen.recv_from(&mut buf), - ) - .await - { - Ok(Ok((n, _addr))) => { - // Forward the buf back to the connection datagram - if let Err(e) = conn_clone.send_datagram(Bytes::copy_from_slice(&buf[..n])) - { - tracing::error!("Error on connection send_datagram: {}", e); - token_udp.cancel(); - break; - } - } - Ok(Err(e)) => { - tracing::error!("UDP receive error: {}", e); - token_udp.cancel(); - break; - } - Err(_) => continue, // Timeout, check cancellation - } - } - tracing::info!("Token cancellation was requested or error received. UDP socket task ended."); - }) - }; - - // Handle Ctrl+C signal - let ctrl_c = tokio::spawn(async move { - if let Ok(()) = tokio::signal::ctrl_c().await { - token_ctrl_c.cancel(); - } - }); - - // Wait for any task to complete (or Ctrl+C) - tokio::select! { - _ = conn_to_udp => {}, - _ = udp_to_conn => {}, - _ = ctrl_c => {}, - } - - Ok(()) -}