From bda2bf9903c8dcd8033dd3807c9c5e17c2c40697 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 7 Aug 2025 09:42:19 +0200 Subject: [PATCH 01/23] versioning for gateway service --- Cargo.lock | 75 ++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 ++ src/gateway.rs | 14 +++++++--- 3 files changed, 87 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index edc53f75..556dee92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -450,6 +450,7 @@ dependencies = [ "axum 0.8.4", "base64", "clap", + "defguard_version", "defguard_wireguard_rs", "env_logger", "gethostname", @@ -468,10 +469,25 @@ dependencies = [ "toml", "tonic", "tonic-build", + "tower 0.5.2", "vergen-git2", "x25519-dalek", ] +[[package]] +name = "defguard_version" +version = "0.0.0" +dependencies = [ + "http", + "os_info", + "semver", + "thiserror 2.0.12", + "tonic", + "tonic-middleware", + "tower 0.5.2", + "tracing", +] + [[package]] name = "defguard_wireguard_rs" version = "0.7.5" @@ -646,6 +662,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -665,9 +692,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1343,6 +1372,18 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "os_info" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0e1ac5fde8d43c34139135df8ea9ee9465394b2d8d20f032d38998f64afffc3" +dependencies = [ + "log", + "plist", + "serde", + "windows-sys 0.52.0", +] + [[package]] name = "paste" version = "1.0.15" @@ -1403,6 +1444,19 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "plist" +version = "1.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3af6b589e163c5a788fab00ce0c0366f6efbb9959c2f9874b224936af7fce7e1" +dependencies = [ + "base64", + "indexmap 2.10.0", + "quick-xml", + "serde", + "time", +] + [[package]] name = "portable-atomic" version = "1.11.1" @@ -1513,6 +1567,15 @@ dependencies = [ "prost", ] +[[package]] +name = "quick-xml" +version = "0.38.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9845d9dccf565065824e69f9f235fafba1587031eda353c1f1561cd6a6be78f4" +dependencies = [ + "memchr", +] + [[package]] name = "quote" version = "1.0.40" @@ -2133,6 +2196,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tonic-middleware" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd11ca7918ee9f94e217285ace20caf6187476f399244ba8438cdc92ce665236" +dependencies = [ + "async-trait", + "futures-util", + "tonic", + "tower 0.4.13", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/Cargo.toml b/Cargo.toml index c39680fc..79d9e5c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ version = "1.5.0" edition = "2021" [dependencies] +defguard_version = { path = "../defguard/crates/defguard_version" } axum = { version = "0.8", features = ["macros"] } base64 = "0.22" clap = { version = "4.5", features = ["derive", "env"] } @@ -26,6 +27,7 @@ tonic = { version = "0.12", default-features = false, features = [ "prost", "tls-native-roots", ] } +tower = "0.5.2" [target.'cfg(target_os = "linux")'.dependencies] nftnl = { git = "https://github.com/DefGuard/nftnl-rs.git", rev = "1a1147271f43b9d7182a114bb056a5224c35d38f" } diff --git a/src/gateway.rs b/src/gateway.rs index ce664717..b3e5f358 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -9,6 +9,7 @@ use std::{ time::{Duration, SystemTime}, }; +use defguard_version::client::{version_layer, DefguardVersionClientService}; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; use tokio::{ @@ -19,12 +20,14 @@ use tokio::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{ + body::BoxBody, codegen::InterceptedService, metadata::{Ascii, MetadataValue}, service::Interceptor, transport::{Certificate, Channel, ClientTlsConfig, Endpoint}, Request, Status, Streaming, }; +use tower::{ServiceExt, Layer}; use crate::{ config::Config, @@ -110,7 +113,7 @@ pub struct Gateway { #[cfg_attr(not(target_os = "linux"), allow(unused))] firewall_config: Option, pub connected: Arc, - client: GatewayServiceClient>, + client: GatewayServiceClient, AuthInterceptor>>, stats_thread: Option>, } @@ -248,7 +251,7 @@ impl Gateway { } async fn handle_stats_thread( - mut client: GatewayServiceClient>, + mut client: GatewayServiceClient, AuthInterceptor>>, rx: UnboundedReceiverStream, ) { let status = client.stats(rx).await; @@ -477,7 +480,7 @@ impl Gateway { fn setup_client( config: &Config, - ) -> Result>, GatewayError> + ) -> Result, AuthInterceptor>>, GatewayError> { debug!("Preparing gRPC client configuration"); let tls = ClientTlsConfig::new(); @@ -498,8 +501,11 @@ impl Gateway { .tls_config(tls)?; let channel = endpoint.connect_lazy(); + // Apply version layer to the channel + let versioned_service = version_layer(VERSION.to_string()).layer(channel); + let auth_interceptor = AuthInterceptor::new(&config.token)?; - let client = GatewayServiceClient::with_interceptor(channel, auth_interceptor); + let client = GatewayServiceClient::with_interceptor(versioned_service, auth_interceptor); debug!("gRPC client configuration done"); Ok(client) From d99b2a23eabf99422ec7f483a86ffe5f6dfadb44 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 7 Aug 2025 10:00:58 +0200 Subject: [PATCH 02/23] cargo fmt --- src/gateway.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index b3e5f358..70530d59 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -20,14 +20,13 @@ use tokio::{ }; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{ - body::BoxBody, codegen::InterceptedService, metadata::{Ascii, MetadataValue}, service::Interceptor, transport::{Certificate, Channel, ClientTlsConfig, Endpoint}, Request, Status, Streaming, }; -use tower::{ServiceExt, Layer}; +use tower::Layer; use crate::{ config::Config, @@ -113,7 +112,9 @@ pub struct Gateway { #[cfg_attr(not(target_os = "linux"), allow(unused))] firewall_config: Option, pub connected: Arc, - client: GatewayServiceClient, AuthInterceptor>>, + client: GatewayServiceClient< + InterceptedService, AuthInterceptor>, + >, stats_thread: Option>, } @@ -251,7 +252,9 @@ impl Gateway { } async fn handle_stats_thread( - mut client: GatewayServiceClient, AuthInterceptor>>, + mut client: GatewayServiceClient< + InterceptedService, AuthInterceptor>, + >, rx: UnboundedReceiverStream, ) { let status = client.stats(rx).await; @@ -480,8 +483,12 @@ impl Gateway { fn setup_client( config: &Config, - ) -> Result, AuthInterceptor>>, GatewayError> - { + ) -> Result< + GatewayServiceClient< + InterceptedService, AuthInterceptor>, + >, + GatewayError, + > { debug!("Preparing gRPC client configuration"); let tls = ClientTlsConfig::new(); // Use CA if provided, otherwise load certificates from system. @@ -503,7 +510,7 @@ impl Gateway { // Apply version layer to the channel let versioned_service = version_layer(VERSION.to_string()).layer(channel); - + let auth_interceptor = AuthInterceptor::new(&config.token)?; let client = GatewayServiceClient::with_interceptor(versioned_service, auth_interceptor); From 9ead0a2ee242977ba8cb8d99ffea7b82419db6c5 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 7 Aug 2025 12:25:27 +0200 Subject: [PATCH 03/23] use DefguardVersionClientLayer for version exchange with the server --- src/error.rs | 4 ++++ src/gateway.rs | 19 +++++++++++++++---- src/main.rs | 15 +++++++++++---- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/src/error.rs b/src/error.rs index d05e5d45..5a81aba3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,3 +1,4 @@ +use defguard_version::DefguardVersionError; use defguard_wireguard_rs::error::WireguardInterfaceError; use thiserror::Error; @@ -43,4 +44,7 @@ pub enum GatewayError { #[error("Firewall error: {0}")] FirewallError(#[from] FirewallError), + + #[error(transparent)] + DefguardVersionError(#[from] DefguardVersionError), } diff --git a/src/gateway.rs b/src/gateway.rs index 70530d59..39b4279d 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -4,12 +4,17 @@ use std::{ str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, + Arc, Mutex, RwLock, }, time::{Duration, SystemTime}, }; -use defguard_version::client::{version_layer, DefguardVersionClientService}; +use defguard_version::{ + client::{ + DefguardVersionClientLayer, DefguardVersionClientService, + }, + DefguardVersionSet, +}; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; use tokio::{ @@ -123,8 +128,9 @@ impl Gateway { config: Config, wgapi: impl WireguardInterfaceApi + Send + Sync + 'static, firewall_api: FirewallApi, + version_set: Arc>, ) -> Result { - let client = Self::setup_client(&config)?; + let client = Self::setup_client(&config, version_set)?; Ok(Self { config, interface_configuration: None, @@ -483,6 +489,7 @@ impl Gateway { fn setup_client( config: &Config, + version_set: Arc>, ) -> Result< GatewayServiceClient< InterceptedService, AuthInterceptor>, @@ -509,7 +516,11 @@ impl Gateway { let channel = endpoint.connect_lazy(); // Apply version layer to the channel - let versioned_service = version_layer(VERSION.to_string()).layer(channel); + let versioned_service = DefguardVersionClientLayer::new( + version_set.read().unwrap().own.clone(), + Arc::clone(&version_set.read().unwrap().core), + ) + .layer(channel); let auth_interceptor = AuthInterceptor::new(&config.token)?; let client = GatewayServiceClient::with_interceptor(versioned_service, auth_interceptor); diff --git a/src/main.rs b/src/main.rs index ce5b2317..f0a70f3a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,15 @@ -use std::{fs::File, io::Write, process, sync::Arc}; +use std::{ + fs::File, + io::Write, + process, + sync::{Arc, RwLock}, +}; use defguard_gateway::{ config::get_config, enterprise::firewall::api::FirewallApi, error::GatewayError, - execute_command, gateway::Gateway, init_syslog, server::run_server, + execute_command, gateway::Gateway, init_syslog, server::run_server, VERSION, }; +use defguard_version::DefguardVersionSet; #[cfg(not(any(target_os = "macos", target_os = "netbsd")))] use defguard_wireguard_rs::Kernel; use defguard_wireguard_rs::{Userspace, WGApi}; @@ -14,6 +20,7 @@ use tokio::task::JoinSet; async fn main() -> Result<(), GatewayError> { // parse config let config = get_config()?; + let version_set = Arc::new(RwLock::new(DefguardVersionSet::try_from(VERSION)?)); // setup pidfile let pid = process::id(); @@ -43,12 +50,12 @@ async fn main() -> Result<(), GatewayError> { let mut gateway = if config.userspace { let wgapi = WGApi::::new(ifname)?; - Gateway::new(config.clone(), wgapi, firewall_api)? + Gateway::new(config.clone(), wgapi, firewall_api, version_set)? } else { #[cfg(not(any(target_os = "macos", target_os = "netbsd")))] { let wgapi = WGApi::::new(ifname)?; - Gateway::new(config.clone(), wgapi, firewall_api)? + Gateway::new(config.clone(), wgapi, firewall_api, version_set)? } #[cfg(any(target_os = "macos", target_os = "netbsd"))] { From 119423911a16069dc107af22e1c1eddc6cbea772 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 7 Aug 2025 13:56:06 +0200 Subject: [PATCH 04/23] use defguard_version tracing --- Cargo.lock | 129 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/config.rs | 4 ++ src/main.rs | 23 ++++++--- 3 files changed, 147 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 556dee92..d06d3510 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -486,6 +486,7 @@ dependencies = [ "tonic-middleware", "tower 0.5.2", "tracing", + "tracing-subscriber", ] [[package]] @@ -1094,6 +1095,12 @@ dependencies = [ "libc", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.174" @@ -1142,6 +1149,15 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1330,6 +1346,16 @@ dependencies = [ "memoffset", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -1384,6 +1410,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "paste" version = "1.0.15" @@ -1629,8 +1661,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -1641,9 +1682,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -1861,6 +1908,15 @@ dependencies = [ "serde", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2009,6 +2065,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "time" version = "0.3.41" @@ -2286,6 +2351,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -2329,6 +2424,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" @@ -2398,6 +2499,28 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.1.3" diff --git a/src/config.rs b/src/config.rs index 8235fe46..87d42a5a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -10,6 +10,9 @@ use crate::error::GatewayError; #[clap(about = "Defguard VPN gateway service")] #[command(version)] pub struct Config { + #[arg(long, short = 'l', env = "DEFGUARD_LOG_LEVEL", default_value = "info")] + pub log_level: String, + /// Token received from Defguard after completing the network wizard #[arg( long, @@ -113,6 +116,7 @@ pub struct Config { impl Default for Config { fn default() -> Self { Self { + log_level: "info".to_string(), token: "TOKEN".into(), name: None, grpc_url: "http://localhost:50051".into(), diff --git a/src/main.rs b/src/main.rs index f0a70f3a..b1f5fcc8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,18 +9,16 @@ use defguard_gateway::{ config::get_config, enterprise::firewall::api::FirewallApi, error::GatewayError, execute_command, gateway::Gateway, init_syslog, server::run_server, VERSION, }; -use defguard_version::DefguardVersionSet; +use defguard_version::{ComponentInfo, DefguardVersionSet}; #[cfg(not(any(target_os = "macos", target_os = "netbsd")))] use defguard_wireguard_rs::Kernel; use defguard_wireguard_rs::{Userspace, WGApi}; -use env_logger::{init_from_env, Env, DEFAULT_FILTER_ENV}; use tokio::task::JoinSet; #[tokio::main] async fn main() -> Result<(), GatewayError> { // parse config let config = get_config()?; - let version_set = Arc::new(RwLock::new(DefguardVersionSet::try_from(VERSION)?)); // setup pidfile let pid = process::id(); @@ -31,14 +29,27 @@ async fn main() -> Result<(), GatewayError> { } // setup logging - if config.use_syslog { + let version_set = if config.use_syslog { if let Err(error) = init_syslog(&config, pid) { log::error!("Unable to initialize syslog. Is the syslog daemon running?"); return Err(error); } + Arc::new(RwLock::new(DefguardVersionSet { + own: ComponentInfo::try_from(VERSION)?, + core: Arc::new(RwLock::new(None)), + proxy: Arc::new(RwLock::new(None)), + gateway: Arc::new(RwLock::new(None)), + })) } else { - init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info")); - } + defguard_version::tracing::init( + VERSION, + &config.log_level.to_string(), + &[ + "send_grpc_message", + "bidirectional_communication", + ], + ) + }; if let Some(pre_up) = &config.pre_up { log::info!("Executing specified PRE_UP command: {pre_up}"); From e6be48b845a34343cd9c792dacf27ed156d4cc6d Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Fri, 8 Aug 2025 10:11:54 +0200 Subject: [PATCH 05/23] DefguardVersionSet no longer behind RwLock --- src/gateway.rs | 18 +++++++----------- src/main.rs | 9 +++------ 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 39b4279d..fc1e6e83 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -4,15 +4,13 @@ use std::{ str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, Mutex, RwLock, + Arc, Mutex, }, time::{Duration, SystemTime}, }; use defguard_version::{ - client::{ - DefguardVersionClientLayer, DefguardVersionClientService, - }, + client::{DefguardVersionClientLayer, DefguardVersionClientService}, DefguardVersionSet, }; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; @@ -128,7 +126,7 @@ impl Gateway { config: Config, wgapi: impl WireguardInterfaceApi + Send + Sync + 'static, firewall_api: FirewallApi, - version_set: Arc>, + version_set: Arc, ) -> Result { let client = Self::setup_client(&config, version_set)?; Ok(Self { @@ -489,7 +487,7 @@ impl Gateway { fn setup_client( config: &Config, - version_set: Arc>, + version_set: Arc, ) -> Result< GatewayServiceClient< InterceptedService, AuthInterceptor>, @@ -516,11 +514,9 @@ impl Gateway { let channel = endpoint.connect_lazy(); // Apply version layer to the channel - let versioned_service = DefguardVersionClientLayer::new( - version_set.read().unwrap().own.clone(), - Arc::clone(&version_set.read().unwrap().core), - ) - .layer(channel); + let versioned_service = + DefguardVersionClientLayer::new(version_set.own.clone(), Arc::clone(&version_set.core)) + .layer(channel); let auth_interceptor = AuthInterceptor::new(&config.token)?; let client = GatewayServiceClient::with_interceptor(versioned_service, auth_interceptor); diff --git a/src/main.rs b/src/main.rs index b1f5fcc8..273ac6a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,20 +34,17 @@ async fn main() -> Result<(), GatewayError> { log::error!("Unable to initialize syslog. Is the syslog daemon running?"); return Err(error); } - Arc::new(RwLock::new(DefguardVersionSet { + Arc::new(DefguardVersionSet { own: ComponentInfo::try_from(VERSION)?, core: Arc::new(RwLock::new(None)), proxy: Arc::new(RwLock::new(None)), gateway: Arc::new(RwLock::new(None)), - })) + }) } else { defguard_version::tracing::init( VERSION, &config.log_level.to_string(), - &[ - "send_grpc_message", - "bidirectional_communication", - ], + &["send_grpc_message", "bidirectional_communication"], ) }; From d8f87721bae4e6c286ad0dc1ed0b16a6c754e7e3 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 11 Aug 2025 13:30:18 +0200 Subject: [PATCH 06/23] implement span-based approach to version logging --- Cargo.lock | 1 + Cargo.toml | 1 + src/gateway.rs | 28 +++++++++++++++++++++------- src/main.rs | 26 +++++--------------------- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d06d3510..9750fb45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -470,6 +470,7 @@ dependencies = [ "tonic", "tonic-build", "tower 0.5.2", + "tracing", "vergen-git2", "x25519-dalek", ] diff --git a/Cargo.toml b/Cargo.toml index 79d9e5c0..ff5236ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ tonic = { version = "0.12", default-features = false, features = [ "tls-native-roots", ] } tower = "0.5.2" +tracing = "0.1.41" [target.'cfg(target_os = "linux")'.dependencies] nftnl = { git = "https://github.com/DefGuard/nftnl-rs.git", rev = "1a1147271f43b9d7182a114bb056a5224c35d38f" } diff --git a/src/gateway.rs b/src/gateway.rs index fc1e6e83..5aca176f 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -11,7 +11,7 @@ use std::{ use defguard_version::{ client::{DefguardVersionClientLayer, DefguardVersionClientService}, - DefguardVersionSet, + parse_metadata, }; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; @@ -118,6 +118,7 @@ pub struct Gateway { client: GatewayServiceClient< InterceptedService, AuthInterceptor>, >, + core_version: (String, String), stats_thread: Option>, } @@ -126,9 +127,8 @@ impl Gateway { config: Config, wgapi: impl WireguardInterfaceApi + Send + Sync + 'static, firewall_api: FirewallApi, - version_set: Arc, ) -> Result { - let client = Self::setup_client(&config, version_set)?; + let client = Self::setup_client(&config)?; Ok(Self { config, interface_configuration: None, @@ -139,6 +139,7 @@ impl Gateway { stats_thread: None, firewall_api, firewall_config: None, + core_version: Default::default(), }) } @@ -461,6 +462,16 @@ impl Gateway { }; match (response, stream) { (Ok(response), Ok(stream)) => { + let (version, info) = parse_metadata(response.metadata()).unwrap(); + self.core_version = (version.to_string(), info.to_string()); + + let span = tracing::info_span!( + "core_connect", + gateway_version = %version, + gateway_info = %info, + ); + let _guard = span.enter(); + if let Err(err) = self.configure(response.into_inner()) { error!("Interface configuration failed: {err}"); continue; @@ -487,7 +498,6 @@ impl Gateway { fn setup_client( config: &Config, - version_set: Arc, ) -> Result< GatewayServiceClient< InterceptedService, AuthInterceptor>, @@ -514,9 +524,7 @@ impl Gateway { let channel = endpoint.connect_lazy(); // Apply version layer to the channel - let versioned_service = - DefguardVersionClientLayer::new(version_set.own.clone(), Arc::clone(&version_set.core)) - .layer(channel); + let versioned_service = DefguardVersionClientLayer::from_str(VERSION)?.layer(channel); let auth_interceptor = AuthInterceptor::new(&config.token)?; let client = GatewayServiceClient::with_interceptor(versioned_service, auth_interceptor); @@ -662,6 +670,12 @@ impl Gateway { debug!("Executing specified POST_UP command: {post_up}"); execute_command(post_up)?; } + let span = tracing::info_span!( + "core_grpc_loop", + core_version = %self.core_version.0, + core_info = %self.core_version.1, + ); + let _guard = span.enter(); let stats_stream = self.spawn_stats_thread(); let client = self.client.clone(); select! { diff --git a/src/main.rs b/src/main.rs index 273ac6a3..68655ead 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,9 @@ -use std::{ - fs::File, - io::Write, - process, - sync::{Arc, RwLock}, -}; +use std::{fs::File, io::Write, process, sync::Arc}; use defguard_gateway::{ config::get_config, enterprise::firewall::api::FirewallApi, error::GatewayError, execute_command, gateway::Gateway, init_syslog, server::run_server, VERSION, }; -use defguard_version::{ComponentInfo, DefguardVersionSet}; #[cfg(not(any(target_os = "macos", target_os = "netbsd")))] use defguard_wireguard_rs::Kernel; use defguard_wireguard_rs::{Userspace, WGApi}; @@ -29,23 +23,13 @@ async fn main() -> Result<(), GatewayError> { } // setup logging - let version_set = if config.use_syslog { + if config.use_syslog { if let Err(error) = init_syslog(&config, pid) { log::error!("Unable to initialize syslog. Is the syslog daemon running?"); return Err(error); } - Arc::new(DefguardVersionSet { - own: ComponentInfo::try_from(VERSION)?, - core: Arc::new(RwLock::new(None)), - proxy: Arc::new(RwLock::new(None)), - gateway: Arc::new(RwLock::new(None)), - }) } else { - defguard_version::tracing::init( - VERSION, - &config.log_level.to_string(), - &["send_grpc_message", "bidirectional_communication"], - ) + defguard_version::tracing::init(VERSION, &config.log_level.to_string()) }; if let Some(pre_up) = &config.pre_up { @@ -58,12 +42,12 @@ async fn main() -> Result<(), GatewayError> { let mut gateway = if config.userspace { let wgapi = WGApi::::new(ifname)?; - Gateway::new(config.clone(), wgapi, firewall_api, version_set)? + Gateway::new(config.clone(), wgapi, firewall_api)? } else { #[cfg(not(any(target_os = "macos", target_os = "netbsd")))] { let wgapi = WGApi::::new(ifname)?; - Gateway::new(config.clone(), wgapi, firewall_api, version_set)? + Gateway::new(config.clone(), wgapi, firewall_api)? } #[cfg(any(target_os = "macos", target_os = "netbsd"))] { From 5d40ad55f0b0b8c4ea09a3001a3a0ac5d8bf1aad Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Mon, 11 Aug 2025 14:07:46 +0200 Subject: [PATCH 07/23] fix DefguardVersionClientLayer constructor use --- src/gateway.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index 5aca176f..0e7dfcfa 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -524,7 +524,7 @@ impl Gateway { let channel = endpoint.connect_lazy(); // Apply version layer to the channel - let versioned_service = DefguardVersionClientLayer::from_str(VERSION)?.layer(channel); + let versioned_service = DefguardVersionClientLayer::new(VERSION)?.layer(channel); let auth_interceptor = AuthInterceptor::new(&config.token)?; let client = GatewayServiceClient::with_interceptor(versioned_service, auth_interceptor); From b961a88425be6bd7975922d127f708498dc2db89 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Tue, 12 Aug 2025 13:26:58 +0200 Subject: [PATCH 08/23] fix span variable names --- src/gateway.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 0e7dfcfa..6ae3be95 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -464,11 +464,10 @@ impl Gateway { (Ok(response), Ok(stream)) => { let (version, info) = parse_metadata(response.metadata()).unwrap(); self.core_version = (version.to_string(), info.to_string()); - let span = tracing::info_span!( "core_connect", - gateway_version = %version, - gateway_info = %info, + core_version = %version, + core_info = %info, ); let _guard = span.enter(); From 51e2863f6a74cd9b7aa31e7b6fa0a7e70e308cc5 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Wed, 13 Aug 2025 09:03:09 +0200 Subject: [PATCH 09/23] simplify version header injection --- Cargo.lock | 1 - Cargo.toml | 1 - src/gateway.rs | 52 +++++++++++++++++++++++++------------------------- 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9750fb45..2c889780 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -469,7 +469,6 @@ dependencies = [ "toml", "tonic", "tonic-build", - "tower 0.5.2", "tracing", "vergen-git2", "x25519-dalek", diff --git a/Cargo.toml b/Cargo.toml index ff5236ab..aa9d3479 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ tonic = { version = "0.12", default-features = false, features = [ "prost", "tls-native-roots", ] } -tower = "0.5.2" tracing = "0.1.41" [target.'cfg(target_os = "linux")'.dependencies] diff --git a/src/gateway.rs b/src/gateway.rs index 6ae3be95..b4431edb 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -9,10 +9,7 @@ use std::{ time::{Duration, SystemTime}, }; -use defguard_version::{ - client::{DefguardVersionClientLayer, DefguardVersionClientService}, - parse_metadata, -}; +use defguard_version::{parse_metadata, SystemInfo, SYSTEM_INFO_HEADER, VERSION_HEADER}; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; use tokio::{ @@ -29,7 +26,6 @@ use tonic::{ transport::{Certificate, Channel, ClientTlsConfig, Endpoint}, Request, Status, Streaming, }; -use tower::Layer; use crate::{ config::Config, @@ -74,13 +70,16 @@ impl From for InterfaceConfiguration { } } +/// Intercepts all grpc requests adding authentication and version metadata #[derive(Clone)] -struct AuthInterceptor { +struct RequestInterceptor { hostname: MetadataValue, token: MetadataValue, + version: MetadataValue, + system_info: MetadataValue, } -impl AuthInterceptor { +impl RequestInterceptor { fn new(token: &str) -> Result { let token = MetadataValue::try_from(token)?; let hostname = MetadataValue::try_from( @@ -88,14 +87,26 @@ impl AuthInterceptor { .to_str() .expect("Unable to get current hostname during gRPC connection setup."), )?; + let version = MetadataValue::try_from(VERSION)?; + let system_info = MetadataValue::try_from(SystemInfo::get().to_string())?; - Ok(Self { hostname, token }) + Ok(Self { + hostname, + token, + version, + system_info, + }) } } -impl Interceptor for AuthInterceptor { +impl Interceptor for RequestInterceptor { fn call(&mut self, mut request: Request<()>) -> Result, Status> { let metadata = request.metadata_mut(); + // Add version headers + metadata.insert(VERSION_HEADER, self.version.clone()); + metadata.insert(SYSTEM_INFO_HEADER, self.system_info.clone()); + + // Add auth headers metadata.insert("authorization", self.token.clone()); metadata.insert("hostname", self.hostname.clone()); @@ -115,9 +126,7 @@ pub struct Gateway { #[cfg_attr(not(target_os = "linux"), allow(unused))] firewall_config: Option, pub connected: Arc, - client: GatewayServiceClient< - InterceptedService, AuthInterceptor>, - >, + client: GatewayServiceClient>, core_version: (String, String), stats_thread: Option>, } @@ -257,9 +266,7 @@ impl Gateway { } async fn handle_stats_thread( - mut client: GatewayServiceClient< - InterceptedService, AuthInterceptor>, - >, + mut client: GatewayServiceClient>, rx: UnboundedReceiverStream, ) { let status = client.stats(rx).await; @@ -497,12 +504,8 @@ impl Gateway { fn setup_client( config: &Config, - ) -> Result< - GatewayServiceClient< - InterceptedService, AuthInterceptor>, - >, - GatewayError, - > { + ) -> Result>, GatewayError> + { debug!("Preparing gRPC client configuration"); let tls = ClientTlsConfig::new(); // Use CA if provided, otherwise load certificates from system. @@ -522,11 +525,8 @@ impl Gateway { .tls_config(tls)?; let channel = endpoint.connect_lazy(); - // Apply version layer to the channel - let versioned_service = DefguardVersionClientLayer::new(VERSION)?.layer(channel); - - let auth_interceptor = AuthInterceptor::new(&config.token)?; - let client = GatewayServiceClient::with_interceptor(versioned_service, auth_interceptor); + let request_interceptor = RequestInterceptor::new(&config.token)?; + let client = GatewayServiceClient::with_interceptor(channel, request_interceptor); debug!("gRPC client configuration done"); Ok(client) From a535438ed88cb699d4f492612113c105ac47b1f1 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 14 Aug 2025 08:20:28 +0200 Subject: [PATCH 10/23] version_info_from_metadata --- src/gateway.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index b4431edb..cdc6a85a 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -9,7 +9,9 @@ use std::{ time::{Duration, SystemTime}, }; -use defguard_version::{parse_metadata, SystemInfo, SYSTEM_INFO_HEADER, VERSION_HEADER}; +use defguard_version::{ + version_info_from_metadata, SystemInfo, SYSTEM_INFO_HEADER, VERSION_HEADER, +}; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; use tokio::{ @@ -469,12 +471,12 @@ impl Gateway { }; match (response, stream) { (Ok(response), Ok(stream)) => { - let (version, info) = parse_metadata(response.metadata()).unwrap(); + let (version, info) = version_info_from_metadata(response.metadata()); self.core_version = (version.to_string(), info.to_string()); let span = tracing::info_span!( "core_connect", - core_version = %version, - core_info = %info, + core_version = version, + core_info = info, ); let _guard = span.enter(); From fe2dd8d20871a6ea293acc9cb348847556b072f3 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Tue, 19 Aug 2025 09:29:05 +0200 Subject: [PATCH 11/23] use new version tracing initializator --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 68655ead..666dd2cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,7 +29,7 @@ async fn main() -> Result<(), GatewayError> { return Err(error); } } else { - defguard_version::tracing::init(VERSION, &config.log_level.to_string()) + defguard_version::tracing::init(VERSION, &config.log_level.to_string())? }; if let Some(pre_up) = &config.pre_up { From 0574ee5389b700682bab11163b735e40aebf7fce Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Tue, 19 Aug 2025 09:55:46 +0200 Subject: [PATCH 12/23] use defguard_version interceptor instead of manually setting the headers --- Cargo.toml | 2 +- src/gateway.rs | 36 ++++++++++++++++++++---------------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5a0f50e3..610bd138 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ tonic = { version = "0.14", default-features = false, features = [ "tls-native-roots", "tls-ring", ] } -tracing = "0.1.41" +tracing = "0.1" tonic-prost = "0.14" [target.'cfg(target_os = "linux")'.dependencies] diff --git a/src/gateway.rs b/src/gateway.rs index a76210d4..65202a46 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -10,7 +10,7 @@ use std::{ }; use defguard_version::{ - version_info_from_metadata, SystemInfo, SYSTEM_INFO_HEADER, VERSION_HEADER, + client::version_interceptor, version_info_from_metadata, }; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; @@ -73,12 +73,20 @@ impl From for InterfaceConfiguration { } /// Intercepts all grpc requests adding authentication and version metadata -#[derive(Clone)] struct RequestInterceptor { hostname: MetadataValue, token: MetadataValue, - version: MetadataValue, - system_info: MetadataValue, + version_interceptor_fn: Box) -> Result, Status> + Send + Sync>, +} + +impl Clone for RequestInterceptor { + fn clone(&self) -> Self { + Self { + hostname: self.hostname.clone(), + token: self.token.clone(), + version_interceptor_fn: Box::new(version_interceptor(VERSION)), + } + } } impl RequestInterceptor { @@ -89,26 +97,22 @@ impl RequestInterceptor { .to_str() .expect("Unable to get current hostname during gRPC connection setup."), )?; - let version = MetadataValue::try_from(VERSION)?; - let system_info = MetadataValue::try_from(SystemInfo::get().to_string())?; Ok(Self { hostname, token, - version, - system_info, + version_interceptor_fn: Box::new(version_interceptor(VERSION)), }) } } impl Interceptor for RequestInterceptor { - fn call(&mut self, mut request: Request<()>) -> Result, Status> { - let metadata = request.metadata_mut(); - // Add version headers - metadata.insert(VERSION_HEADER, self.version.clone()); - metadata.insert(SYSTEM_INFO_HEADER, self.system_info.clone()); - + fn call(&mut self, request: Request<()>) -> Result, Status> { + // Apply version interceptor - adds version headers + let mut request = (self.version_interceptor_fn)(request)?; + // Add auth headers + let metadata = request.metadata_mut(); metadata.insert("authorization", self.token.clone()); metadata.insert("hostname", self.hostname.clone()); @@ -527,8 +531,8 @@ impl Gateway { .tls_config(tls)?; let channel = endpoint.connect_lazy(); - let request_interceptor = RequestInterceptor::new(&config.token)?; - let client = GatewayServiceClient::with_interceptor(channel, request_interceptor); + let auth_interceptor = RequestInterceptor::new(&config.token)?; + let client = GatewayServiceClient::with_interceptor(channel, auth_interceptor); debug!("gRPC client configuration done"); Ok(client) From 7e69b972c41c8cc0406d189c6ef0dcff00bb080b Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Wed, 20 Aug 2025 12:59:38 +0200 Subject: [PATCH 13/23] use the new tracing fields --- src/gateway.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 65202a46..a9f93590 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -9,9 +9,7 @@ use std::{ time::{Duration, SystemTime}, }; -use defguard_version::{ - client::version_interceptor, version_info_from_metadata, -}; +use defguard_version::{client::version_interceptor, version_info_from_metadata}; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; use tokio::{ @@ -110,7 +108,7 @@ impl Interceptor for RequestInterceptor { fn call(&mut self, request: Request<()>) -> Result, Status> { // Apply version interceptor - adds version headers let mut request = (self.version_interceptor_fn)(request)?; - + // Add auth headers let metadata = request.metadata_mut(); metadata.insert("authorization", self.token.clone()); @@ -477,11 +475,8 @@ impl Gateway { (Ok(response), Ok(stream)) => { let (version, info) = version_info_from_metadata(response.metadata()); self.core_version = (version.to_string(), info.to_string()); - let span = tracing::info_span!( - "core_connect", - core_version = version, - core_info = info, - ); + let span = + tracing::info_span!("core_connect", component = "core", version, info); let _guard = span.enter(); if let Err(err) = self.configure(response.into_inner()) { @@ -675,13 +670,13 @@ impl Gateway { debug!("Executing specified POST_UP command: {post_up}"); execute_command(post_up)?; } + let stats_stream = self.spawn_stats_thread(); let span = tracing::info_span!( "core_grpc_loop", - core_version = %self.core_version.0, - core_info = %self.core_version.1, + version = %self.core_version.0, + info = %self.core_version.1, ); let _guard = span.enter(); - let stats_stream = self.spawn_stats_thread(); let client = self.client.clone(); select! { biased; From 51faeea7045bf7de93d5ab66c3484b2780db66ac Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 21 Aug 2025 09:01:17 +0200 Subject: [PATCH 14/23] add tracing instrumentation to keep all communication in the versioning span --- src/gateway.rs | 111 ++++++++++++++++++++++++++----------------------- 1 file changed, 59 insertions(+), 52 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index a9f93590..1d74b323 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,3 +1,6 @@ +use defguard_version::{client::version_interceptor, version_info_from_metadata}; +use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; +use gethostname::gethostname; use std::{ collections::HashMap, fs::read_to_string, @@ -8,10 +11,6 @@ use std::{ }, time::{Duration, SystemTime}, }; - -use defguard_version::{client::version_interceptor, version_info_from_metadata}; -use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; -use gethostname::gethostname; use tokio::{ select, sync::mpsc, @@ -26,6 +25,7 @@ use tonic::{ transport::{Certificate, Channel, ClientTlsConfig, Endpoint}, Request, Status, Streaming, }; +use tracing::{instrument, Instrument}; use crate::{ config::Config, @@ -204,6 +204,7 @@ impl Gateway { } /// Starts tokio thread collecting stats and sending them to backend service via gRPC. + #[instrument(skip_all)] fn spawn_stats_thread(&mut self) -> UnboundedReceiverStream { if let Some(handle) = self.stats_thread.take() { debug!("Aborting previous stats thread before starting a new one"); @@ -214,61 +215,65 @@ impl Gateway { let wgapi = Arc::clone(&self.wgapi); let (tx, rx) = mpsc::unbounded_channel(); debug!("Spawning stats thread"); - let handle = spawn(async move { - // helper map to track if peer data is actually changing - // and avoid sending duplicate stats - let mut peer_map = HashMap::new(); - let mut interval = interval(period); - let mut id = 1; - 'outer: loop { - // wait until next iteration - interval.tick().await; - debug!("Sending active peer stats updates."); - let interface_data = wgapi.lock().unwrap().read_interface_data(); - match interface_data { - Ok(host) => { - let peers = host.peers; - debug!( - "Found {} peers configured on WireGuard interface", - peers.len() - ); - for peer in peers.into_values().filter(|p| { - p.last_handshake - .is_some_and(|lhs| lhs != SystemTime::UNIX_EPOCH) - }) { - let has_changed = peer_map - .get(&peer.public_key) - .is_none_or(|last_peer| *last_peer != peer); - if has_changed { - peer_map.insert(peer.public_key.clone(), peer.clone()); - id += 1; - if tx - .send(StatsUpdate { - id, - payload: Some(Payload::PeerStats((&peer).into())), - }) - .is_err() - { - debug!("Stats stream disappeared"); - break 'outer; + let handle = spawn( + async move { + // helper map to track if peer data is actually changing + // and avoid sending duplicate stats + let mut peer_map = HashMap::new(); + let mut interval = interval(period); + let mut id = 1; + 'outer: loop { + // wait until next iteration + interval.tick().await; + debug!("Sending active peer stats updates."); + let interface_data = wgapi.lock().unwrap().read_interface_data(); + match interface_data { + Ok(host) => { + let peers = host.peers; + debug!( + "Found {} peers configured on WireGuard interface", + peers.len() + ); + for peer in peers.into_values().filter(|p| { + p.last_handshake + .is_some_and(|lhs| lhs != SystemTime::UNIX_EPOCH) + }) { + let has_changed = peer_map + .get(&peer.public_key) + .is_none_or(|last_peer| *last_peer != peer); + if has_changed { + peer_map.insert(peer.public_key.clone(), peer.clone()); + id += 1; + if tx + .send(StatsUpdate { + id, + payload: Some(Payload::PeerStats((&peer).into())), + }) + .is_err() + { + debug!("Stats stream disappeared"); + break 'outer; + } + } else { + debug!( + "Stats for peer {} have not changed. Skipping.", + peer.public_key + ); } - } else { - debug!( - "Stats for peer {} have not changed. Skipping.", - peer.public_key - ); } } + Err(err) => error!("Failed to retrieve WireGuard interface stats: {err}"), } - Err(err) => error!("Failed to retrieve WireGuard interface stats: {err}"), + debug!("Sent peer stats updates for all peers."); } - debug!("Sent peer stats updates for all peers."); } - }); + .instrument(tracing::Span::current()), + ); self.stats_thread = Some(handle); UnboundedReceiverStream::new(rx) } + #[instrument(skip_all)] async fn handle_stats_thread( mut client: GatewayServiceClient>, rx: UnboundedReceiverStream, @@ -533,6 +538,7 @@ impl Gateway { Ok(client) } + #[instrument(skip_all)] async fn handle_updates(&mut self, updates_stream: &mut Streaming) { loop { match updates_stream.message().await { @@ -670,13 +676,14 @@ impl Gateway { debug!("Executing specified POST_UP command: {post_up}"); execute_command(post_up)?; } - let stats_stream = self.spawn_stats_thread(); let span = tracing::info_span!( "core_grpc_loop", - version = %self.core_version.0, - info = %self.core_version.1, + component = "core", + version = self.core_version.0, + info = self.core_version.1, ); let _guard = span.enter(); + let stats_stream = self.spawn_stats_thread(); let client = self.client.clone(); select! { biased; From 8b97a7450a48f47da8da354d009e0488d2945627 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 21 Aug 2025 09:57:32 +0200 Subject: [PATCH 15/23] use DefguardComponent enum in span definitions --- src/gateway.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 1d74b323..c62234ef 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,4 +1,6 @@ -use defguard_version::{client::version_interceptor, version_info_from_metadata}; +use defguard_version::{ + client::version_interceptor, version_info_from_metadata, DefguardComponent, +}; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; use std::{ @@ -480,8 +482,7 @@ impl Gateway { (Ok(response), Ok(stream)) => { let (version, info) = version_info_from_metadata(response.metadata()); self.core_version = (version.to_string(), info.to_string()); - let span = - tracing::info_span!("core_connect", component = "core", version, info); + let span = tracing::info_span!("core_connect", component = %DefguardComponent::Core, version, info); let _guard = span.enter(); if let Err(err) = self.configure(response.into_inner()) { @@ -678,7 +679,7 @@ impl Gateway { } let span = tracing::info_span!( "core_grpc_loop", - component = "core", + component = %DefguardComponent::Core, version = self.core_version.0, info = self.core_version.1, ); From 9947b275575c85ccdbf64fa9bafa749cf1bef07e Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 21 Aug 2025 10:21:54 +0200 Subject: [PATCH 16/23] store strongly-typed core ComponentInfo --- src/gateway.rs | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index c62234ef..e6e86840 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,5 +1,5 @@ use defguard_version::{ - client::version_interceptor, version_info_from_metadata, DefguardComponent, + client::version_interceptor, parse_metadata, ComponentInfo, DefguardComponent, }; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; @@ -133,7 +133,7 @@ pub struct Gateway { firewall_config: Option, pub connected: Arc, client: GatewayServiceClient>, - core_version: (String, String), + core_info: Option, stats_thread: Option>, } @@ -154,7 +154,7 @@ impl Gateway { stats_thread: None, firewall_api, firewall_config: None, - core_version: Default::default(), + core_info: None, }) } @@ -458,6 +458,21 @@ impl Gateway { Ok(()) } + fn get_tracing_variables(&self) -> (String, String) { + let version = self + .core_info + .as_ref() + .map(|info| info.version.to_string()) + .unwrap_or("?".to_string()); + let info = self + .core_info + .as_ref() + .map(|info| info.system.to_string()) + .unwrap_or("?".to_string()); + + (version, info) + } + /// Continuously tries to connect to gRPC endpoint. Once the connection is established /// configures the interface, starts the stats thread, connects and returns the updates stream. async fn connect(&mut self) -> Streaming { @@ -480,9 +495,14 @@ impl Gateway { }; match (response, stream) { (Ok(response), Ok(stream)) => { - let (version, info) = version_info_from_metadata(response.metadata()); - self.core_version = (version.to_string(), info.to_string()); - let span = tracing::info_span!("core_connect", component = %DefguardComponent::Core, version, info); + self.core_info = parse_metadata(response.metadata()); + let (version, info) = self.get_tracing_variables(); + let span = tracing::info_span!( + "core_connect", + component = %DefguardComponent::Core, + version, + info + ); let _guard = span.enter(); if let Err(err) = self.configure(response.into_inner()) { @@ -677,11 +697,12 @@ impl Gateway { debug!("Executing specified POST_UP command: {post_up}"); execute_command(post_up)?; } + let (version, info) = self.get_tracing_variables(); let span = tracing::info_span!( "core_grpc_loop", component = %DefguardComponent::Core, - version = self.core_version.0, - info = self.core_version.1, + version, + info, ); let _guard = span.enter(); let stats_stream = self.spawn_stats_thread(); From 912c57a4b03c5fac3d5243a310ed1ba506cf4721 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 21 Aug 2025 11:48:36 +0200 Subject: [PATCH 17/23] improve version typing --- Cargo.lock | 1 + Cargo.toml | 1 + src/error.rs | 3 +++ src/gateway.rs | 18 ++++++++++++------ src/main.rs | 4 +++- 5 files changed, 20 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f1851bb..46e0bf55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,6 +392,7 @@ dependencies = [ "nftnl", "nix", "prost", + "semver", "serde", "syslog", "thiserror 2.0.15", diff --git a/Cargo.toml b/Cargo.toml index 610bd138..2499a0f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ tonic = { version = "0.14", default-features = false, features = [ ] } tracing = "0.1" tonic-prost = "0.14" +semver = "1.0.26" [target.'cfg(target_os = "linux")'.dependencies] nftnl = { git = "https://github.com/DefGuard/nftnl-rs.git", rev = "1a1147271f43b9d7182a114bb056a5224c35d38f" } diff --git a/src/error.rs b/src/error.rs index 5a81aba3..d1405d0f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -47,4 +47,7 @@ pub enum GatewayError { #[error(transparent)] DefguardVersionError(#[from] DefguardVersionError), + + #[error(transparent)] + SemverError(#[from] semver::Error), } diff --git a/src/gateway.rs b/src/gateway.rs index e6e86840..477aebcf 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -1,5 +1,5 @@ use defguard_version::{ - client::version_interceptor, parse_metadata, ComponentInfo, DefguardComponent, + client::version_interceptor, parse_metadata, ComponentInfo, DefguardComponent, Version, }; use defguard_wireguard_rs::{net::IpAddrMask, WireguardInterfaceApi}; use gethostname::gethostname; @@ -76,6 +76,7 @@ impl From for InterfaceConfiguration { struct RequestInterceptor { hostname: MetadataValue, token: MetadataValue, + version: defguard_version::Version, version_interceptor_fn: Box) -> Result, Status> + Send + Sync>, } @@ -84,13 +85,14 @@ impl Clone for RequestInterceptor { Self { hostname: self.hostname.clone(), token: self.token.clone(), - version_interceptor_fn: Box::new(version_interceptor(VERSION)), + version: self.version.clone(), + version_interceptor_fn: Box::new(version_interceptor(self.version.clone())), } } } impl RequestInterceptor { - fn new(token: &str) -> Result { + fn new(token: &str, version: Version) -> Result { let token = MetadataValue::try_from(token)?; let hostname = MetadataValue::try_from( gethostname() @@ -101,7 +103,8 @@ impl RequestInterceptor { Ok(Self { hostname, token, - version_interceptor_fn: Box::new(version_interceptor(VERSION)), + version: version.clone(), + version_interceptor_fn: Box::new(version_interceptor(version)), }) } } @@ -551,8 +554,8 @@ impl Gateway { .keep_alive_while_idle(true) .tls_config(tls)?; let channel = endpoint.connect_lazy(); - - let auth_interceptor = RequestInterceptor::new(&config.token)?; + let version = Version::parse(VERSION)?; + let auth_interceptor = RequestInterceptor::new(&config.token, version)?; let client = GatewayServiceClient::with_interceptor(channel, auth_interceptor); debug!("gRPC client configuration done"); @@ -780,6 +783,7 @@ mod tests { stats_thread: None, firewall_api, firewall_config: None, + core_info: None, }; // new config is the same @@ -971,6 +975,7 @@ mod tests { stats_thread: None, firewall_api: FirewallApi::new("test_interface").unwrap(), firewall_config: None, + core_info: None, }; // Gateway has no firewall config, new rules are empty @@ -1043,6 +1048,7 @@ mod tests { stats_thread: None, firewall_api: FirewallApi::new("test_interface").unwrap(), firewall_config: None, + core_info: None, }; // Gateway has no config gateway.firewall_config = None; diff --git a/src/main.rs b/src/main.rs index 666dd2cd..def54a2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use defguard_gateway::{ config::get_config, enterprise::firewall::api::FirewallApi, error::GatewayError, execute_command, gateway::Gateway, init_syslog, server::run_server, VERSION, }; +use defguard_version::Version; #[cfg(not(any(target_os = "macos", target_os = "netbsd")))] use defguard_wireguard_rs::Kernel; use defguard_wireguard_rs::{Userspace, WGApi}; @@ -29,7 +30,8 @@ async fn main() -> Result<(), GatewayError> { return Err(error); } } else { - defguard_version::tracing::init(VERSION, &config.log_level.to_string())? + let version = Version::parse(VERSION)?; + defguard_version::tracing::init(version, &config.log_level.to_string())? }; if let Some(pre_up) = &config.pre_up { From 9f7b843d574bcb685bd8b5a17117c651d115ff51 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 21 Aug 2025 11:51:14 +0200 Subject: [PATCH 18/23] use re-exported semver::Error --- Cargo.lock | 1 - Cargo.toml | 1 - src/error.rs | 4 ++-- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46e0bf55..9f1851bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,7 +392,6 @@ dependencies = [ "nftnl", "nix", "prost", - "semver", "serde", "syslog", "thiserror 2.0.15", diff --git a/Cargo.toml b/Cargo.toml index 2499a0f4..610bd138 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,6 @@ tonic = { version = "0.14", default-features = false, features = [ ] } tracing = "0.1" tonic-prost = "0.14" -semver = "1.0.26" [target.'cfg(target_os = "linux")'.dependencies] nftnl = { git = "https://github.com/DefGuard/nftnl-rs.git", rev = "1a1147271f43b9d7182a114bb056a5224c35d38f" } diff --git a/src/error.rs b/src/error.rs index d1405d0f..aceb3c4f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ -use defguard_version::DefguardVersionError; +use defguard_version::{DefguardVersionError, SemverError}; use defguard_wireguard_rs::error::WireguardInterfaceError; use thiserror::Error; @@ -49,5 +49,5 @@ pub enum GatewayError { DefguardVersionError(#[from] DefguardVersionError), #[error(transparent)] - SemverError(#[from] semver::Error), + SemverError(#[from] SemverError), } From c703c2c24e2b6128e2fa7ea8ffc2826e477effde Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Thu, 21 Aug 2025 12:50:59 +0200 Subject: [PATCH 19/23] review changes - rename spans - rename interceptor variable --- src/gateway.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 477aebcf..f442d24f 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -501,7 +501,7 @@ impl Gateway { self.core_info = parse_metadata(response.metadata()); let (version, info) = self.get_tracing_variables(); let span = tracing::info_span!( - "core_connect", + "core_configuration", component = %DefguardComponent::Core, version, info @@ -555,8 +555,8 @@ impl Gateway { .tls_config(tls)?; let channel = endpoint.connect_lazy(); let version = Version::parse(VERSION)?; - let auth_interceptor = RequestInterceptor::new(&config.token, version)?; - let client = GatewayServiceClient::with_interceptor(channel, auth_interceptor); + let request_interceptor = RequestInterceptor::new(&config.token, version)?; + let client = GatewayServiceClient::with_interceptor(channel, request_interceptor); debug!("gRPC client configuration done"); Ok(client) @@ -702,7 +702,7 @@ impl Gateway { } let (version, info) = self.get_tracing_variables(); let span = tracing::info_span!( - "core_grpc_loop", + "core_grpc", component = %DefguardComponent::Core, version, info, From 9a4f179663121c0192b8f422d9737ddfae09dd3f Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Fri, 22 Aug 2025 09:29:49 +0200 Subject: [PATCH 20/23] switch to git dependency for defguard_version --- Cargo.lock | 1 + Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 9f1851bb..a7c3ca72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -409,6 +409,7 @@ dependencies = [ [[package]] name = "defguard_version" version = "0.0.0" +source = "git+https://github.com/DefGuard/defguard.git?rev=f61ce40927a4d21095ea53a691219d5ae46e3e4e#f61ce40927a4d21095ea53a691219d5ae46e3e4e" dependencies = [ "http", "os_info", diff --git a/Cargo.toml b/Cargo.toml index 610bd138..96915c03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "1.5.0" edition = "2021" [dependencies] -defguard_version = { path = "../defguard/crates/defguard_version" } +defguard_version = { git = "https://github.com/DefGuard/defguard.git", rev = "f61ce40927a4d21095ea53a691219d5ae46e3e4e" } axum = { version = "0.8", features = ["macros"] } base64 = "0.22" clap = { version = "4.5", features = ["derive", "env"] } From 5ae37739a92a39387394dc1d9d24259c5b8deeb3 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Fri, 22 Aug 2025 09:34:59 +0200 Subject: [PATCH 21/23] fix long type for clippy --- src/gateway.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/gateway.rs b/src/gateway.rs index f442d24f..50e63472 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -72,12 +72,14 @@ impl From for InterfaceConfiguration { } } +type InterceptorFn = Box) -> Result, Status> + Send + Sync>; + /// Intercepts all grpc requests adding authentication and version metadata struct RequestInterceptor { hostname: MetadataValue, token: MetadataValue, version: defguard_version::Version, - version_interceptor_fn: Box) -> Result, Status> + Send + Sync>, + version_interceptor_fn: InterceptorFn, } impl Clone for RequestInterceptor { From d5a1608ca3517ddba891d30371cd0646720aea99 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Fri, 22 Aug 2025 09:40:20 +0200 Subject: [PATCH 22/23] add cargo-deny exception for defguard_version crate --- deny.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/deny.toml b/deny.toml index 2282829b..4c2d088a 100644 --- a/deny.toml +++ b/deny.toml @@ -106,7 +106,10 @@ allow = [ confidence-threshold = 0.8 # Allow 1 or more licenses on a per-crate basis, so that particular licenses # aren't accepted for every possible crate as with the normal allow list -exceptions = [{ allow = ["AGPL-3.0-only"], crate = "defguard-gateway" }] +exceptions = [ + { allow = ["AGPL-3.0-only"], crate = "defguard-gateway" }, + { allow = ["AGPL-3.0-only"], crate = "defguard_version" } +] # Some crates don't have (easily) machine readable licensing information, # adding a clarification entry for it allows you to manually specify the From 4257784588bb17f77601c5d4d6dd90499c3fca07 Mon Sep 17 00:00:00 2001 From: Jacek Chmielewski Date: Fri, 22 Aug 2025 09:56:01 +0200 Subject: [PATCH 23/23] use map_or instead of map into unwrap_or --- src/gateway.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/gateway.rs b/src/gateway.rs index 50e63472..e5155881 100644 --- a/src/gateway.rs +++ b/src/gateway.rs @@ -467,13 +467,11 @@ impl Gateway { let version = self .core_info .as_ref() - .map(|info| info.version.to_string()) - .unwrap_or("?".to_string()); + .map_or(String::from("?"), |info| info.version.to_string()); let info = self .core_info .as_ref() - .map(|info| info.system.to_string()) - .unwrap_or("?".to_string()); + .map_or(String::from("?"), |info| info.system.to_string()); (version, info) }