Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 39 additions & 37 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,28 +194,29 @@ pub trait ControlStateDelegate: ControlStateReadAccess + ControlStateWriteAccess
impl<T: ControlStateReadAccess + ControlStateWriteAccess + Send + Sync> ControlStateDelegate for T {}

/// Query API of the SpacetimeDB control plane.
#[async_trait]
pub trait ControlStateReadAccess {
// Nodes
fn get_node_id(&self) -> Option<u64>;
fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>>;
fn get_nodes(&self) -> anyhow::Result<Vec<Node>>;
async fn get_node_id(&self) -> Option<u64>;
async fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>>;
async fn get_nodes(&self) -> anyhow::Result<Vec<Node>>;

// Databases
fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>>;
fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>>;
fn get_databases(&self) -> anyhow::Result<Vec<Database>>;
async fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>>;
async fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>>;
async fn get_databases(&self) -> anyhow::Result<Vec<Database>>;

// Replicas
fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>>;
fn get_replicas(&self) -> anyhow::Result<Vec<Replica>>;
fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica>;
async fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>>;
async fn get_replicas(&self) -> anyhow::Result<Vec<Replica>>;
async fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica>;

// Energy
fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>>;
async fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>>;

// DNS
fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>>;
fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>>;
async fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>>;
async fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>>;
}

/// Write operations on the SpacetimeDB control plane.
Expand Down Expand Up @@ -270,53 +271,54 @@ pub trait ControlStateWriteAccess: Send + Sync {
) -> anyhow::Result<SetDomainsResult>;
}

impl<T: ControlStateReadAccess + ?Sized> ControlStateReadAccess for Arc<T> {
#[async_trait]
impl<T: ControlStateReadAccess + Send + Sync + Sync + ?Sized> ControlStateReadAccess for Arc<T> {
// Nodes
fn get_node_id(&self) -> Option<u64> {
(**self).get_node_id()
async fn get_node_id(&self) -> Option<u64> {
(**self).get_node_id().await
}
fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
(**self).get_node_by_id(node_id)
async fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
(**self).get_node_by_id(node_id).await
}
fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
(**self).get_nodes()
async fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
(**self).get_nodes().await
}

// Databases
fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
(**self).get_database_by_id(id)
async fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
(**self).get_database_by_id(id).await
}
fn get_database_by_identity(&self, identity: &Identity) -> anyhow::Result<Option<Database>> {
(**self).get_database_by_identity(identity)
async fn get_database_by_identity(&self, identity: &Identity) -> anyhow::Result<Option<Database>> {
(**self).get_database_by_identity(identity).await
}
fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
(**self).get_databases()
async fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
(**self).get_databases().await
}

// Replicas
fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
(**self).get_replica_by_id(id)
async fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
(**self).get_replica_by_id(id).await
}
fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
(**self).get_replicas()
async fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
(**self).get_replicas().await
}

// Energy
fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
(**self).get_energy_balance(identity)
async fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
(**self).get_energy_balance(identity).await
}

// DNS
fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
(**self).lookup_identity(domain)
async fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
(**self).lookup_identity(domain).await
}

fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
(**self).reverse_lookup(database_identity)
async fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
(**self).reverse_lookup(database_identity).await
}

fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
(**self).get_leader_replica_by_database(database_id)
async fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
(**self).get_leader_replica_by_database(database_id).await
}
}

Expand Down
11 changes: 9 additions & 2 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ where

let replica = worker_ctx
.get_leader_replica_by_database(database.id)
.await
.ok_or((StatusCode::NOT_FOUND, "Replica not scheduled to this node yet."))?;
let replica_id = replica.id;

Expand Down Expand Up @@ -378,6 +379,7 @@ pub(crate) async fn worker_ctx_find_database(
) -> axum::response::Result<Option<Database>> {
worker_ctx
.get_database_by_identity(database_identity)
.await
.map_err(log_and_500)
}

Expand Down Expand Up @@ -474,6 +476,7 @@ pub async fn get_names<S: ControlStateDelegate>(

let names = ctx
.reverse_lookup(&database_identity)
.await
.map_err(log_and_500)?
.into_iter()
.filter_map(|x| String::from(x).try_into().ok())
Expand Down Expand Up @@ -611,6 +614,7 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
let op = {
let exists = ctx
.get_database_by_identity(&database_identity)
.await
.map_err(log_and_500)?
.is_some();
if !exists {
Expand Down Expand Up @@ -841,7 +845,10 @@ pub async fn set_names<S: ControlStateDelegate>(

let database_identity = name_or_identity.resolve(&ctx).await?;

let database = ctx.get_database_by_identity(&database_identity).map_err(log_and_500)?;
let database = ctx
.get_database_by_identity(&database_identity)
.await
.map_err(log_and_500)?;
let Some(database) = database else {
return Ok((
StatusCode::NOT_FOUND,
Expand All @@ -859,7 +866,7 @@ pub async fn set_names<S: ControlStateDelegate>(
}

for name in &validated_names {
if ctx.lookup_identity(name.as_str()).unwrap().is_some() {
if ctx.lookup_identity(name.as_str()).await.unwrap().is_some() {
return Ok((
StatusCode::BAD_REQUEST,
axum::Json(name::SetDomainsResult::OtherError(format!(
Expand Down
10 changes: 8 additions & 2 deletions crates/client-api/src/routes/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub async fn get_energy_balance<S: ControlStateDelegate>(
Path(IdentityParams { identity }): Path<IdentityParams>,
) -> axum::response::Result<impl IntoResponse> {
let identity = Identity::from(identity);
get_budget_inner(ctx, &identity)
get_budget_inner(ctx, &identity).await
}

#[serde_with::serde_as]
Expand Down Expand Up @@ -57,15 +57,20 @@ pub async fn add_energy<S: ControlStateDelegate>(
// TODO: is this guaranteed to pull the updated balance?
let balance = ctx
.get_energy_balance(&auth.claims.identity)
.await
.map_err(log_and_500)?
.map_or(0, |quanta| quanta.get());

Ok(axum::Json(BalanceResponse { balance }))
}

fn get_budget_inner(ctx: impl ControlStateDelegate, identity: &Identity) -> axum::response::Result<impl IntoResponse> {
async fn get_budget_inner(
ctx: impl ControlStateDelegate,
identity: &Identity,
) -> axum::response::Result<impl IntoResponse> {
let balance = ctx
.get_energy_balance(identity)
.await
.map_err(log_and_500)?
.map_or(0, |quanta| quanta.get());

Expand Down Expand Up @@ -103,6 +108,7 @@ pub async fn set_energy_balance<S: ControlStateDelegate>(
.unwrap_or(0);
let current_balance = ctx
.get_energy_balance(&identity)
.await
.map_err(log_and_500)?
.map_or(0, |quanta| quanta.get());

Expand Down
3 changes: 3 additions & 0 deletions crates/client-api/src/routes/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub async fn health<S: ControlStateDelegate + NodeDelegate>(
) -> axum::response::Result<impl IntoResponse> {
let nodes: Vec<u64> = ctx
.get_nodes()
.await
.map_err(|_| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Expand All @@ -23,8 +24,10 @@ pub async fn health<S: ControlStateDelegate + NodeDelegate>(
let schedulable = !ctx
.get_node_by_id(
ctx.get_node_id()
.await
.ok_or((StatusCode::INTERNAL_SERVER_ERROR, "Can't get node id"))?,
)
.await
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Couldn't get node info"))?
.map(|n| n.unschedulable)
.unwrap_or(false);
Expand Down
2 changes: 1 addition & 1 deletion crates/client-api/src/routes/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub async fn get_databases<S: ControlStateDelegate>(
) -> axum::response::Result<impl IntoResponse> {
let identity = identity.into();
// Linear scan for all databases that have this owner, and return their identities
let all_dbs = ctx.get_databases().map_err(|e| {
let all_dbs = ctx.get_databases().await.map_err(|e| {
log::error!("Failure when retrieving databases for search: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Expand Down
2 changes: 1 addition & 1 deletion crates/client-api/src/routes/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub async fn get_sd_config<S: ControlStateReadAccess>(
State(ctx): State<S>,
) -> axum::response::Result<impl IntoResponse> {
// TODO(cloutiertyler): security
let nodes = ctx.get_nodes().map_err(log_and_500)?;
let nodes = ctx.get_nodes().await.map_err(log_and_500)?;

let mut targets = Vec::new();
let labels = HashMap::default();
Expand Down
1 change: 1 addition & 0 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ where

let database = ctx
.get_database_by_identity(&db_identity)
.await
.unwrap()
.ok_or(StatusCode::NOT_FOUND)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/client-api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl NameOrIdentity {
) -> anyhow::Result<Result<Identity, &DatabaseName>> {
Ok(match self {
Self::Identity(identity) => Ok(Identity::from(*identity)),
Self::Name(name) => ctx.lookup_identity(name.as_ref())?.ok_or(name),
Self::Name(name) => ctx.lookup_identity(name.as_ref()).await?.ok_or(name),
})
}

Expand Down
27 changes: 14 additions & 13 deletions crates/standalone/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,14 @@ impl NodeDelegate for StandaloneEnv {
}
}

#[async_trait]
impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv {
// Nodes
fn get_node_id(&self) -> Option<u64> {
async fn get_node_id(&self) -> Option<u64> {
Some(0)
}

fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
async fn get_node_by_id(&self, node_id: u64) -> anyhow::Result<Option<Node>> {
if node_id == 0 {
return Ok(Some(Node {
id: 0,
Expand All @@ -166,46 +167,46 @@ impl spacetimedb_client_api::ControlStateReadAccess for StandaloneEnv {
Ok(None)
}

fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
Ok(vec![self.get_node_by_id(0)?.unwrap()])
async fn get_nodes(&self) -> anyhow::Result<Vec<Node>> {
Ok(vec![self.get_node_by_id(0).await?.unwrap()])
}

// Databases
fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
async fn get_database_by_id(&self, id: u64) -> anyhow::Result<Option<Database>> {
Ok(self.control_db.get_database_by_id(id)?)
}

fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>> {
async fn get_database_by_identity(&self, database_identity: &Identity) -> anyhow::Result<Option<Database>> {
Ok(self.control_db.get_database_by_identity(database_identity)?)
}

fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
async fn get_databases(&self) -> anyhow::Result<Vec<Database>> {
Ok(self.control_db.get_databases()?)
}

// Replicas
fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
async fn get_replica_by_id(&self, id: u64) -> anyhow::Result<Option<Replica>> {
Ok(self.control_db.get_replica_by_id(id)?)
}

fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
async fn get_replicas(&self) -> anyhow::Result<Vec<Replica>> {
Ok(self.control_db.get_replicas()?)
}

fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
async fn get_leader_replica_by_database(&self, database_id: u64) -> Option<Replica> {
self.control_db.get_leader_replica_by_database(database_id)
}
// Energy
fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
async fn get_energy_balance(&self, identity: &Identity) -> anyhow::Result<Option<EnergyBalance>> {
Ok(self.control_db.get_energy_balance(identity)?)
}

// DNS
fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
async fn lookup_identity(&self, domain: &str) -> anyhow::Result<Option<Identity>> {
Ok(self.control_db.spacetime_dns(domain)?)
}

fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
async fn reverse_lookup(&self, database_identity: &Identity) -> anyhow::Result<Vec<DomainName>> {
Ok(self.control_db.spacetime_reverse_dns(database_identity)?)
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/testing/src/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ impl CompiledModule {
.await
.unwrap();

let database = env.get_database_by_identity(&db_identity).unwrap().unwrap();
let instance = env.get_leader_replica_by_database(database.id).unwrap();
let database = env.get_database_by_identity(&db_identity).await.unwrap().unwrap();
let instance = env.get_leader_replica_by_database(database.id).await.unwrap();

let client_id = ClientActorId {
identity,
Expand Down
Loading