diff --git a/Cargo.toml b/Cargo.toml index 477f459..7b6f5b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,8 @@ edition = "2021" license = "MIT" resolver = "2" description = "Tor transport for libp2p." -repository = "https://github.com/umgefahren/libp2p-tor" -authors = ["umgefahren "] +repository = "https://github.com/UnstoppableSwap/libp2p-tor" +authors = ["umgefahren ", "наб "] [dependencies] thiserror = "1.0" @@ -14,14 +14,15 @@ anyhow = "1.0.93" tokio = "1.41.1" futures = "0.3" -arti-client = { version = "0.30", default-features = false, features = ["tokio", "rustls", "onion-service-client", "static-sqlite"] } +arti-client = { version = "^0.25", default-features = false, features = ["tokio", "rustls", "onion-service-client", "static-sqlite"] } +tor-interface = { git = "https://github.com/nabijaczleweli/gosling", rev = "32988e5770c12f1b48b865c158509473123eae90", optional = true } libp2p = { version = "^0.53", default-features = false, features = ["tokio", "tcp", "tls"] } -tor-rtcompat = { version = "0.30", features = ["tokio", "rustls"] } +tor-rtcompat = { version = "^0.25", features = ["tokio", "rustls"] } tracing = "0.1.40" -tor-hsservice = { version = "0.30", optional = true } -tor-cell = { version = "0.30", optional = true } -tor-proto = { version = "0.30", optional = true } +tor-hsservice = { version = "^0.25", optional = true } +tor-cell = { version = "^0.25", optional = true } +tor-proto = { version = "^0.25", optional = true } data-encoding = { version = "2.6.0" } [dev-dependencies] @@ -37,11 +38,18 @@ listen-onion-service = [ "dep:tor-cell", "dep:tor-proto" ] +arti-client-tor-provider = ["tor-interface", "tor-interface/arti-client-tor-provider"] +legacy-tor-provider = ["tor-interface", "tor-interface/legacy-tor-provider"] +mock-tor-provider = ["tor-interface", "tor-interface/mock-tor-provider"] [[example]] name = "ping-onion" required-features = ["listen-onion-service"] +[[example]] +name = "ping-onion-tor-interface" +required-features = ["tor-interface"] + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/README.md b/README.md index 559f4d6..1c4ef11 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,13 @@ let mut transport = libp2p_community_tor::TorTransport::bootstrapped().await?; // we have achieved tor connection let _conn = transport.dial(address)?.await?; ``` +```rust +let address = "/dns/www.torproject.org/tcp/1000".parse()?; +let mut provider = libp2p_community_tor_interface::tor_interface::/* whichever one you want */; +let mut transport = libp2p_community_tor_interface::TorInterfaceTransport::from_provider(Default::default(), Arc::new(Mutex::new(provider)), None); +// we have achieved tor connection +let _conn = transport.dial(address)?.await?; +``` ### About diff --git a/examples/ping-onion-tor-interface.rs b/examples/ping-onion-tor-interface.rs new file mode 100644 index 0000000..b5710f1 --- /dev/null +++ b/examples/ping-onion-tor-interface.rs @@ -0,0 +1,146 @@ +// Copyright 2022 Hannes Furmans +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Ping-Onion example +//! +//! See ../src/tutorial.rs for a step-by-step guide building the example below. +//! +//! This example requires two seperate computers, one of which has to be reachable from the +//! internet. +//! +//! On the first computer run: +//! ```sh +//! cargo run --example ping +//! ``` +//! +//! It will print the PeerId and the listening addresses, e.g. `Listening on +//! "/ip4/0.0.0.0/tcp/24915"` +//! +//! Make sure that the first computer is reachable under one of these ip addresses and port. +//! +//! On the second computer run: +//! ```sh +//! cargo run --example ping-onion -- /ip4/123.45.67.89/tcp/24915 +//! ``` +//! +//! The two nodes establish a connection, negotiate the ping protocol +//! and begin pinging each other over Tor. + +use futures::StreamExt; +use libp2p::core::upgrade::Version; +use libp2p::Transport; +use libp2p::{ + core::muxing::StreamMuxerBox, + identity, noise, + swarm::{NetworkBehaviour, SwarmEvent}, + yamux, Multiaddr, PeerId, SwarmBuilder, +}; +use std::error::Error; +use std::sync::{Arc, Mutex}; +use tor_interface::tor_crypto::Ed25519PrivateKey; + +/// Create a transport +/// Returns a tuple of the transport and the onion address we can instruct it to listen on +async fn onion_transport( + keypair: identity::Keypair, +) -> Result< + ( + libp2p::core::transport::Boxed<(PeerId, libp2p::core::muxing::StreamMuxerBox)>, + Multiaddr, + ), + Box, +> { + let provider = libp2p_community_tor::tor_interface::legacy_tor_client::LegacyTorClient::new( + libp2p_community_tor::tor_interface::legacy_tor_client::LegacyTorClientConfig::system_from_environment().expect("Configure $TOR_... to talk to"))?; + + let mut transport = libp2p_community_tor::TorInterfaceTransport::from_provider( + libp2p_community_tor::AddressConversion::IpAndDns, Arc::new(Mutex::new(provider)), None)?; + + let onion_listen_address = transport.add_onion_service(&Ed25519PrivateKey::generate(), 999, None, None).unwrap(); + + let auth_upgrade = noise::Config::new(&keypair)?; + let multiplex_upgrade = yamux::Config::default(); + + let transport = transport + .boxed() + .upgrade(Version::V1) + .authenticate(auth_upgrade) + .multiplex(multiplex_upgrade) + .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) + .boxed(); + + Ok((transport, onion_listen_address)) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + + println!("Local peer id: {local_peer_id}"); + + let (transport, onion_listen_address) = onion_transport(local_key).await?; + + let mut swarm = SwarmBuilder::with_new_identity() + .with_tokio() + .with_other_transport(|_| transport) + .unwrap() + .with_behaviour(|_| Behaviour { + ping: libp2p::ping::Behaviour::default(), + }) + .unwrap() + .build(); + + // Dial the peer identified by the multi-address given as the second + // command-line argument, if any. + if let Some(addr) = std::env::args().nth(1) { + let remote: Multiaddr = addr.parse()?; + swarm.dial(remote)?; + println!("Dialed {addr}") + } else { + // If we are not dialing, we need to listen + // Tell the swarm to listen on a specific onion address + swarm.listen_on(onion_listen_address).unwrap(); + } + + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { + endpoint, peer_id, .. + } => { + println!("Connection established with {peer_id} on {endpoint:?}"); + } + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + println!("Outgoing connection error with {peer_id:?}: {error:?}"); + } + SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"), + SwarmEvent::Behaviour(event) => println!("{event:?}"), + _ => {} + } + } +} + +/// Our network behaviour. +#[derive(NetworkBehaviour)] +struct Behaviour { + ping: libp2p::ping::Behaviour, +} diff --git a/src/address.rs b/src/arti/address.rs similarity index 100% rename from src/address.rs rename to src/arti/address.rs diff --git a/src/arti/mod.rs b/src/arti/mod.rs new file mode 100644 index 0000000..098de2d --- /dev/null +++ b/src/arti/mod.rs @@ -0,0 +1,413 @@ +use arti_client::{TorClient, TorClientBuilder}; +use futures::future::BoxFuture; +use libp2p::{ + core::transport::{ListenerId, TransportEvent}, + Multiaddr, Transport, TransportError, +}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use thiserror::Error; +use tor_rtcompat::tokio::TokioRustlsRuntime; + +// We only need these imports if the `listen-onion-service` feature is enabled +#[cfg(feature = "listen-onion-service")] +use std::collections::HashMap; +#[cfg(feature = "listen-onion-service")] +use std::str::FromStr; +#[cfg(feature = "listen-onion-service")] +use tor_cell::relaycell::msg::{Connected, End, EndReason}; +#[cfg(feature = "listen-onion-service")] +use tor_hsservice::{ + handle_rend_requests, status::OnionServiceStatus, HsId, OnionServiceConfig, + RunningOnionService, StreamRequest, +}; +#[cfg(feature = "listen-onion-service")] +use tor_proto::stream::IncomingStreamRequest; + +mod address; +mod provider; + +use address::{dangerous_extract, safe_extract}; +pub use provider::TokioTorStream; + +pub type TorError = arti_client::Error; + +type PendingUpgrade = BoxFuture<'static, Result>; +#[cfg(feature = "listen-onion-service")] +type OnionServiceStream = futures::stream::BoxStream<'static, StreamRequest>; +#[cfg(feature = "listen-onion-service")] +type OnionServiceStatusStream = futures::stream::BoxStream<'static, OnionServiceStatus>; + +/// Struct representing an onion address we are listening on for libp2p connections. +#[cfg(feature = "listen-onion-service")] +struct TorListener { + #[allow(dead_code)] // We need to own this to keep the RunningOnionService alive + /// The onion service we are listening on + service: Arc, + /// The stream of status updates for the onion service + status_stream: OnionServiceStatusStream, + /// The stream incoming [`StreamRequest`]s + request_stream: OnionServiceStream, + + /// The port we are listening on + port: u16, + /// The onion address we are listening on + onion_address: Multiaddr, + /// Whether we have already announced this address + announced: bool, +} + +/// Mode of address conversion. +/// Refer tor [arti_client::TorAddr](https://docs.rs/arti-client/latest/arti_client/struct.TorAddr.html) for details +#[derive(Debug, Clone, Copy, Hash, Default, PartialEq, Eq, PartialOrd, Ord)] +pub enum AddressConversion { + /// Uses only DNS for address resolution (default). + #[default] + DnsOnly, + /// Uses IP and DNS for addresses. + IpAndDns, +} + +pub struct TorTransport { + pub conversion_mode: AddressConversion, + + /// The Tor client. + client: Arc>, + + /// Onion services we are listening on. + #[cfg(feature = "listen-onion-service")] + listeners: HashMap, + + /// Onion services we are running but currently not listening on + #[cfg(feature = "listen-onion-service")] + services: Vec<(Arc, OnionServiceStream)>, +} + +impl TorTransport { + /// Creates a new `TorClientBuilder`. + /// + /// # Panics + /// Panics if the current runtime is not a `TokioRustlsRuntime`. + pub fn builder() -> TorClientBuilder { + let runtime = + TokioRustlsRuntime::current().expect("Couldn't get the current tokio rustls runtime"); + TorClient::with_runtime(runtime) + } + + /// Creates a bootstrapped `TorTransport` + /// + /// # Errors + /// Could return error emitted during Tor bootstrap by Arti. + pub async fn bootstrapped() -> Result { + let builder = Self::builder(); + let ret = Self::from_builder(&builder, AddressConversion::DnsOnly)?; + ret.bootstrap().await?; + Ok(ret) + } + + /// Builds a `TorTransport` from an Arti `TorClientBuilder` but does not bootstrap it. + /// + /// # Errors + /// Could return error emitted during creation of the `TorClient`. + pub fn from_builder( + builder: &TorClientBuilder, + conversion_mode: AddressConversion, + ) -> Result { + let client = Arc::new(builder.create_unbootstrapped()?); + + Ok(Self::from_client(client, conversion_mode)) + } + + /// Builds a `TorTransport` from an existing Arti `TorClient`. + pub fn from_client( + client: Arc>, + conversion_mode: AddressConversion, + ) -> Self { + Self { + conversion_mode, + client, + #[cfg(feature = "listen-onion-service")] + listeners: HashMap::new(), + #[cfg(feature = "listen-onion-service")] + services: Vec::new(), + } + } + + /// Bootstraps the `TorTransport` into the Tor network. + /// + /// # Errors + /// Could return error emitted during bootstrap by Arti. + pub async fn bootstrap(&self) -> Result<(), TorError> { + self.client.bootstrap().await + } + + /// Set the address conversion mode + #[must_use] + pub fn with_address_conversion(mut self, conversion_mode: AddressConversion) -> Self { + self.conversion_mode = conversion_mode; + self + } + + /// Call this function to instruct the transport to listen on a specific onion address + /// You need to call this function **before** calling `listen_on` + /// + /// # Returns + /// Returns the Multiaddr of the onion address that the transport can be instructed to listen on + /// To actually listen on the address, you need to call [`listen_on`] with the returned address + /// + /// # Errors + /// Returns an error if we cannot get the onion address of the service + #[cfg(feature = "listen-onion-service")] + pub fn add_onion_service( + &mut self, + svc_cfg: OnionServiceConfig, + port: u16, + ) -> anyhow::Result { + let (service, request_stream) = self.client.launch_onion_service(svc_cfg)?; + let request_stream = Box::pin(handle_rend_requests(request_stream)); + + let multiaddr = service + .onion_name() + .ok_or_else(|| anyhow::anyhow!("Onion service has no onion address"))? + .to_multiaddr(port); + + self.services.push((service, request_stream)); + + Ok(multiaddr) + } +} + +#[derive(Debug, Error)] +pub enum TorTransportError { + #[error(transparent)] + Client(#[from] TorError), + #[cfg(feature = "listen-onion-service")] + #[error(transparent)] + Service(#[from] tor_hsservice::ClientError), + #[cfg(feature = "listen-onion-service")] + #[error("Stream closed before receiving data")] + StreamClosed, + #[cfg(feature = "listen-onion-service")] + #[error("Stream port does not match listener port")] + StreamPortMismatch, + #[cfg(feature = "listen-onion-service")] + #[error("Onion service is broken")] + Broken, +} + +#[cfg(feature = "listen-onion-service")] +trait HsIdExt { + fn to_multiaddr(&self, port: u16) -> Multiaddr; +} + +#[cfg(feature = "listen-onion-service")] +impl HsIdExt for HsId { + /// Convert an `HsId` to a `Multiaddr` + fn to_multiaddr(&self, port: u16) -> Multiaddr { + let onion_domain = self.to_string(); + let onion_without_dot_onion = onion_domain + .split('.') + .nth(0) + .expect("Display formatting of HsId to contain .onion suffix"); + let multiaddress_string = format!("/onion3/{onion_without_dot_onion}:{port}"); + + Multiaddr::from_str(&multiaddress_string) + .expect("A valid onion address to be convertible to a Multiaddr") + } +} + +impl Transport for TorTransport { + type Output = TokioTorStream; + type Error = TorTransportError; + type Dial = BoxFuture<'static, Result>; + type ListenerUpgrade = PendingUpgrade; + + #[cfg(not(feature = "listen-onion-service"))] + fn listen_on( + &mut self, + _id: ListenerId, + onion_address: Multiaddr, + ) -> Result<(), TransportError> { + // If the `listen-onion-service` feature is not enabled, we do not support listening + Err(TransportError::MultiaddrNotSupported(onion_address.clone())) + } + + #[cfg(feature = "listen-onion-service")] + fn listen_on( + &mut self, + id: ListenerId, + onion_address: Multiaddr, + ) -> Result<(), TransportError> { + // If the address is not an onion3 address, return an error + let Some(libp2p::multiaddr::Protocol::Onion3(address)) = onion_address.into_iter().nth(0) + else { + return Err(TransportError::MultiaddrNotSupported(onion_address.clone())); + }; + + // Find the running onion service that matches the requested address + // If we find it, remove it from [`services`] and insert it into [`listeners`] + let position = self + .services + .iter() + .position(|(service, _)| { + service.onion_name().map_or(false, |name| { + name.to_multiaddr(address.port()) == onion_address + }) + }) + .ok_or_else(|| TransportError::MultiaddrNotSupported(onion_address.clone()))?; + + let (service, request_stream) = self.services.remove(position); + + let status_stream = Box::pin(service.status_events()); + + self.listeners.insert( + id, + TorListener { + service, + request_stream, + onion_address: onion_address.clone(), + port: address.port(), + status_stream, + announced: false, + }, + ); + + Ok(()) + } + + // We do not support removing listeners if the `listen-onion-service` feature is not enabled + #[cfg(not(feature = "listen-onion-service"))] + fn remove_listener(&mut self, _id: ListenerId) -> bool { + false + } + + #[cfg(feature = "listen-onion-service")] + fn remove_listener(&mut self, id: ListenerId) -> bool { + // Take the listener out of the map. This will stop listening on onion service for libp2p connections (we will not poll it anymore) + // However, we will not stop the onion service itself because we might want to reuse it later + // The onion service will be stopped when the transport is dropped + if let Some(listener) = self.listeners.remove(&id) { + self.services + .push((listener.service, listener.request_stream)); + return true; + } + + false + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let maybe_tor_addr = match self.conversion_mode { + AddressConversion::DnsOnly => safe_extract(&addr), + AddressConversion::IpAndDns => dangerous_extract(&addr), + }; + + let tor_address = + maybe_tor_addr.ok_or(TransportError::MultiaddrNotSupported(addr.clone()))?; + let onion_client = self.client.clone(); + + Ok(Box::pin(async move { + let stream = onion_client.connect(tor_address).await?; + + tracing::debug!(%addr, "Established connection to peer through Tor"); + + Ok(TokioTorStream::from(stream)) + })) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.dial(addr) + } + + fn address_translation(&self, _listen: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } + + #[cfg(not(feature = "listen-onion-service"))] + fn poll( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + // If the `listen-onion-service` feature is not enabled, we do not support listening + Poll::Pending + } + + #[cfg(feature = "listen-onion-service")] + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + for (listener_id, listener) in &mut self.listeners { + // Check if the service has any new statuses + if let Poll::Ready(Some(status)) = listener.status_stream.as_mut().poll_next(cx) { + tracing::debug!( + status = ?status.state(), + address = listener.onion_address.to_string(), + "Onion service status changed" + ); + } + + // Check if we have already announced this address, if not, do it now + if !listener.announced { + listener.announced = true; + + // We announce the address here to the swarm even though we technically cannot guarantee + // that the address is reachable yet from the outside. We might not have registered the + // onion service fully yet (introduction points, hsdir, ...) + // + // However, we need to announce it now because otherwise libp2p might not poll the listener + // again and we will not be able to announce it later. + // TODO: Find out why this is the case, if this is intended behaviour or a bug + return Poll::Ready(TransportEvent::NewAddress { + listener_id: *listener_id, + listen_addr: listener.onion_address.clone(), + }); + } + + match listener.request_stream.as_mut().poll_next(cx) { + Poll::Ready(Some(request)) => { + let port = listener.port; + let upgrade: PendingUpgrade = Box::pin(async move { + // Check if the port matches what we expect + if let IncomingStreamRequest::Begin(begin) = request.request() { + if begin.port() != port { + // Reject the connection with CONNECTREFUSED + request + .reject(End::new_with_reason(EndReason::CONNECTREFUSED)) + .await?; + + return Err(TorTransportError::StreamPortMismatch); + } + } + + // Accept the stream and forward it to the swarm + let data_stream = request.accept(Connected::new_empty()).await?; + Ok(TokioTorStream::from(data_stream)) + }); + + return Poll::Ready(TransportEvent::Incoming { + listener_id: *listener_id, + upgrade, + local_addr: listener.onion_address.clone(), + send_back_addr: listener.onion_address.clone(), + }); + } + + // The stream has ended + // This means that the onion service was shut down, and we will not receive any more connections on it + Poll::Ready(None) => { + return Poll::Ready(TransportEvent::ListenerClosed { + listener_id: *listener_id, + reason: Ok(()), + }); + } + Poll::Pending => {} + } + } + + Poll::Pending + } +} diff --git a/src/provider.rs b/src/arti/provider.rs similarity index 100% rename from src/provider.rs rename to src/arti/provider.rs diff --git a/src/lib.rs b/src/lib.rs index 70b1799..585865c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,7 +38,7 @@ //! This crate uses tokio with rustls for its runtime and TLS implementation. //! No other combinations are supported. //! -//! ## Example +//! ## Examples //! ```no_run //! use libp2p::core::Transport; //! # async fn test_func() -> Result<(), Box> { @@ -50,417 +50,28 @@ //! # } //! # tokio_test::block_on(test_func()); //! ``` +//! +//! ```no_run +//! use libp2p::core::Transport; +//! use std::sync::{Arc, Mutex}; +//! use libp2p_community_tor::tor_interface::tor_provider::TorProvider; +//! # async fn test_func() -> Result<(), Box> { +//! let address = "/dns/www.torproject.org/tcp/1000".parse()?; +//! let mut provider = libp2p_community_tor::tor_interface::legacy_tor_client::LegacyTorClient::new( +//! libp2p_community_tor::tor_interface::legacy_tor_client::LegacyTorClientConfig::system_from_environment().unwrap())?; +//! provider.bootstrap()?; +//! let mut transport = libp2p_community_tor::TorInterfaceTransport::from_provider(Default::default(), Arc::new(Mutex::new(provider)), None)?; +//! // we have achieved tor connection +//! let _conn = transport.dial(address)?.await?; +//! # Ok(()) +//! # } +//! # tokio_test::block_on(test_func()); +//! ``` -use arti_client::{TorClient, TorClientBuilder}; -use futures::future::BoxFuture; -use libp2p::{ - core::transport::{ListenerId, TransportEvent}, - Multiaddr, Transport, TransportError, -}; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; -use thiserror::Error; -use tor_rtcompat::tokio::TokioRustlsRuntime; - -// We only need these imports if the `listen-onion-service` feature is enabled -#[cfg(feature = "listen-onion-service")] -use std::collections::HashMap; -#[cfg(feature = "listen-onion-service")] -use std::str::FromStr; -#[cfg(feature = "listen-onion-service")] -use tor_cell::relaycell::msg::{Connected, End, EndReason}; -#[cfg(feature = "listen-onion-service")] -use tor_hsservice::{ - handle_rend_requests, status::OnionServiceStatus, HsId, OnionServiceConfig, - RunningOnionService, StreamRequest, -}; -#[cfg(feature = "listen-onion-service")] -use tor_proto::stream::IncomingStreamRequest; - -mod address; -mod provider; - -use address::{dangerous_extract, safe_extract}; -pub use provider::TokioTorStream; - -pub type TorError = arti_client::Error; - -type PendingUpgrade = BoxFuture<'static, Result>; -#[cfg(feature = "listen-onion-service")] -type OnionServiceStream = futures::stream::BoxStream<'static, StreamRequest>; -#[cfg(feature = "listen-onion-service")] -type OnionServiceStatusStream = futures::stream::BoxStream<'static, OnionServiceStatus>; - -/// Struct representing an onion address we are listening on for libp2p connections. -#[cfg(feature = "listen-onion-service")] -struct TorListener { - #[allow(dead_code)] // We need to own this to keep the RunningOnionService alive - /// The onion service we are listening on - service: Arc, - /// The stream of status updates for the onion service - status_stream: OnionServiceStatusStream, - /// The stream incoming [`StreamRequest`]s - request_stream: OnionServiceStream, - - /// The port we are listening on - port: u16, - /// The onion address we are listening on - onion_address: Multiaddr, - /// Whether we have already announced this address - announced: bool, -} - -/// Mode of address conversion. -/// Refer tor [arti_client::TorAddr](https://docs.rs/arti-client/latest/arti_client/struct.TorAddr.html) for details -#[derive(Debug, Clone, Copy, Hash, Default, PartialEq, Eq, PartialOrd, Ord)] -pub enum AddressConversion { - /// Uses only DNS for address resolution (default). - #[default] - DnsOnly, - /// Uses IP and DNS for addresses. - IpAndDns, -} - -pub struct TorTransport { - pub conversion_mode: AddressConversion, - - /// The Tor client. - client: Arc>, - - /// Onion services we are listening on. - #[cfg(feature = "listen-onion-service")] - listeners: HashMap, - - /// Onion services we are running but currently not listening on - #[cfg(feature = "listen-onion-service")] - services: Vec<(Arc, OnionServiceStream)>, -} - -impl TorTransport { - /// Creates a new `TorClientBuilder`. - /// - /// # Panics - /// Panics if the current runtime is not a `TokioRustlsRuntime`. - pub fn builder() -> TorClientBuilder { - let runtime = - TokioRustlsRuntime::current().expect("Couldn't get the current tokio rustls runtime"); - TorClient::with_runtime(runtime) - } - - /// Creates a bootstrapped `TorTransport` - /// - /// # Errors - /// Could return error emitted during Tor bootstrap by Arti. - pub async fn bootstrapped() -> Result { - let builder = Self::builder(); - let ret = Self::from_builder(&builder, AddressConversion::DnsOnly)?; - ret.bootstrap().await?; - Ok(ret) - } - - /// Builds a `TorTransport` from an Arti `TorClientBuilder` but does not bootstrap it. - /// - /// # Errors - /// Could return error emitted during creation of the `TorClient`. - pub fn from_builder( - builder: &TorClientBuilder, - conversion_mode: AddressConversion, - ) -> Result { - let client = Arc::new(builder.create_unbootstrapped()?); - - Ok(Self::from_client(client, conversion_mode)) - } - - /// Builds a `TorTransport` from an existing Arti `TorClient`. - pub fn from_client( - client: Arc>, - conversion_mode: AddressConversion, - ) -> Self { - Self { - conversion_mode, - client, - #[cfg(feature = "listen-onion-service")] - listeners: HashMap::new(), - #[cfg(feature = "listen-onion-service")] - services: Vec::new(), - } - } - - /// Bootstraps the `TorTransport` into the Tor network. - /// - /// # Errors - /// Could return error emitted during bootstrap by Arti. - pub async fn bootstrap(&self) -> Result<(), TorError> { - self.client.bootstrap().await - } - - /// Set the address conversion mode - #[must_use] - pub fn with_address_conversion(mut self, conversion_mode: AddressConversion) -> Self { - self.conversion_mode = conversion_mode; - self - } - - /// Call this function to instruct the transport to listen on a specific onion address - /// You need to call this function **before** calling `listen_on` - /// - /// # Returns - /// Returns the Multiaddr of the onion address that the transport can be instructed to listen on - /// To actually listen on the address, you need to call [`listen_on`] with the returned address - /// - /// # Errors - /// Returns an error if we cannot get the onion address of the service - #[cfg(feature = "listen-onion-service")] - pub fn add_onion_service( - &mut self, - svc_cfg: OnionServiceConfig, - port: u16, - ) -> anyhow::Result { - let (service, request_stream) = self.client.launch_onion_service(svc_cfg)?; - let request_stream = Box::pin(handle_rend_requests(request_stream)); - - let multiaddr = service - .onion_name() - .ok_or_else(|| anyhow::anyhow!("Onion service has no onion address"))? - .to_multiaddr(port); - - self.services.push((service, request_stream)); - - Ok(multiaddr) - } -} - -#[derive(Debug, Error)] -pub enum TorTransportError { - #[error(transparent)] - Client(#[from] TorError), - #[cfg(feature = "listen-onion-service")] - #[error(transparent)] - Service(#[from] tor_hsservice::ClientError), - #[cfg(feature = "listen-onion-service")] - #[error("Stream closed before receiving data")] - StreamClosed, - #[cfg(feature = "listen-onion-service")] - #[error("Stream port does not match listener port")] - StreamPortMismatch, - #[cfg(feature = "listen-onion-service")] - #[error("Onion service is broken")] - Broken, -} - -#[cfg(feature = "listen-onion-service")] -trait HsIdExt { - fn to_multiaddr(&self, port: u16) -> Multiaddr; -} - -#[cfg(feature = "listen-onion-service")] -impl HsIdExt for HsId { - /// Convert an `HsId` to a `Multiaddr` - fn to_multiaddr(&self, port: u16) -> Multiaddr { - let onion_domain = self.to_string(); - let onion_without_dot_onion = onion_domain - .split('.') - .nth(0) - .expect("Display formatting of HsId to contain .onion suffix"); - let multiaddress_string = format!("/onion3/{onion_without_dot_onion}:{port}"); - - Multiaddr::from_str(&multiaddress_string) - .expect("A valid onion address to be convertible to a Multiaddr") - } -} - -impl Transport for TorTransport { - type Output = TokioTorStream; - type Error = TorTransportError; - type Dial = BoxFuture<'static, Result>; - type ListenerUpgrade = PendingUpgrade; - - #[cfg(not(feature = "listen-onion-service"))] - fn listen_on( - &mut self, - _id: ListenerId, - onion_address: Multiaddr, - ) -> Result<(), TransportError> { - // If the `listen-onion-service` feature is not enabled, we do not support listening - Err(TransportError::MultiaddrNotSupported(onion_address.clone())) - } - - #[cfg(feature = "listen-onion-service")] - fn listen_on( - &mut self, - id: ListenerId, - onion_address: Multiaddr, - ) -> Result<(), TransportError> { - // If the address is not an onion3 address, return an error - let Some(libp2p::multiaddr::Protocol::Onion3(address)) = onion_address.into_iter().nth(0) - else { - return Err(TransportError::MultiaddrNotSupported(onion_address.clone())); - }; - - // Find the running onion service that matches the requested address - // If we find it, remove it from [`services`] and insert it into [`listeners`] - let position = self - .services - .iter() - .position(|(service, _)| { - service.onion_name().map_or(false, |name| { - name.to_multiaddr(address.port()) == onion_address - }) - }) - .ok_or_else(|| TransportError::MultiaddrNotSupported(onion_address.clone()))?; - - let (service, request_stream) = self.services.remove(position); - - let status_stream = Box::pin(service.status_events()); - - self.listeners.insert( - id, - TorListener { - service, - request_stream, - onion_address: onion_address.clone(), - port: address.port(), - status_stream, - announced: false, - }, - ); - - Ok(()) - } - - // We do not support removing listeners if the `listen-onion-service` feature is not enabled - #[cfg(not(feature = "listen-onion-service"))] - fn remove_listener(&mut self, _id: ListenerId) -> bool { - false - } - - #[cfg(feature = "listen-onion-service")] - fn remove_listener(&mut self, id: ListenerId) -> bool { - // Take the listener out of the map. This will stop listening on onion service for libp2p connections (we will not poll it anymore) - // However, we will not stop the onion service itself because we might want to reuse it later - // The onion service will be stopped when the transport is dropped - if let Some(listener) = self.listeners.remove(&id) { - self.services - .push((listener.service, listener.request_stream)); - return true; - } - - false - } - - fn dial(&mut self, addr: Multiaddr) -> Result> { - let maybe_tor_addr = match self.conversion_mode { - AddressConversion::DnsOnly => safe_extract(&addr), - AddressConversion::IpAndDns => dangerous_extract(&addr), - }; - - let tor_address = - maybe_tor_addr.ok_or(TransportError::MultiaddrNotSupported(addr.clone()))?; - let onion_client = self.client.clone(); - - Ok(Box::pin(async move { - let stream = onion_client.connect(tor_address).await?; - - tracing::debug!(%addr, "Established connection to peer through Tor"); - - Ok(TokioTorStream::from(stream)) - })) - } - - fn dial_as_listener( - &mut self, - addr: Multiaddr, - ) -> Result> { - self.dial(addr) - } - - fn address_translation(&self, _listen: &Multiaddr, _observed: &Multiaddr) -> Option { - None - } - - #[cfg(not(feature = "listen-onion-service"))] - fn poll( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { - // If the `listen-onion-service` feature is not enabled, we do not support listening - Poll::Pending - } - - #[cfg(feature = "listen-onion-service")] - fn poll( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - for (listener_id, listener) in &mut self.listeners { - // Check if the service has any new statuses - if let Poll::Ready(Some(status)) = listener.status_stream.as_mut().poll_next(cx) { - tracing::debug!( - status = ?status.state(), - address = listener.onion_address.to_string(), - "Onion service status changed" - ); - } - - // Check if we have already announced this address, if not, do it now - if !listener.announced { - listener.announced = true; - - // We announce the address here to the swarm even though we technically cannot guarantee - // that the address is reachable yet from the outside. We might not have registered the - // onion service fully yet (introduction points, hsdir, ...) - // - // However, we need to announce it now because otherwise libp2p might not poll the listener - // again and we will not be able to announce it later. - // TODO: Find out why this is the case, if this is intended behaviour or a bug - return Poll::Ready(TransportEvent::NewAddress { - listener_id: *listener_id, - listen_addr: listener.onion_address.clone(), - }); - } - - match listener.request_stream.as_mut().poll_next(cx) { - Poll::Ready(Some(request)) => { - let port = listener.port; - let upgrade: PendingUpgrade = Box::pin(async move { - // Check if the port matches what we expect - if let IncomingStreamRequest::Begin(begin) = request.request() { - if begin.port() != port { - // Reject the connection with CONNECTREFUSED - request - .reject(End::new_with_reason(EndReason::CONNECTREFUSED)) - .await?; - - return Err(TorTransportError::StreamPortMismatch); - } - } - - // Accept the stream and forward it to the swarm - let data_stream = request.accept(Connected::new_empty()).await?; - Ok(TokioTorStream::from(data_stream)) - }); - - return Poll::Ready(TransportEvent::Incoming { - listener_id: *listener_id, - upgrade, - local_addr: listener.onion_address.clone(), - send_back_addr: listener.onion_address.clone(), - }); - } - - // The stream has ended - // This means that the onion service was shut down, and we will not receive any more connections on it - Poll::Ready(None) => { - return Poll::Ready(TransportEvent::ListenerClosed { - listener_id: *listener_id, - reason: Ok(()), - }); - } - Poll::Pending => {} - } - } +mod arti; +pub use arti::*; - Poll::Pending - } -} +#[cfg(feature="tor-interface")] +mod tor; +#[cfg(feature="tor-interface")] +pub use tor::*; diff --git a/src/tor/address.rs b/src/tor/address.rs new file mode 100644 index 0000000..441006b --- /dev/null +++ b/src/tor/address.rs @@ -0,0 +1,161 @@ +// Copyright 2022 Hannes Furmans +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +use libp2p::{core::multiaddr::Protocol, Multiaddr}; +use tor_interface::tor_provider::{OnionAddr, OnionAddrV3, TargetAddr, DomainAddr}; +use tor_interface::tor_crypto::{V3OnionServiceId, Ed25519PublicKey}; +use std::net::SocketAddr; + +/// "Dangerously" extract a Tor address from the provided [`Multiaddr`]. +/// +/// Refer tor [arti_client::TorAddr](https://docs.rs/arti-client/latest/arti_client/struct.TorAddr.html) for details around the safety / privacy considerations. +pub fn dangerous_extract(multiaddr: &Multiaddr) -> Option { + if let Some(tor_addr) = safe_extract(multiaddr) { + return Some(tor_addr); + } + + let mut protocols = multiaddr.into_iter(); + + try_to_socket_addr(&protocols.next()?, &protocols.next()?) +} + +/// "Safely" extract a Tor address from the provided [`Multiaddr`]. +/// +/// Refer tor [arti_client::TorAddr](https://docs.rs/arti-client/latest/arti_client/struct.TorAddr.html) for details around the safety / privacy considerations. +pub fn safe_extract(multiaddr: &Multiaddr) -> Option { + let mut protocols = multiaddr.into_iter(); + + let (dom, port) = (protocols.next()?, protocols.next()); + try_to_domain_and_port(&dom, &port) +} + +fn try_to_domain_and_port<'a>( + maybe_domain: &'a Protocol, + maybe_port: &Option, +) -> Option { + match (maybe_domain, maybe_port) { + ( + Protocol::Dns(domain) | Protocol::Dns4(domain) | Protocol::Dns6(domain), + Some(Protocol::Tcp(port)), + ) => Some(TargetAddr::Domain(DomainAddr::try_from((domain.to_string(), *port)).ok()?.into())), + (Protocol::Onion3(domain), _) => + Some(TargetAddr::OnionService(OnionAddr::V3(OnionAddrV3::new(V3OnionServiceId::from_public_key(&Ed25519PublicKey::from_raw(domain.hash()[..32].try_into().unwrap()).ok()?), domain.port())))), + _ => None, + } +} + +fn try_to_socket_addr(maybe_ip: &Protocol, maybe_port: &Protocol) -> Option { + match (maybe_ip, maybe_port) { + (Protocol::Ip4(ip), Protocol::Tcp(port)) => Some(TargetAddr::Socket(SocketAddr::from((*ip, *port)))), + (Protocol::Ip6(ip), Protocol::Tcp(port)) => Some(TargetAddr::Socket(SocketAddr::from((*ip, *port)))), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tor_interface::tor_provider::TargetAddr; + use std::str::FromStr; + + #[test] + fn extract_correct_address_from_dns() { + let addresses = [ + "/dns/ip.tld/tcp/10".parse().unwrap(), + "/dns4/dns.ip4.tld/tcp/11".parse().unwrap(), + "/dns6/dns.ip6.tld/tcp/12".parse().unwrap(), + "/onion3/cebulka7uxchnbpvmqapg5pfos4ngaxglsktzvha7a5rigndghvadeyd:13".parse().unwrap(), + ]; + + let actual = addresses + .iter() + .filter_map(safe_extract) + .collect::>(); + + assert_eq!( + &[ + TargetAddr::from_str("ip.tld:10").unwrap(), + TargetAddr::from_str("dns.ip4.tld:11").unwrap(), + TargetAddr::from_str("dns.ip6.tld:12").unwrap(), + TargetAddr::from_str("cebulka7uxchnbpvmqapg5pfos4ngaxglsktzvha7a5rigndghvadeyd.onion:13").unwrap(), + ], + actual.as_slice() + ); + } + + #[test] + fn extract_correct_address_from_ips() { + let addresses = [ + "/ip4/127.0.0.1/tcp/10".parse().unwrap(), + "/ip6/::1/tcp/10".parse().unwrap(), + ]; + + let actual = addresses + .iter() + .filter_map(dangerous_extract) + .collect::>(); + + assert_eq!( + &[ + TargetAddr::from_str("127.0.0.1:10").unwrap(), + TargetAddr::from_str("[::1]:10").unwrap(), + ], + actual.as_slice() + ); + } + + #[test] + fn dangerous_extract_works_on_domains_too() { + let addresses = [ + "/dns/ip.tld/tcp/10".parse().unwrap(), + "/ip4/127.0.0.1/tcp/10".parse().unwrap(), + "/ip6/::1/tcp/10".parse().unwrap(), + ]; + + let actual = addresses + .iter() + .filter_map(dangerous_extract) + .collect::>(); + + assert_eq!( + &[ + TargetAddr::from_str("ip.tld:10").unwrap(), + TargetAddr::from_str("127.0.0.1:10").unwrap(), + TargetAddr::from_str("[::1]:10").unwrap(), + ], + actual.as_slice() + ); + } + + #[test] + fn detect_incorrect_address() { + let addresses = [ + "/tcp/10/udp/12".parse().unwrap(), + "/dns/ip.tld/dns4/ip.tld/dns6/ip.tld".parse().unwrap(), + "/tcp/10/ip4/1.1.1.1".parse().unwrap(), + ]; + + let all_correct = addresses.iter().map(safe_extract).all(|res| res.is_none()); + + assert!( + all_correct, + "During the parsing of the faulty addresses, there was an incorrectness" + ); + } +} diff --git a/src/tor/mod.rs b/src/tor/mod.rs new file mode 100644 index 0000000..7b69214 --- /dev/null +++ b/src/tor/mod.rs @@ -0,0 +1,349 @@ +// Copyright 2022 Hannes Furmans +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#![warn(clippy::pedantic)] +#![deny(unsafe_code)] + +use futures::future::BoxFuture; +use tor_interface::tor_provider::{self, CircuitToken, TcpOnionListener, TcpOrUnixOnionStream, TorProvider, OnionListener, OnionAddr}; +use tor_interface::tor_crypto::{V3OnionServiceId, Ed25519PrivateKey, X25519PublicKey}; +use libp2p::{ + core::transport::{ListenerId, TransportEvent}, + Multiaddr, Transport, TransportError, +}; + +use std::collections::{BTreeSet, HashMap}; +use std::str::FromStr; +use tokio::net::TcpListener; + +use std::pin::Pin; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::net::SocketAddr; +use std::task::{Context, Poll}; +use thiserror::Error; + +mod address; +mod provider; + +use address::{dangerous_extract, safe_extract}; +pub use provider::OnionStreamStream; + +use crate::AddressConversion; + +pub use tor_interface; + +/// Get a [`TorProvider`](`tor_provider::TorProvider`) from [`tor_interface`] +pub struct TorInterfaceTransport { + pub conversion_mode: AddressConversion, + pub provider: Arc>, + pub circuit: Option, + + /// Onion services we are listening on. + listeners: HashMap, + + /// Onion services we are running (implicitly excluded if ListenerId present) + services: Vec<(TcpOnionListener, Option)>, + + /// Services yet to be announced + waiting_to_announce: HashMap, + + event_backlog: Vec, + + /// Persistent list of services we already publish + /// + /// Tor delineates services by onion but libp2p does it by onion:port + published_services: BTreeSet, +} + +#[derive(Debug, Error)] +pub enum TorInterfaceTransportError { + #[error(transparent)] + Client(#[from] tor_provider::Error), + #[error(transparent)] + Io(#[from] std::io::Error), +} + +fn lock(m: &Mutex) -> MutexGuard<'_, T> { + match m.lock() { + Ok(o) => o, + Err(e) => e.into_inner(), + } +} + +fn bootstrap(provider: &mut T) -> Result<(), tor_provider::Error> { + match provider.bootstrap() { + Err(tor_provider::Error::Generic(s)) if s.ends_with(" already bootstrapped") => Ok(()), + res @ Ok(_) | res @ Err(_) => res, + } +} + +impl> TorInterfaceTransport { + /// Creates a new `TorClientBuilder`. + pub fn from_provider( + conversion_mode: AddressConversion, + provider: Arc>, + circuit: Option + ) -> Result { + bootstrap(&mut *lock(&provider))?; + Ok(Self { + conversion_mode: conversion_mode, + provider: provider, + circuit: circuit, + listeners: HashMap::new(), + services: Vec::new(), + waiting_to_announce: Default::default(), + event_backlog: Default::default(), + published_services: Default::default(), + }) + } + + /// Call this function to instruct the transport to listen on a specific onion address + /// You need to call this function **before** calling `listen_on` + /// + /// # Returns + /// Returns the Multiaddr of the onion address that the transport can be instructed to listen on + /// To actually listen on the address, you need to call [`listen_on()`] with the returned address + /// + /// # Blocks + /// If listening fails with an `LegacyTorNotBootstrapped` error, + /// `bootstrap()`s the provider and awaits bootstrap confirtmation + /// + /// # Errors + /// Returns an error if we couldn't talk to the tor daemon + pub fn add_onion_service( + &mut self, + private_key: &Ed25519PrivateKey, + virt_port: u16, + authorised_clients: Option<&[X25519PublicKey]>, + socket_addr: Option, + ) -> anyhow::Result { + let ol = self.listener_or_bootstrap(|p| p.listener(private_key, virt_port, authorised_clients, socket_addr))?; + ol.set_nonblocking(true)?; + + self.services.push((ol, None)); + + let svid = V3OnionServiceId::from_private_key(&private_key); + let multiaddr = svid.to_multiaddr(virt_port); + + Ok(multiaddr) + } + + fn listener_or_bootstrap Result>(&mut self, mut f: F) -> Result { + loop { + let attempt = f(&mut lock(&self.provider)); // Moving this into the match clause deadlocks (Guard still borrowed) + match attempt { + Err(tor_provider::Error::Generic(s)) if s.ends_with(" not bootstrapped") => { + bootstrap(&mut *lock(&self.provider))?; + self.event_backlog.extend(lock(&self.provider).update()?); + } + res @ Ok(_) | res @ Err(_) => return res, + } + } + } +} + +trait HsIdExt { + fn to_multiaddr(&self, port: u16) -> Multiaddr; +} + +impl HsIdExt for V3OnionServiceId { + /// Convert an `V3OnionServiceId` to a `Multiaddr` + fn to_multiaddr(&self, port: u16) -> Multiaddr { + // The internal representation of V3OnionServiceId is 52 characters, so we can't re-use it here. + let multiaddress_string = format!("/onion3/{self}:{port}"); + + Multiaddr::from_str(&multiaddress_string) + .expect("A valid onion address to be convertible to a Multiaddr") + } +} + +trait OnionAddrExt { + fn to_multiaddr(&self) -> Multiaddr; +} + +impl OnionAddrExt for OnionAddr { + fn to_multiaddr(&self) -> Multiaddr { + let OnionAddr::V3(v3) = self; + v3.service_id().to_multiaddr(v3.virt_port()) + } +} + +#[cfg(test)] +#[test] +fn to_multiaddr() { + use tor_interface::tor_crypto::Ed25519PublicKey; + use libp2p::multiaddr::multiaddr; + let test = V3OnionServiceId::from_public_key(&Ed25519PublicKey::from_raw(&[0; 32]).unwrap()).to_multiaddr(12345); + assert_eq!( + test, + multiaddr!(Onion3(( + [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0xCD, 0x0E, 0x03 + ], + 12345 + ))) + ); + assert_eq!( + test.to_string(), + "/onion3/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaam2dqd:12345" + ); +} + + +impl + Send + Sync + 'static> Transport for TorInterfaceTransport { + type Error = TorInterfaceTransportError; + type Output = OnionStreamStream; + type ListenerUpgrade = std::future::Ready>; + type Dial = BoxFuture<'static, Result>; + + fn listen_on( + &mut self, + id: ListenerId, + onion_address: Multiaddr, + ) -> Result<(), TransportError> { + // If the address is not an onion3 address, return an error + if !matches!(onion_address.into_iter().nth(0), Some(libp2p::multiaddr::Protocol::Onion3(_))) { + return Err(TransportError::MultiaddrNotSupported(onion_address)); + } + + // Find the running onion service that matches the requested address + // If we find it, tag it in [`services`] and insert it into [`listeners`] + let service = self + .services + .iter_mut() + .find(|(service, listener_id)| listener_id.is_none() && service.address().to_multiaddr() == onion_address); + let Some((service, listener_id)) = service + else { + return Err(TransportError::MultiaddrNotSupported(onion_address)); + }; + + + let listener = service.try_clone_inner().and_then(TcpListener::from_std).map_err(TorInterfaceTransportError::Io).map_err(TransportError::Other)?; + *listener_id = Some(id); + + self.listeners.insert(id, listener); + self.waiting_to_announce.insert(id, service.address().clone()); + + Ok(()) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + // Take the listener out of the map. This will stop listening on onion service for libp2p connections (we will not poll it anymore) + // However, we will not stop the onion service itself because we might want to reuse it later + // The onion service will be stopped when the transport is dropped + if let Some(_) = self.listeners.remove(&id) { + let Some((_, listener_id)) = self.services.iter_mut().find(|(_, listener_id)| *listener_id == Some(id)) + else { unreachable!() }; + *listener_id = None; + self.waiting_to_announce.remove(&id); + return true; + } + + false + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let maybe_tor_addr = match self.conversion_mode { + AddressConversion::DnsOnly => safe_extract(&addr), + AddressConversion::IpAndDns => dangerous_extract(&addr), + }; + + let Some(tor_address) = maybe_tor_addr + else { return Err(TransportError::MultiaddrNotSupported(addr)); }; + let provider = self.provider.clone(); + let circuit = self.circuit; + + Ok(Box::pin(async move { + let stream = lock(&provider).connect(tor_address, circuit).map_err(Self::Error::Client)?; + + tracing::debug!(%addr, "Established connection to peer through Tor"); + + OnionStreamStream::from_onion_stream(stream).map_err(Self::Error::Io) + })) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.dial(addr) + } + + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { + None + } + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + while !&self.event_backlog.is_empty() { + match self.event_backlog.swap_remove(0) { + tor_provider::TorEvent::BootstrapStatus { .. } => {} + tor_provider::TorEvent::BootstrapComplete => tracing::debug!("Tor bootstrap complete"), + tor_provider::TorEvent::LogReceived { line } => tracing::debug!(%line), + tor_provider::TorEvent::OnionServicePublished { service_id } => { self.published_services.insert(service_id); }, + } + } + + // This is HashMap::extract_if() but that's unstable rn; not perf-sensitive (self.waiting_to_announce.len() is almost always 0) + if let Some(listener_id) = self.waiting_to_announce.iter().find(|(_, addr)| { + let OnionAddr::V3(addr) = addr; + self.published_services.contains(addr.service_id()) + }).map(|(listener_id, _)| listener_id).copied() { + return Poll::Ready(TransportEvent::NewAddress { + listener_id, + listen_addr: self.waiting_to_announce.remove(&listener_id).unwrap(/*key from find()*/).to_multiaddr(), + }); + } + + let new_events = lock(&self.provider).update().unwrap_or(vec![]); + self.event_backlog.extend(new_events); + if !self.event_backlog.is_empty() { + return self.poll(cx); + } + + for (&listener_id, listener) in &mut self.listeners { + match listener.poll_accept(cx) { + Poll::Ready(Ok((caller, _))) => { + let service_addr = self.services.iter().find(|(_, li)| *li == Some(listener_id)).map(|(ol, _)| ol.address()); + let multi = service_addr.map(|ra| ra.to_multiaddr()).unwrap_or(Multiaddr::empty()); + + return Poll::Ready(TransportEvent::Incoming { + listener_id, + upgrade: std::future::ready(Ok((caller, service_addr.cloned()).into())), + local_addr: multi.clone(), + send_back_addr: multi, + }); + } + + Poll::Ready(Err(err)) => { + return Poll::Ready(TransportEvent::ListenerError { listener_id, error: err.into() }); + } + + Poll::Pending => {}, + } + } + + Poll::Pending + } +} diff --git a/src/tor/provider.rs b/src/tor/provider.rs new file mode 100644 index 0000000..bdab38a --- /dev/null +++ b/src/tor/provider.rs @@ -0,0 +1,161 @@ +// Copyright 2022 Hannes Furmans +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The UnixStream implementation is backported and open-coded from +//! It should be removed when libp2p is updated + +use futures::{AsyncRead, AsyncWrite}; +use tor_interface::tor_provider::{OnionAddr, OnionStream, TargetAddr, TcpOrUnixOnionStream, TcpOrUnixStream}; +use libp2p::tcp::tokio::TcpStream; +#[cfg(unix)] +// use libp2p::unix_stream::tokio::UnixStream; +use tokio::net::UnixStream; + +#[derive(Debug)] +enum TcpOrUnix { + Tcp(TcpStream), + #[cfg(unix)] + Unix(UnixStream), +} + +#[derive(Debug)] +pub struct OnionStreamStream { + pub local_addr: Option, + pub peer_addr: Option, + stream: TcpOrUnix, +} + +impl From<(tokio::net::TcpStream, Option)> for OnionStreamStream { + fn from((stream, local_addr): (tokio::net::TcpStream, Option)) -> Self { + let stream = TcpOrUnix::Tcp(TcpStream(stream)); + Self { local_addr, peer_addr: None, stream } + } +} + +impl OnionStreamStream { + pub fn from_onion_stream(inner: TcpOrUnixOnionStream) -> std::io::Result { + let local_addr = inner.local_addr(); + let peer_addr = inner.peer_addr(); + inner.set_nonblocking(true)?; + let stream = match inner.into() { + TcpOrUnixStream::Tcp(sock) => TcpOrUnix::Tcp(TcpStream(tokio::net::TcpStream::from_std(sock)?)), + #[cfg(unix)] + TcpOrUnixStream::Unix(sock) => TcpOrUnix::Unix(UnixStream::from_std(sock.into())?), + }; + Ok(Self { local_addr, peer_addr, stream }) + } +} + +impl AsyncRead for OnionStreamStream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + match &mut self.stream { + TcpOrUnix::Tcp(sock) => AsyncRead::poll_read(std::pin::Pin::new(sock), cx, buf), + #[cfg(unix)] + // TcpOrUnix::Unix(sock) => AsyncRead::poll_read(std::pin::Pin::new(&mut sock), cx, buf), + TcpOrUnix::Unix(sock) => { + let mut read_buf = tokio::io::ReadBuf::new(buf); + futures::ready!(tokio::io::AsyncRead::poll_read(std::pin::Pin::new(sock), cx, &mut read_buf))?; + std::task::Poll::Ready(Ok(read_buf.filled().len())) + } + } + } + + fn poll_read_vectored( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &mut [std::io::IoSliceMut<'_>], + ) -> std::task::Poll> { + match &mut self.stream { + TcpOrUnix::Tcp(ref mut sock) => AsyncRead::poll_read_vectored(std::pin::Pin::new(sock), cx, bufs), + #[cfg(unix)] + // TcpOrUnix::Unix(ref mut sock) => AsyncRead::poll_read_vectored(std::pin::Pin::new(sock), cx, bufs), + TcpOrUnix::Unix(_) => { + // From default impl + for b in bufs { + if !b.is_empty() { + return self.poll_read(cx, b); + } + } + + self.poll_read(cx, &mut []) + } + } + } +} + +impl AsyncWrite for OnionStreamStream { + #[inline] + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match &mut self.stream { + TcpOrUnix::Tcp(ref mut sock) => AsyncWrite::poll_write(std::pin::Pin::new(sock), cx, buf), + #[cfg(unix)] + // TcpOrUnix::Unix(sock) => AsyncWrite::poll_write(std::pin::Pin::new(sock), cx, buf), + TcpOrUnix::Unix(ref mut sock) => tokio::io::AsyncWrite::poll_write(std::pin::Pin::new(sock), cx, buf) + } + } + + #[inline] + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut self.stream { + TcpOrUnix::Tcp(ref mut sock) => AsyncWrite::poll_flush(std::pin::Pin::new(sock), cx), + #[cfg(unix)] + // TcpOrUnix::Unix(sock) => AsyncWrite::poll_flush(std::pin::Pin::new(sock), cx), + TcpOrUnix::Unix(ref mut sock) => tokio::io::AsyncWrite::poll_flush(std::pin::Pin::new(sock), cx) + } + } + + #[inline] + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut self.stream { + TcpOrUnix::Tcp(ref mut sock) => AsyncWrite::poll_close(std::pin::Pin::new(sock), cx), + #[cfg(unix)] + // TcpOrUnix::Unix(sock) => AsyncWrite::poll_close(std::pin::Pin::new(sock), cx), + TcpOrUnix::Unix(ref mut sock) => tokio::io::AsyncWrite::poll_shutdown(std::pin::Pin::new(sock), cx) + } + } + + #[inline] + fn poll_write_vectored( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll> { + match &mut self.stream { + TcpOrUnix::Tcp(ref mut sock) => AsyncWrite::poll_write_vectored(std::pin::Pin::new(sock), cx, bufs), + #[cfg(unix)] + // TcpOrUnix::Unix(sock) => AsyncWrite::poll_write_vectored(std::pin::Pin::new(sock), cx, bufs), + TcpOrUnix::Unix(ref mut sock) => tokio::io::AsyncWrite::poll_write_vectored(std::pin::Pin::new(sock), cx, bufs) + } + } +}