Skip to content

Commit 6db518e

Browse files
committed
fix(guard): add metrics
1 parent 39220d3 commit 6db518e

File tree

9 files changed

+218
-97
lines changed

9 files changed

+218
-97
lines changed

packages/common/logs/src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ impl Logs {
2525
}
2626

2727
impl Logs {
28-
pub async fn start(self) -> Result<tokio::task::JoinHandle<()>> {
28+
pub async fn start(mut self) -> Result<tokio::task::JoinHandle<()>> {
2929
// Create logs dir if it does not exist
3030
fs::create_dir_all(&self.path).await?;
3131

32+
self.rotate().await?;
33+
3234
Ok(tokio::spawn(self.run()))
3335
}
3436

@@ -112,10 +114,12 @@ impl Logs {
112114
}
113115

114116
impl Logs {
115-
pub fn start_sync(self) -> Result<std::thread::JoinHandle<()>> {
117+
pub fn start_sync(mut self) -> Result<std::thread::JoinHandle<()>> {
116118
// Create logs dir if it does not exist
117119
std::fs::create_dir_all(&self.path)?;
118120

121+
self.rotate_sync()?;
122+
119123
Ok(std::thread::spawn(|| self.run_sync()))
120124
}
121125

packages/common/server-cli/src/commands/start.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ impl Opts {
5454
.and_then(|x| x.rivet.edge.as_ref())
5555
.and_then(|x| x.redirect_logs_dir.as_ref())
5656
{
57-
std::fs::create_dir_all(logs_dir)?;
5857
rivet_logs::Logs::new(logs_dir.clone(), LOGS_RETENTION)
5958
.start()
6059
.await?;

packages/core/services/cluster/src/workflows/server/install/install_scripts/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,14 @@ pub async fn gen_initialize(
227227
)?);
228228
script.push(components::rivet::guard::fetch_tls(server_token)?);
229229
script.push(components::rivet::guard::configure(config)?);
230+
231+
prometheus_targets.insert(
232+
"guard".into(),
233+
components::vector::PrometheusTarget {
234+
endpoint: "http://127.0.0.1:8091".into(),
235+
scrape_interval: 15,
236+
},
237+
);
230238
}
231239
}
232240

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,67 @@
11
use lazy_static::lazy_static;
2-
use rivet_metrics::prometheus::*;
2+
use rivet_metrics::{prometheus::*, REGISTRY};
33

44
lazy_static! {
5-
pub static ref ACTOR_REQUEST_TOTAL: IntCounterVec = register_int_counter_vec!(
6-
"actor_request_total",
5+
pub static ref ACTOR_REQUEST_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
6+
"guard_actor_request_total",
77
"Total number of requests to actor",
8-
&["actor_id", "server_id", "method", "path"]
8+
&["actor_id", "server_id", "method", "path"],
9+
*REGISTRY,
910
)
1011
.unwrap();
11-
pub static ref ACTOR_REQUEST_PENDING: IntGaugeVec = register_int_gauge_vec!(
12-
"actor_request_pending",
12+
pub static ref ACTOR_REQUEST_PENDING: IntGaugeVec = register_int_gauge_vec_with_registry!(
13+
"guard_actor_request_pending",
1314
"Number of pending requests to actor",
14-
&["actor_id", "server_id", "method", "path"]
15+
&["actor_id", "server_id", "method", "path"],
16+
*REGISTRY,
1517
)
1618
.unwrap();
17-
pub static ref ACTOR_REQUEST_DURATION: HistogramVec = register_histogram_vec!(
18-
"actor_request_duration_seconds",
19+
pub static ref ACTOR_REQUEST_DURATION: HistogramVec = register_histogram_vec_with_registry!(
20+
"guard_actor_request_duration_seconds",
1921
"Request duration in seconds",
20-
&["actor_id", "server_id", "status"]
22+
&["actor_id", "server_id", "status"],
23+
*REGISTRY,
2124
)
2225
.unwrap();
23-
pub static ref ACTOR_REQUEST_ERRORS: IntCounterVec = register_int_counter_vec!(
24-
"actor_request_errors_total",
26+
pub static ref ACTOR_REQUEST_ERRORS: IntCounterVec = register_int_counter_vec_with_registry!(
27+
"guard_actor_request_errors_total",
2528
"Total number of errors when proxying requests to actor",
26-
&["actor_id", "server_id", "error_type"]
29+
&["actor_id", "server_id", "error_type"],
30+
*REGISTRY,
31+
)
32+
.unwrap();
33+
pub static ref ROUTE_CACHE_SIZE: IntGauge = register_int_gauge_with_registry!(
34+
"guard_route_cache_size",
35+
"Number of entries in the route cache",
36+
*REGISTRY,
37+
).unwrap();
38+
pub static ref RATE_LIMITERS_COUNT: IntGauge = register_int_gauge_with_registry!(
39+
"guard_rate_limiters_count",
40+
"Number of active rate limiters",
41+
*REGISTRY,
42+
).unwrap();
43+
pub static ref IN_FLIGHT_COUNTERS_COUNT: IntGauge = register_int_gauge_with_registry!(
44+
"guard_in_flight_counters_count",
45+
"Number of active in-flight counters",
46+
*REGISTRY,
47+
)
48+
.unwrap();
49+
pub static ref TCP_CONNECTIONS_TOTAL: IntGauge = register_int_gauge_with_registry!(
50+
"guard_tcp_connections_total",
51+
"Total number of open TCP connections",
52+
*REGISTRY,
53+
)
54+
.unwrap();
55+
pub static ref REQUEST_TOTAL: IntCounter = register_int_counter_with_registry!(
56+
"guard_request_total",
57+
"Total number of requests processed",
58+
*REGISTRY,
59+
)
60+
.unwrap();
61+
pub static ref REQUEST_PENDING: IntGauge = register_int_gauge_with_registry!(
62+
"guard_request_pending",
63+
"Total number of active requests being processed",
64+
*REGISTRY,
2765
)
2866
.unwrap();
2967
}

packages/edge/infra/guard/core/src/proxy_service.rs

Lines changed: 113 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,17 @@ impl RouteCache {
175175
#[tracing::instrument(skip_all)]
176176
async fn insert(&self, hostname: String, path: String, result: RouteConfig) {
177177
self.cache.upsert_async((hostname, path), result).await;
178+
179+
metrics::ROUTE_CACHE_SIZE.set(self.cache.len() as i64);
178180
}
179181

180182
#[tracing::instrument(skip_all)]
181183
async fn purge(&self, hostname: &str, path: &str) {
182184
self.cache
183185
.remove_async(&(hostname.to_owned(), path.to_owned()))
184186
.await;
187+
188+
metrics::ROUTE_CACHE_SIZE.set(self.cache.len() as i64);
185189
}
186190
}
187191

@@ -253,8 +257,8 @@ pub struct ProxyState {
253257
routing_fn: RoutingFn,
254258
middleware_fn: MiddlewareFn,
255259
route_cache: RouteCache,
256-
rate_limiters: SccHashMap<Uuid, RateLimiter>,
257-
in_flight_counters: SccHashMap<Uuid, InFlightCounter>,
260+
rate_limiters: SccHashMap<std::net::IpAddr, RateLimiter>,
261+
in_flight_counters: SccHashMap<std::net::IpAddr, InFlightCounter>,
258262
port_type: PortType,
259263
}
260264

@@ -447,79 +451,119 @@ impl ProxyState {
447451
}
448452

449453
#[tracing::instrument(skip_all)]
450-
async fn check_rate_limit(&self, actor_id: &Option<Uuid>) -> GlobalResult<bool> {
451-
match actor_id {
452-
Some(id) => {
453-
let middleware_config = self.get_middleware_config(id).await?;
454-
455-
let entry = self
456-
.rate_limiters
457-
.entry_async(*id)
458-
.instrument(tracing::info_span!("entry_async"))
459-
.await;
460-
if let scc::hash_map::Entry::Occupied(mut entry) = entry {
461-
// Key exists, get and mutate existing RateLimiter
462-
let write_guard = entry.get_mut();
463-
Ok(write_guard.try_acquire())
464-
} else {
465-
// Key doesn't exist, insert a new RateLimiter
466-
let mut limiter = RateLimiter::new(
467-
middleware_config.rate_limit.requests,
468-
middleware_config.rate_limit.period,
469-
);
470-
let result = limiter.try_acquire();
471-
entry.insert_entry(limiter);
472-
Ok(result)
473-
}
474-
}
454+
async fn check_rate_limit(
455+
&self,
456+
ip_addr: std::net::IpAddr,
457+
actor_id: &Option<Uuid>,
458+
) -> GlobalResult<bool> {
459+
// Use default middleware config, or actor-specific config if available
460+
let middleware_config = match actor_id {
461+
Some(id) => self.get_middleware_config(id).await?,
475462
None => {
476-
// No actor ID means no rate limiting
477-
Ok(true)
463+
// Default middleware config
464+
MiddlewareConfig {
465+
rate_limit: RateLimitConfig {
466+
requests: 100, // 100 requests
467+
period: 60, // per 60 seconds
468+
},
469+
max_in_flight: MaxInFlightConfig {
470+
amount: 20, // 20 concurrent requests
471+
},
472+
retry: RetryConfig {
473+
max_attempts: 3, // 3 retry attempts
474+
initial_interval: 100, // 100ms initial interval
475+
},
476+
timeout: TimeoutConfig {
477+
request_timeout: 30, // 30 seconds for requests
478+
},
479+
}
478480
}
481+
};
482+
483+
let entry = self
484+
.rate_limiters
485+
.entry_async(ip_addr)
486+
.instrument(tracing::info_span!("entry_async"))
487+
.await;
488+
if let scc::hash_map::Entry::Occupied(mut entry) = entry {
489+
// Key exists, get and mutate existing RateLimiter
490+
let write_guard = entry.get_mut();
491+
Ok(write_guard.try_acquire())
492+
} else {
493+
// Key doesn't exist, insert a new RateLimiter
494+
let mut limiter = RateLimiter::new(
495+
middleware_config.rate_limit.requests,
496+
middleware_config.rate_limit.period,
497+
);
498+
let result = limiter.try_acquire();
499+
entry.insert_entry(limiter);
500+
501+
metrics::RATE_LIMITERS_COUNT.set(self.rate_limiters.len() as i64);
502+
503+
Ok(result)
479504
}
480505
}
481506

482507
#[tracing::instrument(skip_all)]
483-
async fn acquire_in_flight(&self, actor_id: &Option<Uuid>) -> GlobalResult<bool> {
484-
match actor_id {
485-
Some(id) => {
486-
let middleware_config = self.get_middleware_config(id).await?;
487-
488-
let entry = self
489-
.in_flight_counters
490-
.entry_async(*id)
491-
.instrument(tracing::info_span!("entry_async"))
492-
.await;
493-
if let scc::hash_map::Entry::Occupied(mut entry) = entry {
494-
// Key exists, get and mutate existing InFlightCounter
495-
let write_guard = entry.get_mut();
496-
Ok(write_guard.try_acquire())
497-
} else {
498-
// Key doesn't exist, insert a new InFlightCounter
499-
let mut counter = InFlightCounter::new(middleware_config.max_in_flight.amount);
500-
let result = counter.try_acquire();
501-
entry.insert_entry(counter);
502-
Ok(result)
503-
}
504-
}
508+
async fn acquire_in_flight(
509+
&self,
510+
ip_addr: std::net::IpAddr,
511+
actor_id: &Option<Uuid>,
512+
) -> GlobalResult<bool> {
513+
// Use default middleware config, or actor-specific config if available
514+
let middleware_config = match actor_id {
515+
Some(id) => self.get_middleware_config(id).await?,
505516
None => {
506-
// No actor ID means no in-flight limiting
507-
Ok(true)
517+
// Default middleware config
518+
MiddlewareConfig {
519+
rate_limit: RateLimitConfig {
520+
requests: 100, // 100 requests
521+
period: 60, // per 60 seconds
522+
},
523+
max_in_flight: MaxInFlightConfig {
524+
amount: 20, // 20 concurrent requests
525+
},
526+
retry: RetryConfig {
527+
max_attempts: 3, // 3 retry attempts
528+
initial_interval: 100, // 100ms initial interval
529+
},
530+
timeout: TimeoutConfig {
531+
request_timeout: 30, // 30 seconds for requests
532+
},
533+
}
508534
}
535+
};
536+
537+
let entry = self
538+
.in_flight_counters
539+
.entry_async(ip_addr)
540+
.instrument(tracing::info_span!("entry_async"))
541+
.await;
542+
if let scc::hash_map::Entry::Occupied(mut entry) = entry {
543+
// Key exists, get and mutate existing InFlightCounter
544+
let write_guard = entry.get_mut();
545+
Ok(write_guard.try_acquire())
546+
} else {
547+
// Key doesn't exist, insert a new InFlightCounter
548+
let mut counter = InFlightCounter::new(middleware_config.max_in_flight.amount);
549+
let result = counter.try_acquire();
550+
entry.insert_entry(counter);
551+
552+
metrics::IN_FLIGHT_COUNTERS_COUNT.set(self.in_flight_counters.len() as i64);
553+
554+
Ok(result)
509555
}
510556
}
511557

512558
#[tracing::instrument(skip_all)]
513-
async fn release_in_flight(&self, actor_id: &Option<Uuid>) {
514-
if let Some(id) = actor_id {
515-
if let Some(mut counter) = self
516-
.in_flight_counters
517-
.get_async(id)
518-
.instrument(tracing::info_span!("get_async"))
519-
.await
520-
{
521-
counter.release();
522-
}
559+
async fn release_in_flight(&self, ip_addr: std::net::IpAddr) {
560+
if let Some(mut counter) = self
561+
.in_flight_counters
562+
.get_async(&ip_addr)
563+
.instrument(tracing::info_span!("get_async"))
564+
.await
565+
{
566+
counter.release();
523567
}
524568
}
525569
}
@@ -607,8 +651,11 @@ impl ProxyService {
607651
let actor_id_str = actor_id.map_or_else(|| "none".to_string(), |id| id.to_string());
608652
let server_id_str = server_id.map_or_else(|| "none".to_string(), |id| id.to_string());
609653

654+
// Extract IP address from remote_addr
655+
let client_ip = self.remote_addr.ip();
656+
610657
// Apply rate limiting
611-
if !self.state.check_rate_limit(&actor_id).await? {
658+
if !self.state.check_rate_limit(client_ip, &actor_id).await? {
612659
metrics::ACTOR_REQUEST_ERRORS
613660
.with_label_values(&[&actor_id_str, &server_id_str, "429"])
614661
.inc();
@@ -619,7 +666,7 @@ impl ProxyService {
619666
}
620667

621668
// Check in-flight limit
622-
if !self.state.acquire_in_flight(&actor_id).await? {
669+
if !self.state.acquire_in_flight(client_ip, &actor_id).await? {
623670
metrics::ACTOR_REQUEST_ERRORS
624671
.with_label_values(&[&actor_id_str, &server_id_str, "429"])
625672
.inc();
@@ -646,10 +693,9 @@ impl ProxyService {
646693

647694
// Prepare to release in-flight counter when done
648695
let state_clone = self.state.clone();
649-
let actor_id_clone = actor_id;
650696
crate::defer! {
651697
tokio::spawn(async move {
652-
state_clone.release_in_flight(&actor_id_clone).await;
698+
state_clone.release_in_flight(client_ip).await;
653699
}.instrument(tracing::info_span!("release_in_flight_task")));
654700
}
655701

@@ -1625,7 +1671,6 @@ impl ProxyService {
16251671
.observe(duration.as_secs_f64());
16261672

16271673
// Decrement pending metric at the end
1628-
info!("Decrementing pending metric");
16291674
metrics::ACTOR_REQUEST_PENDING
16301675
.with_label_values(&[&actor_id_str_clone, &server_id_str_clone, &method, &path])
16311676
.dec();

0 commit comments

Comments
 (0)