Skip to content

Commit d44ca9e

Browse files
Perfuse TorBackend through monero-rpc-pool
1 parent 24f69ef commit d44ca9e

File tree

11 files changed

+133
-86
lines changed

11 files changed

+133
-86
lines changed

monero-rpc-pool/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ tracing-subscriber = { workspace = true }
3838
# Async runtime
3939
crossbeam = "0.8.4"
4040
tokio = { workspace = true, features = ["full"] }
41+
tokio-socks = "0.5"
4142

4243
# Serialization
4344
chrono = { version = "0.4", features = ["serde"] }
@@ -69,6 +70,7 @@ tor-rtcompat = { workspace = true, features = ["tokio", "rustls"] }
6970
monero = { workspace = true }
7071
monero-rpc = { path = "../monero-rpc" }
7172
swap-serde = { path = "../swap-serde" }
73+
swap-tor = { path = "../swap-tor" }
7274

7375
# Optional dependencies (for features)
7476
cuprate-epee-encoding = { git = "https://github.com/Cuprate/cuprate.git", optional = true }

monero-rpc-pool/src/bin/stress_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7777
.await
7878
.expect("Failed to bootstrap Tor client");
7979

80-
Some(client)
80+
swap_tor::TorBackend::Arti(client)
8181
} else {
82-
None
82+
swap_tor::TorBackend::None
8383
};
8484

8585
// Start the pool server

monero-rpc-pool/src/bin/stress_test_downloader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
133133
.await
134134
.expect("Failed to bootstrap Tor client");
135135

136-
Some(client)
136+
swap_tor::TorBackend::Arti(client)
137137
} else {
138-
None
138+
swap_tor::TorBackend::None
139139
};
140140

141141
// Start the pool server

monero-rpc-pool/src/config.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,44 @@
11
use monero::Network;
22
use std::path::PathBuf;
3+
use swap_tor::TorBackend;
34

4-
use crate::TorClientArc;
5-
6-
#[derive(Clone)]
5+
#[derive(Clone, Debug)]
76
pub struct Config {
87
pub host: String,
98
pub port: u16,
109
pub data_dir: PathBuf,
11-
pub tor_client: Option<TorClientArc>,
10+
pub tor_client: TorBackend,
1211
pub network: Network,
1312
}
1413

15-
impl std::fmt::Debug for Config {
16-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
17-
f.debug_struct("Config")
18-
.field("host", &self.host)
19-
.field("port", &self.port)
20-
.field("data_dir", &self.data_dir)
21-
.field("tor_client", &self.tor_client.is_some())
22-
.field("network", &self.network)
23-
.finish()
24-
}
25-
}
26-
2714
impl Config {
2815
pub fn new_with_port(host: String, port: u16, data_dir: PathBuf, network: Network) -> Self {
29-
Self::new_with_port_and_tor_client(host, port, data_dir, None, network)
16+
Self::new_with_port_and_tor_client(host, port, data_dir, TorBackend::None, network)
3017
}
3118

3219
pub fn new_with_port_and_tor_client(
3320
host: String,
3421
port: u16,
3522
data_dir: PathBuf,
36-
tor_client: impl Into<Option<TorClientArc>>,
23+
tor_client: TorBackend,
3724
network: Network,
3825
) -> Self {
3926
Self {
4027
host,
4128
port,
4229
data_dir,
43-
tor_client: tor_client.into(),
30+
tor_client,
4431
network,
4532
}
4633
}
4734

4835
pub fn new_random_port(data_dir: PathBuf, network: Network) -> Self {
49-
Self::new_random_port_with_tor_client(data_dir, None, network)
36+
Self::new_random_port_with_tor_client(data_dir, TorBackend::None, network)
5037
}
5138

5239
pub fn new_random_port_with_tor_client(
5340
data_dir: PathBuf,
54-
tor_client: impl Into<Option<TorClientArc>>,
41+
tor_client: TorBackend,
5542
network: Network,
5643
) -> Self {
5744
Self::new_with_port_and_tor_client(

monero-rpc-pool/src/lib.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,21 @@
11
use std::sync::Arc;
22

33
use anyhow::Result;
4-
use arti_client::TorClient;
54
use axum::{
65
routing::{any, get},
76
Router,
87
};
98

109
use tokio::task::JoinHandle;
11-
use tor_rtcompat::tokio::TokioRustlsRuntime;
1210
use tower_http::cors::CorsLayer;
1311
use tracing::{error, info};
1412

15-
/// Type alias for the Tor client used throughout the crate
16-
pub type TorClientArc = Arc<TorClient<TokioRustlsRuntime>>;
17-
1813
pub mod config;
1914
pub mod connection_pool;
2015
pub mod database;
2116
pub mod pool;
2217
pub mod proxy;
18+
pub(crate) mod tor;
2319
pub mod types;
2420

2521
use config::Config;
@@ -30,7 +26,7 @@ use proxy::{proxy_handler, stats_handler};
3026
#[derive(Clone)]
3127
pub struct AppState {
3228
pub node_pool: Arc<NodePool>,
33-
pub tor_client: Option<TorClientArc>,
29+
pub tor_client: swap_tor::TorBackend,
3430
pub connection_pool: crate::connection_pool::ConnectionPool,
3531
}
3632

monero-rpc-pool/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
7474
}
7575
});
7676

77-
Some(client)
77+
swap_tor::TorBackend::Arti(client)
7878
} else {
79-
None
79+
swap_tor::TorBackend::None
8080
};
8181

8282
let config = Config::new_with_port_and_tor_client(

monero-rpc-pool/src/proxy.rs

Lines changed: 12 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ use std::pin::Pin;
1010
use std::sync::Arc;
1111
use std::time::Duration;
1212
use tokio::net::TcpStream;
13-
use tokio::{
14-
io::{AsyncRead, AsyncWrite},
15-
time::timeout,
16-
};
13+
use tokio::time::timeout;
1714

1815
use tokio_rustls::rustls::{
1916
client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
@@ -22,6 +19,7 @@ use tokio_rustls::rustls::{
2219
};
2320
use tracing::{error, info_span, Instrument};
2421

22+
use crate::tor::*;
2523
use crate::AppState;
2624

2725
/// wallet2.h has a default timeout of 3 minutes + 30 seconds.
@@ -32,10 +30,6 @@ static TIMEOUT: Duration = Duration::from_secs(3 * 60 + 30).checked_div(2).unwra
3230
/// If the main node does not finish within this period, we start a hedged request.
3331
static SOFT_TIMEOUT: Duration = TIMEOUT.checked_div(2).unwrap();
3432

35-
/// Trait alias for a stream that can be used with hyper
36-
trait HyperStream: AsyncRead + AsyncWrite + Unpin + Send {}
37-
impl<T: AsyncRead + AsyncWrite + Unpin + Send> HyperStream for T {}
38-
3933
#[derive(Debug)]
4034
struct NoCertificateVerification;
4135

@@ -149,14 +143,7 @@ async fn proxy_to_multiple_nodes(
149143

150144
// Sort nodes to prioritize those with available connections
151145
// Check if we're using Tor for this request
152-
let use_tor = match &state.tor_client {
153-
Some(tc)
154-
if tc.bootstrap_status().ready_for_traffic() && !request.clearnet_whitelisted() =>
155-
{
156-
true
157-
}
158-
_ => false,
159-
};
146+
let use_tor = state.tor_client.ready_for_traffic() && !request.clearnet_whitelisted();
160147

161148
// Create a vector of (node, has_connection) pairs
162149
let mut nodes_with_availability = Vec::new();
@@ -321,7 +308,7 @@ async fn proxy_to_multiple_nodes(
321308

322309
/// Wraps a stream with TLS if HTTPS is being used
323310
async fn maybe_wrap_with_tls(
324-
stream: impl AsyncRead + AsyncWrite + Unpin + Send + 'static,
311+
stream: impl HyperStream + 'static,
325312
scheme: &str,
326313
host: &str,
327314
) -> Result<Box<dyn HyperStream>, SingleRequestError> {
@@ -465,14 +452,7 @@ async fn proxy_to_single_node(
465452
tracing::trace!("Request is whitelisted, sending over clearnet");
466453
}
467454

468-
let use_tor = match &state.tor_client {
469-
Some(tc)
470-
if tc.bootstrap_status().ready_for_traffic() && !request.clearnet_whitelisted() =>
471-
{
472-
true
473-
}
474-
_ => false,
475-
};
455+
let use_tor = state.tor_client.ready_for_traffic() && !request.clearnet_whitelisted();
476456

477457
let key = (node.0.clone(), node.1.clone(), node.2, use_tor);
478458

@@ -484,24 +464,14 @@ async fn proxy_to_single_node(
484464
let address = (node.1.as_str(), node.2);
485465

486466
let maybe_tls_stream = timeout(TIMEOUT, async {
487-
let no_tls_stream: Box<dyn HyperStream> = if use_tor {
488-
let tor_client = state.tor_client.as_ref().ok_or_else(|| {
489-
SingleRequestError::ConnectionError("Tor requested but client missing".into())
490-
})?;
491-
492-
let stream = tor_client
493-
.connect(address)
494-
.await
495-
.map_err(|e| SingleRequestError::ConnectionError(format!("{:?}", e)))?;
496-
497-
Box::new(stream)
498-
} else {
499-
let stream = TcpStream::connect(address)
467+
let no_tls_stream = match use_tor {
468+
true => state.tor_client.connect(address).await,
469+
false => TcpStream::connect(address)
500470
.await
501-
.map_err(|e| SingleRequestError::ConnectionError(format!("{:?}", e)))?;
502-
503-
Box::new(stream)
504-
};
471+
.map(|s| Box::new(s) as Box<dyn HyperStream>)
472+
.map_err(|e| e.into()),
473+
}
474+
.map_err(|e| SingleRequestError::ConnectionError(format!("{:?}", e)))?;
505475

506476
maybe_wrap_with_tls(no_tls_stream, &node.0, &node.1).await
507477
})

monero-rpc-pool/src/tor.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
2+
use swap_tor::TorBackend;
3+
use tokio::io::{AsyncRead, AsyncWrite};
4+
use tokio_socks::TargetAddr;
5+
6+
/// Trait alias for a stream that can be used with hyper
7+
pub trait HyperStream: AsyncRead + AsyncWrite + Unpin + Send {}
8+
impl<T: AsyncRead + AsyncWrite + Unpin + Send> HyperStream for T {}
9+
10+
pub trait TorBackendRpc {
11+
fn is_some(&self) -> bool;
12+
fn ready_for_traffic(&self) -> bool;
13+
async fn connect(&self, address: (&str, u16)) -> anyhow::Result<Box<dyn HyperStream>>;
14+
}
15+
impl TorBackendRpc for TorBackend {
16+
fn is_some(&self) -> bool {
17+
!matches!(self, TorBackend::None)
18+
}
19+
20+
fn ready_for_traffic(&self) -> bool {
21+
match self {
22+
TorBackend::Arti(arti) => arti.bootstrap_status().ready_for_traffic(),
23+
TorBackend::Socks(..) => true,
24+
TorBackend::None => false,
25+
}
26+
}
27+
28+
async fn connect(&self, address: (&str, u16)) -> anyhow::Result<Box<dyn HyperStream>> {
29+
match self {
30+
TorBackend::Arti(tor_client) => Ok(Box::new(tor_client.connect(address).await?)),
31+
TorBackend::Socks(proxy) => Ok(Box::new(proxy.proxy(pair_to_socks(address)).await?)),
32+
TorBackend::None => Ok(Box::new(tokio::net::TcpStream::connect(address).await?)),
33+
}
34+
}
35+
}
36+
37+
// Parse order matches tokio::net::ToSocketAddrs
38+
fn pair_to_socks((host, port): (&str, u16)) -> TargetAddr {
39+
if let Ok(addr) = host.parse::<Ipv4Addr>() {
40+
TargetAddr::Ip(SocketAddr::new(addr.into(), 10))
41+
} else if let Ok(addr) = host.parse::<Ipv6Addr>() {
42+
TargetAddr::Ip(SocketAddr::new(addr.into(), 10))
43+
} else {
44+
TargetAddr::Domain(host.into(), port)
45+
}
46+
}
47+
#[cfg(test)]
48+
mod tests {
49+
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
50+
use tokio_socks::TargetAddr;
51+
52+
#[test]
53+
fn pair_to_socks() {
54+
assert_eq!(
55+
[
56+
("ip.tld", 10),
57+
("dns.ip4.tld", 11),
58+
("dns.ip6.tld", 12),
59+
(
60+
"cebulka7uxchnbpvmqapg5pfos4ngaxglsktzvha7a5rigndghvadeyd.onion",
61+
13
62+
),
63+
("127.0.0.1", 10),
64+
("::1", 10),
65+
]
66+
.map(super::pair_to_socks),
67+
[
68+
TargetAddr::Domain("ip.tld".into(), 10),
69+
TargetAddr::Domain("dns.ip4.tld".into(), 11),
70+
TargetAddr::Domain("dns.ip6.tld".into(), 12),
71+
TargetAddr::Domain(
72+
"cebulka7uxchnbpvmqapg5pfos4ngaxglsktzvha7a5rigndghvadeyd.onion".into(),
73+
13,
74+
),
75+
TargetAddr::Ip(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 10)),
76+
TargetAddr::Ip(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 10)),
77+
],
78+
);
79+
}
80+
}

swap-tor/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ description = "Arti/SOCKS5 Tor back-end."
99
anyhow = { workspace = true }
1010
arti-client = { workspace = true, features = ["static-sqlite", "tokio", "rustls", "onion-service-service"] }
1111
data-encoding = "2.6"
12-
libp2p = { workspace = true, features = ["tcp", "yamux", "dns", "noise", "request-response", "ping", "rendezvous", "identify", "macros", "cbor", "json", "tokio", "serde", "rsa"] }
12+
libp2p = { workspace = true }
1313

1414
# Tokio
1515
tokio = { workspace = true, features = ["net"] }

swap-tor/src/lib.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,7 @@ impl Transport for Socks5Transport {
159159

160160
Ok(Box::pin(async move {
161161
Ok(tokio_util::compat::TokioAsyncReadCompatExt::compat(
162-
Socks5Stream::connect_with_socket(proxy.connect().await?, target)
163-
.await?
164-
.into_inner(),
162+
proxy.proxy(target).await?,
165163
))
166164
}))
167165
}
@@ -208,6 +206,15 @@ impl SocksServerAddress {
208206
}
209207
}
210208

209+
pub async fn proxy(
210+
&self,
211+
target: TargetAddr<'_>,
212+
) -> Result<TcpOrUnixStream, tokio_socks::Error> {
213+
Socks5Stream::connect_with_socket(self.connect().await?, target)
214+
.await
215+
.map(Socks5Stream::into_inner)
216+
}
217+
211218
/// Consult `$TOR_SOCKS_{IPC_PATH,HOST+PORT}`
212219
///
213220
/// `$TOR_SOCKS_IPC_PATH` is ignored if `cfg(not(unix))`, and takes precedence if `cfg(unix)`.
@@ -234,3 +241,13 @@ pub enum TorBackend {
234241
Socks(Arc<SocksServerAddress>),
235242
None,
236243
}
244+
245+
impl std::fmt::Debug for TorBackend {
246+
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
247+
f.write_str(match self {
248+
TorBackend::Arti(..) => "Arti",
249+
TorBackend::Socks(..) => "Socks",
250+
TorBackend::None => "None",
251+
})
252+
}
253+
}

0 commit comments

Comments
 (0)