Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/common/api-helper/build/src/macro_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ pub fn __deserialize_query<T: DeserializeOwned + Send>(route: &Url) -> GlobalRes
}

#[doc(hidden)]
#[tracing::instrument(skip_all, name="setup_ctx")]
#[tracing::instrument(skip_all, name = "setup_ctx")]
pub async fn __with_ctx<
A: auth::ApiAuth + Send,
DB: chirp_workflow::db::Database + Sync + 'static,
Expand Down
18 changes: 9 additions & 9 deletions packages/common/clickhouse-inserter/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
#[error("failed to send event to ClickHouse inserter")]
ChannelSendError,
#[error("failed to send event to ClickHouse inserter")]
ChannelSendError,

#[error("serialization error: {0}")]
SerializationError(#[source] serde_json::Error),
#[error("serialization error: {0}")]
SerializationError(#[source] serde_json::Error),

#[error("failed to build reqwest client: {0}")]
ReqwestBuildError(#[source] reqwest::Error),
#[error("failed to build reqwest client: {0}")]
ReqwestBuildError(#[source] reqwest::Error),

#[error("failed to spawn background task")]
TaskSpawnError,
}
#[error("failed to spawn background task")]
TaskSpawnError,
}
6 changes: 4 additions & 2 deletions packages/common/config/src/config/server/rivet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl Rivet {
}
}

impl Rivet {
impl Rivet {
pub fn default_cluster_id(&self) -> GlobalResult<Uuid> {
if let Some(default_cluster_id) = self.default_cluster_id {
ensure!(
Expand All @@ -190,7 +190,9 @@ impl Rivet {
// Return default development clusters
AccessKind::Development => Ok(default_dev_cluster::CLUSTER_ID),
// No cluster configured
AccessKind::Public | AccessKind::Private => bail!("`default_cluster_id` not configured"),
AccessKind::Public | AccessKind::Private => {
bail!("`default_cluster_id` not configured")
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/common/pools/src/db/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ pub async fn setup(config: Config) -> Result<HashMap<String, RedisPool>, Error>
tracing::debug!("redis connected");

Ok(redis)
}
}
6 changes: 3 additions & 3 deletions packages/common/pools/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub enum Error {
}

impl From<global_error::GlobalError> for Error {
fn from(err: global_error::GlobalError) -> Self {
Error::Global(err)
}
fn from(err: global_error::GlobalError) -> Self {
Error::Global(err)
}
}
5 changes: 3 additions & 2 deletions packages/common/pools/src/pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ impl Pools {
let clickhouse_inserter = if let Some(vector_http) =
config.server.as_ref().and_then(|x| x.vector_http.as_ref())
{
let inserter = clickhouse_inserter::create_inserter(&vector_http.host, vector_http.port)
.map_err(Error::BuildClickHouseInserter)?;
let inserter =
clickhouse_inserter::create_inserter(&vector_http.host, vector_http.port)
.map_err(Error::BuildClickHouseInserter)?;
Some(inserter)
} else {
None
Expand Down
6 changes: 3 additions & 3 deletions packages/common/pools/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub use redis;
pub use sqlx;

pub use crate::{
ClickHouseInserterHandle, ClickHousePool, CrdbPool, FdbPool, NatsPool, RedisPool, SqlitePool, __sql_query,
__sql_query_as, __sql_query_as_raw, sql_execute, sql_fetch, sql_fetch_all, sql_fetch_many,
sql_fetch_one, sql_fetch_optional,
ClickHouseInserterHandle, ClickHousePool, CrdbPool, FdbPool, NatsPool, RedisPool, SqlitePool,
__sql_query, __sql_query_as, __sql_query_as_raw, sql_execute, sql_fetch, sql_fetch_all,
sql_fetch_many, sql_fetch_one, sql_fetch_optional,
};
4 changes: 2 additions & 2 deletions packages/common/util/core/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub fn duration(ms: i64, relative: bool) -> String {
let hours = (ms % 86_400_000) / 3_600_000;
let minutes = (ms % 3_600_000) / 60_000;
let seconds = (ms % 60_000) / 1_000;

if days > 0 {
parts.push(format!("{days}d"));
}
Expand All @@ -181,5 +181,5 @@ pub fn duration(ms: i64, relative: bool) -> String {
parts.push("ago".to_string());
}

parts.join(" ")
parts.join(" ")
}
10 changes: 3 additions & 7 deletions packages/core/api/actor/src/route/builds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,9 @@ pub async fn complete_build(
..Default::default()
};

edge_intercom_pegboard_prewarm_image(
&config,
&build_id.to_string(),
json!({}),
)
.await
.map_err(Into::<GlobalError>::into)
edge_intercom_pegboard_prewarm_image(&config, &build_id.to_string(), json!({}))
.await
.map_err(Into::<GlobalError>::into)
}
})
.buffer_unordered(16)
Expand Down
6 changes: 1 addition & 5 deletions packages/core/api/provision/src/route/datacenters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ pub async fn servers(
let servers_res = ctx
.op(cluster::ops::datacenter::server_discovery::Input {
datacenter_id,
pool_types: query
.pools
.into_iter()
.map(ApiInto::api_into)
.collect(),
pool_types: query.pools.into_iter().map(ApiInto::api_into).collect(),
})
.await?;

Expand Down
12 changes: 9 additions & 3 deletions packages/core/api/status/src/route/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ pub async fn status(
_ => {
bail_with!(
INTERNAL_STATUS_CHECK_FAILED,
error = format!("unknown request error: {:?} {:?}", content.status, content.content)
error = format!(
"unknown request error: {:?} {:?}",
content.status, content.content
)
);
}
},
Expand Down Expand Up @@ -277,7 +280,7 @@ pub async fn status(
.instrument(tracing::info_span!("actor_destroy_request", base_path=%config.base_path))
.await
{
Ok(_res) => {},
Ok(_res) => {}
Err(rivet_api::apis::Error::ResponseError(content)) => match content.entity {
Some(Status400(body))
| Some(Status403(body))
Expand All @@ -293,7 +296,10 @@ pub async fn status(
_ => {
bail_with!(
INTERNAL_STATUS_CHECK_FAILED,
error = format!("unknown request error: {:?} {:?}", content.status, content.content)
error = format!(
"unknown request error: {:?} {:?}",
content.status, content.content
)
);
}
},
Expand Down
25 changes: 16 additions & 9 deletions packages/core/api/ui/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,23 @@ impl Router {

// Build proxy URL by joining request path to base URL
let mut proxy_url = config.server()?.rivet.ui.proxy_origin().clone();

// Remove leading slash from request path since join() expects relative paths
let request_path = request.uri().path().strip_prefix('/').unwrap_or(request.uri().path());

let request_path = request
.uri()
.path()
.strip_prefix('/')
.unwrap_or(request.uri().path());

// Join the request path to the base URL
proxy_url = match proxy_url.join(request_path) {
Ok(url) => url,
Err(e) => bail!("Failed to build proxy URL: {}", e),
};

// Set query string if present
proxy_url.set_query(request.uri().query());

let full_proxy_url = proxy_url.to_string();

tracing::debug!(
Expand All @@ -77,10 +81,13 @@ impl Router {

// Forward body for non-GET requests
if request.method() != Method::GET && request.method() != Method::HEAD {
let body_bytes = match hyper::body::to_bytes(std::mem::replace(request.body_mut(), Body::empty())).await {
Ok(bytes) => bytes,
Err(e) => bail!("Failed to read request body: {}", e),
};
let body_bytes =
match hyper::body::to_bytes(std::mem::replace(request.body_mut(), Body::empty()))
.await
{
Ok(bytes) => bytes,
Err(e) => bail!("Failed to read request body: {}", e),
};
req_builder = req_builder.body(body_bytes.to_vec());
}

Expand Down
22 changes: 9 additions & 13 deletions packages/core/services/build/src/ops/resolve_for_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,21 @@ pub async fn build_resolve_for_tags(ctx: &OperationCtx, input: &Input) -> Global
unwrap!(
ctx.cache()
.ttl(util::duration::seconds(15))
.fetch_one_json(
"build",
(input.env_id, tags_str.as_str()),
{
.fetch_one_json("build", (input.env_id, tags_str.as_str()), {
let ctx = ctx.clone();
let tags_str = tags_str.clone();
move |mut cache, key| {
let ctx = ctx.clone();
let tags_str = tags_str.clone();
move |mut cache, key| {
let ctx = ctx.clone();
let tags_str = tags_str.clone();
async move {
let builds = get_builds(&ctx, input.env_id, &tags_str).await?;
async move {
let builds = get_builds(&ctx, input.env_id, &tags_str).await?;

cache.resolve(&key, builds);
cache.resolve(&key, builds);

Ok(cache)
}
Ok(cache)
}
}
)
})
.await?
)
};
Expand Down
2 changes: 1 addition & 1 deletion packages/core/services/cluster/src/ops/datacenter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub mod server_discovery;
pub mod get;
pub mod list;
pub mod location_get;
pub mod resolve_for_name_id;
pub mod server_discovery;
pub mod server_spec_get;
pub mod tls_get;
pub mod topology_get;
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ pub struct Output {

/// Wrapper around server::list with very short cache.
#[operation]
pub async fn cluster_datacenter_server_discovery(ctx: &OperationCtx, input: &Input) -> GlobalResult<Output> {
pub async fn cluster_datacenter_server_discovery(
ctx: &OperationCtx,
input: &Input,
) -> GlobalResult<Output> {
let cache_keys = if input.pool_types.is_empty() {
vec![(input.datacenter_id, "all".to_string())]
} else {
Expand Down
11 changes: 5 additions & 6 deletions packages/core/services/dynamic-config/src/ops/get_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ pub async fn get_config(ctx: &OperationCtx, input: &Input) -> GlobalResult<Outpu

// Pick an instance ID to insert if none exists. If this is specified in the config. fall back to
// this.
let default_instance_id =
if let Some(instance_id) = ctx.config().server()?.rivet.instance_id {
instance_id
} else {
Uuid::new_v4()
};
let default_instance_id = if let Some(instance_id) = ctx.config().server()?.rivet.instance_id {
instance_id
} else {
Uuid::new_v4()
};

let instance_id = INSTANCE_ID_ONCE
.get_or_try_init(|| async {
Expand Down
2 changes: 1 addition & 1 deletion packages/edge/api/actor/src/route/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::collections::HashMap;

use api_helper::{anchor::WatchIndexQuery, ctx::Ctx};
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use util::serde::AsHashableExt;
use rivet_api::models;
use rivet_convert::{ApiInto, ApiTryInto};
use rivet_operation::prelude::*;
use serde::Deserialize;
use serde_json::json;
use util::serde::AsHashableExt;

use crate::{
assert,
Expand Down
30 changes: 16 additions & 14 deletions packages/edge/api/intercom/src/route/pegboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,20 @@ pub async fn toggle_drain_client(
ctx.auth().bypass()?;

if body.draining {
let res = ctx.signal(pegboard::workflows::client::Drain {
drain_timeout_ts: unwrap_with!(
body.drain_complete_ts,
API_BAD_BODY,
error = "missing `drain_complete_ts`"
)
.parse::<chrono::DateTime<chrono::Utc>>()?
.timestamp_millis(),
})
.to_workflow::<pegboard::workflows::client::Workflow>()
.tag("client_id", client_id)
.send()
.await;
let res = ctx
.signal(pegboard::workflows::client::Drain {
drain_timeout_ts: unwrap_with!(
body.drain_complete_ts,
API_BAD_BODY,
error = "missing `drain_complete_ts`"
)
.parse::<chrono::DateTime<chrono::Utc>>()?
.timestamp_millis(),
})
.to_workflow::<pegboard::workflows::client::Workflow>()
.tag("client_id", client_id)
.send()
.await;

if let Some(WorkflowError::WorkflowNotFound) = res.as_workflow_error() {
tracing::warn!(
Expand All @@ -151,7 +152,8 @@ pub async fn toggle_drain_client(
res?;
}
} else {
let res = ctx.signal(pegboard::workflows::client::Undrain {})
let res = ctx
.signal(pegboard::workflows::client::Undrain {})
.to_workflow::<pegboard::workflows::client::Workflow>()
.tag("client_id", client_id)
.send()
Expand Down
2 changes: 1 addition & 1 deletion packages/edge/infra/client/config/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub struct Runner {

pub container_runner_binary_path: Option<PathBuf>,
pub isolate_runner_binary_path: Option<PathBuf>,

/// Custom host entries to append to /etc/hosts in actor containers.
#[serde(default)]
pub custom_hosts: Option<Vec<HostEntry>>,
Expand Down
7 changes: 4 additions & 3 deletions packages/edge/infra/client/manager/src/actor/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,12 +776,13 @@ fn build_hosts_content(ctx: &Ctx) -> String {
127.0.0.1 localhost
::1 localhost ip6-localhost ip6-loopback
"
).to_string();

)
.to_string();

for host_entry in ctx.config().runner.custom_hosts() {
content.push_str(&format!("{}\t{}\n", host_entry.ip, host_entry.hostname));
}

content
}

Expand Down
Loading
Loading