Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub enum TransportError {
#[error("insufficient or broken transport params: {0}")]
Config(String),

#[error("transport connection was cancelled")]
Cancelled,

#[error("transport error: {0}")]
Other(String),
}
Expand All @@ -61,16 +64,26 @@ pub struct BridgeConn {
}

impl BridgeConn {
pub async fn try_connect(params: BridgeParameters) -> Result<Self, TransportError> {
pub async fn try_connect(
params: BridgeParameters,
token: CancellationToken,
) -> Result<Self, TransportError> {
let start = Instant::now();

match params {
BridgeParameters::QuicPlain(ref opts) => {
let opts = ClientOptions::try_from(opts)?;
let conn = transport_conn(&opts).await?;

let conn = token
.run_until_cancelled(transport_conn(&opts))
.await
.ok_or(TransportError::Cancelled)??;
let endpoint = conn.remote_address();
// .context("failed to connect to transport conn")?;
let (writer, reader) = conn.open_bi().await?;
let (writer, reader) = token
.run_until_cancelled(conn.open_bi())
.await
.ok_or(TransportError::Cancelled)??;
// .context("failed to connect to transport stream")?;
info!("quic transport connected in {:?}", start.elapsed());
Ok(Self {
Expand Down Expand Up @@ -194,8 +207,13 @@ pub async fn process_udp<R, W>(
token.clone(),
));

// Wait for both tasks to complete
let _ = tasks.join_all().await;
// Wait for both tasks to complete, if either one exits it _should_ cancel the other as well.
for res in tasks.join_all().await {
if let Err(e) = res {
tracing::error!("bridge udp forwarder error: {e}");
}
}
info!("transport udp forwarder shutdown");
}

// Assumes that the socket has already had `connect` called.
Expand Down Expand Up @@ -311,16 +329,7 @@ pub struct ClientOptions {
impl TryFrom<&QuicClientOptions> for ClientOptions {
type Error = TransportError;
fn try_from(value: &QuicClientOptions) -> Result<Self, Self::Error> {
let mut pubkey_bytes = [0u8; 32];
BASE64_STANDARD
.decode_slice(&value.id_pubkey, &mut pubkey_bytes)
.map_err(|e| {
TransportError::config_err(format!(
"failed to decode Quic bridge public key as base64: {e}"
))
})?;
let id_pubkey = VerifyingKey::from_bytes(&pubkey_bytes)
.map_err(|e| TransportError::config_err(format!("bad Quic bridge public key: {e}")))?;
let id_pubkey = Self::parse_base64_pubkey(&value.id_pubkey)?;

Ok(Self {
addresses: value.addresses.clone(),
Expand All @@ -331,6 +340,19 @@ impl TryFrom<&QuicClientOptions> for ClientOptions {
}

impl ClientOptions {
fn parse_base64_pubkey(key: impl AsRef<str>) -> Result<VerifyingKey, TransportError> {
let mut pubkey_bytes = [0u8; 32];
BASE64_STANDARD
.decode_slice(key.as_ref(), &mut pubkey_bytes)
.map_err(|e| {
TransportError::config_err(format!(
"failed to decode Quic bridge public key as base64: {e}"
))
})?;
VerifyingKey::from_bytes(&pubkey_bytes)
.map_err(|e| TransportError::config_err(format!("bad Quic bridge public key: {e}")))
}

fn get_ipv4(&self) -> Option<SocketAddr> {
self.addresses.iter().find(|s| s.is_ipv4()).cloned()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,11 +885,18 @@ impl TunnelMonitor {
// and the bind address of that UDP listener is provided to the entry wireguard tunnel
// as the endpoint address.
tracing::info!("Establishing DVPN QUIC transport tunnel");
let udp_fwd_cancel = self.shutdown_token.child_token();
let bridge_conn = transports::BridgeConn::try_connect(entry_bridge_params).await?;

let bridge_conn = transports::BridgeConn::try_connect(
entry_bridge_params,
self.shutdown_token.clone(),
)
.await
.inspect_err(|_| self.shutdown_token.cancel())?;
connection_data.entry_bridge_addr = Some(bridge_conn.endpoint);
let (local_fwd_listen_addr, fwd_handle) =
transports::UdpForwarder::launch(bridge_conn, None, udp_fwd_cancel.clone()).await?;
transports::UdpForwarder::launch(bridge_conn, None, self.shutdown_token.clone())
.await
.inspect_err(|_| self.shutdown_token.cancel())?;
transport_fwd_handle = Some(fwd_handle);
tracing::info!(
"quic transport connected, udp forwarder open on {local_fwd_listen_addr:?}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl TryFrom<proto::VpnApiError> for VpnApiError {
proto::vpn_api_error::ErrorDetail::Timeout(msg) => Self::Timeout(msg),
proto::vpn_api_error::ErrorDetail::StatusCode(e) => Self::StatusCode {
code: e.code.try_into().map_err(|e| {
ConversionError::Generic(format!("failed to convert status code: {}", e))
ConversionError::Generic(format!("failed to convert status code: {e}"))
})?,
message: e.message,
},
Expand Down
Loading