Skip to content

Commit 5d622b5

Browse files
committed
fix: add cache to server list queries
1 parent 77c252b commit 5d622b5

File tree

19 files changed

+200
-185
lines changed

19 files changed

+200
-185
lines changed

packages/common/service-discovery/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ edition.workspace = true
88
[dependencies]
99
rand = "0.8"
1010
reqwest = { version = "0.12", features = ["json"] }
11+
rivet-api.workspace = true
1112
serde = { version = "1.0", features = ["derive"] }
1213
tokio.workspace = true
1314
tracing = "0.1"

packages/common/service-discovery/src/lib.rs

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
use std::{future::Future, net::Ipv4Addr, sync::Arc, time::Duration};
1+
use std::{future::Future, sync::Arc, time::Duration};
22

33
use rand::Rng;
44
use reqwest::Client;
5-
use serde::Deserialize;
65
use tokio::{
76
sync::{Mutex, RwLock},
87
task::JoinHandle,
98
};
109
use url::Url;
10+
use rivet_api::models::{ProvisionServer, ProvisionDatacentersGetServersResponse};
1111

1212
pub struct ServiceDiscovery {
1313
fetch_endpoint: Url,
14-
last: RwLock<Vec<ApiServer>>,
14+
last: RwLock<Vec<ProvisionServer>>,
1515
handle: Mutex<Option<JoinHandle<()>>>,
1616
}
1717

@@ -27,7 +27,7 @@ impl ServiceDiscovery {
2727
/// Starts a background tokio task that periodically fetches the endpoint and calls `cb`.
2828
pub fn start<F, Fut, E>(self: &Arc<Self>, cb: F)
2929
where
30-
F: Fn(Vec<ApiServer>) -> Fut + Send + Sync + 'static,
30+
F: Fn(Vec<ProvisionServer>) -> Fut + Send + Sync + 'static,
3131
Fut: Future<Output = Result<(), E>> + Send + 'static,
3232
E: std::fmt::Debug,
3333
{
@@ -64,23 +64,23 @@ impl ServiceDiscovery {
6464
}
6565

6666
/// Returns the last retrieved value without fetching.
67-
pub async fn get(&self) -> Vec<ApiServer> {
67+
pub async fn get(&self) -> Vec<ProvisionServer> {
6868
self.last.read().await.clone()
6969
}
7070

7171
/// Manually fetches the endpoint.
72-
pub async fn fetch(&self) -> Result<Vec<ApiServer>, reqwest::Error> {
72+
pub async fn fetch(&self) -> Result<Vec<ProvisionServer>, reqwest::Error> {
7373
let client = Client::new();
7474
Ok(self.fetch_inner(&client).await?.servers)
7575
}
7676

77-
async fn fetch_inner(&self, client: &Client) -> Result<ApiResponse, reqwest::Error> {
77+
async fn fetch_inner(&self, client: &Client) -> Result<ProvisionDatacentersGetServersResponse, reqwest::Error> {
7878
Ok(client
7979
.get(self.fetch_endpoint.clone())
8080
.send()
8181
.await?
8282
.error_for_status()?
83-
.json::<ApiResponse>()
83+
.json::<ProvisionDatacentersGetServersResponse>()
8484
.await?)
8585
}
8686
}
@@ -93,13 +93,3 @@ impl Drop for ServiceDiscovery {
9393
}
9494
}
9595
}
96-
97-
#[derive(Deserialize)]
98-
pub struct ApiResponse {
99-
pub servers: Vec<ApiServer>,
100-
}
101-
102-
#[derive(Deserialize, Clone)]
103-
pub struct ApiServer {
104-
pub lan_ip: Option<Ipv4Addr>,
105-
}

packages/core/api/actor/src/route/builds.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -536,11 +536,10 @@ pub async fn complete_build(
536536
}
537537
}
538538

539-
// TODO: Disabled until deploy
540-
// // Error only if all prewarm requests failed
541-
// if !results.is_empty() && results.iter().all(|res| res.is_err()) {
542-
// return Err(unwrap!(unwrap!(results.into_iter().next()).err()));
543-
// }
539+
// Error only if all prewarm requests failed
540+
if !results.is_empty() && results.iter().all(|res| res.is_err()) {
541+
return Err(unwrap!(unwrap!(results.into_iter().next()).err()));
542+
}
544543
}
545544

546545
Ok(json!({}))

packages/core/api/provision/src/route/datacenters.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,27 +56,21 @@ pub async fn servers(
5656
_watch_index: WatchIndexQuery,
5757
query: ServerFilterQuery,
5858
) -> GlobalResult<models::ProvisionDatacentersGetServersResponse> {
59-
// Find server based on public ip
6059
let servers_res = ctx
61-
.op(cluster::ops::server::list::Input {
62-
filter: cluster::types::Filter {
63-
datacenter_ids: Some(vec![datacenter_id]),
64-
pool_types: (!query.pools.is_empty())
65-
.then(|| query.pools.into_iter().map(ApiInto::api_into).collect()),
66-
..Default::default()
67-
},
68-
include_destroyed: false,
69-
exclude_draining: true,
70-
exclude_no_vlan: true,
60+
.op(cluster::ops::datacenter::server_discovery::Input {
61+
datacenter_id,
62+
pool_types: query
63+
.pools
64+
.into_iter()
65+
.map(ApiInto::api_into)
66+
.collect(),
7167
})
7268
.await?;
7369

7470
Ok(models::ProvisionDatacentersGetServersResponse {
7571
servers: servers_res
7672
.servers
7773
.into_iter()
78-
// Filter out installing servers
79-
.filter(|server| server.install_complete_ts.is_some())
8074
.map(ApiInto::api_into)
8175
.collect(),
8276
})

packages/core/api/traefik-provider/src/route/tunnel.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,42 @@ pub async fn build_ip_allowlist(
4242
ctx: &Ctx<Auth>,
4343
config: &mut types::TraefikConfigResponse,
4444
) -> GlobalResult<()> {
45-
let servers_res = ctx
46-
.op(cluster::ops::server::list::Input {
47-
filter: cluster::types::Filter {
48-
pool_types: Some(vec![
49-
cluster::types::PoolType::Gg,
50-
cluster::types::PoolType::Guard,
51-
]),
52-
..Default::default()
53-
},
54-
include_destroyed: false,
55-
exclude_draining: false,
56-
exclude_no_vlan: false,
45+
let servers = ctx
46+
.cache()
47+
.ttl(5000)
48+
.fetch_one_json("cluster.guard_ip_allow_list", "", {
49+
let ctx = (*ctx).clone();
50+
move |mut cache, key| {
51+
let ctx = ctx.clone();
52+
async move {
53+
let servers_res = ctx
54+
.op(cluster::ops::server::list::Input {
55+
filter: cluster::types::Filter {
56+
pool_types: Some(vec![
57+
cluster::types::PoolType::Gg,
58+
cluster::types::PoolType::Guard,
59+
]),
60+
..Default::default()
61+
},
62+
include_destroyed: false,
63+
// IMPORTANT: Returns installing servers
64+
exclude_installing: false,
65+
exclude_draining: true,
66+
exclude_no_vlan: true,
67+
})
68+
.await?;
69+
70+
cache.resolve(&key, servers_res.servers);
71+
72+
Ok(cache)
73+
}
74+
}
5775
})
5876
.await?;
5977

60-
let public_ips = servers_res
61-
.servers
78+
let public_ips = servers
6279
.iter()
80+
.flatten()
6381
.filter_map(|server| server.wan_ip)
6482
.map(|ip| ip.to_string())
6583
.collect::<Vec<_>>();

packages/core/services/build/src/ops/resolve_for_tags.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub async fn build_resolve_for_tags(ctx: &OperationCtx, input: &Input) -> Global
2929
.ttl(util::duration::seconds(15))
3030
.fetch_one_json(
3131
"build",
32-
format!("{}:{}", input.env_id, tags_str.as_str()),
32+
(input.env_id, tags_str.as_str()),
3333
{
3434
let ctx = ctx.clone();
3535
let tags_str = tags_str.clone();

packages/core/services/cluster/src/ops/datacenter/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pub mod server_discovery;
12
pub mod get;
23
pub mod list;
34
pub mod location_get;
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use std::{collections::HashMap, str::FromStr};
2+
3+
use chirp_workflow::prelude::*;
4+
5+
use crate::types::{Filter, PoolType, Server};
6+
7+
#[derive(Debug)]
8+
pub struct Input {
9+
pub datacenter_id: Uuid,
10+
pub pool_types: Vec<PoolType>,
11+
}
12+
13+
#[derive(Debug)]
14+
pub struct Output {
15+
pub servers: Vec<Server>,
16+
}
17+
18+
/// Wrapper around server::list with very short cache.
19+
#[operation]
20+
pub async fn cluster_datacenter_server_discovery(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
21+
let cache_keys = if input.pool_types.is_empty() {
22+
vec![(input.datacenter_id, "all".to_string())]
23+
} else {
24+
input
25+
.pool_types
26+
.iter()
27+
.map(|pool| (input.datacenter_id, pool.to_string()))
28+
.collect()
29+
};
30+
31+
let servers = ctx
32+
.cache()
33+
.ttl(5000)
34+
.fetch_all_json("cluster.datacenter.service_discovery", cache_keys, {
35+
let ctx = ctx.clone();
36+
move |mut cache, keys| {
37+
let ctx = ctx.clone();
38+
async move {
39+
let pools = keys
40+
.into_iter()
41+
.filter(|(_, pool)| pool != "all")
42+
.map(|(_, pool)| PoolType::from_str(&pool))
43+
.collect::<GlobalResult<Vec<_>>>()?;
44+
45+
let servers_res = ctx
46+
.op(crate::ops::server::list::Input {
47+
filter: Filter {
48+
datacenter_ids: Some(vec![input.datacenter_id]),
49+
pool_types: (!pools.is_empty()).then(|| pools),
50+
..Default::default()
51+
},
52+
include_destroyed: false,
53+
exclude_installing: true,
54+
exclude_draining: true,
55+
exclude_no_vlan: true,
56+
})
57+
.await?;
58+
59+
let mut servers_by_pool_type =
60+
HashMap::with_capacity(servers_res.servers.len());
61+
62+
for server in servers_res.servers {
63+
servers_by_pool_type
64+
.entry(server.pool_type)
65+
.or_insert_with(Vec::new)
66+
.push(server);
67+
}
68+
69+
for (pool_type, servers) in servers_by_pool_type {
70+
cache.resolve(&(input.datacenter_id, pool_type.to_string()), servers);
71+
}
72+
73+
Ok(cache)
74+
}
75+
}
76+
})
77+
.await?;
78+
79+
Ok(Output {
80+
servers: servers.into_iter().flatten().collect(),
81+
})
82+
}

packages/core/services/cluster/src/ops/server/destroy_with_filter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub async fn cluster_server_destroy_with_filter(
2121
.op(crate::ops::server::list::Input {
2222
filter: input.filter.clone(),
2323
include_destroyed: false,
24+
exclude_installing: false,
2425
exclude_draining: false,
2526
exclude_no_vlan: false,
2627
})

packages/core/services/cluster/src/ops/server/list.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::types::{Filter, Server};
99
pub struct Input {
1010
pub filter: Filter,
1111
pub include_destroyed: bool,
12+
pub exclude_installing: bool,
1213
pub exclude_draining: bool,
1314
pub exclude_no_vlan: bool,
1415
}
@@ -47,15 +48,17 @@ pub async fn cluster_server_list(ctx: &OperationCtx, input: &Input) -> GlobalRes
4748
ON s.datacenter_id = d.datacenter_id
4849
WHERE
4950
($1 OR s.cloud_destroy_ts IS NULL) AND
50-
(NOT $2 OR s.drain_ts IS NULL) AND
51-
(NOT $3 OR s.vlan_ip IS NOT NULL) AND
52-
($4 IS NULL OR s.server_id = ANY($4)) AND
53-
($5 IS NULL OR s.datacenter_id = ANY($5)) AND
54-
($6 IS NULL OR d.cluster_id = ANY($6)) AND
55-
($7 IS NULL OR s.pool_type = ANY($7)) AND
56-
($8 IS NULL OR s.public_ip = ANY($8))
51+
(NOT $2 OR s.install_complete_ts IS NOT NULL) AND
52+
(NOT $3 OR s.drain_ts IS NULL) AND
53+
(NOT $4 OR s.vlan_ip IS NOT NULL) AND
54+
($5 IS NULL OR s.server_id = ANY($5)) AND
55+
($6 IS NULL OR s.datacenter_id = ANY($6)) AND
56+
($7 IS NULL OR d.cluster_id = ANY($7)) AND
57+
($8 IS NULL OR s.pool_type = ANY($8)) AND
58+
($9 IS NULL OR s.public_ip = ANY($9))
5759
",
5860
input.include_destroyed,
61+
input.exclude_installing,
5962
input.exclude_draining,
6063
input.exclude_no_vlan,
6164
&input.filter.server_ids,

0 commit comments

Comments
 (0)