From c9c29c3f528d1d4b336c28e360fa15f640451306 Mon Sep 17 00:00:00 2001 From: Fredrik Enestad Date: Thu, 23 Jan 2025 20:46:33 +0100 Subject: [PATCH] runtimes/core: support running multiple gateways --- runtimes/core/src/api/call.rs | 4 + runtimes/core/src/api/cors/mod.rs | 8 +- runtimes/core/src/api/endpoint.rs | 55 ++--- runtimes/core/src/api/gateway/mod.rs | 285 ++++++++++++++-------- runtimes/core/src/api/gateway/router.rs | 115 +++++---- runtimes/core/src/api/manager.rs | 168 ++++++++----- runtimes/core/src/api/reqauth/platform.rs | 28 +++ runtimes/core/src/names.rs | 2 +- runtimes/js/src/gateway.rs | 4 +- tsparser/src/legacymeta/mod.rs | 5 +- 10 files changed, 416 insertions(+), 258 deletions(-) diff --git a/runtimes/core/src/api/call.rs b/runtimes/core/src/api/call.rs index e5d0227b48..8793c87696 100644 --- a/runtimes/core/src/api/call.rs +++ b/runtimes/core/src/api/call.rs @@ -96,6 +96,10 @@ impl ServiceRegistry { }) } + pub fn service_names(&self) -> Vec<&EncoreName> { + self.base_urls.keys().collect() + } + pub fn service_base_url(&self, service_name: &Q) -> Option<&String> where EncoreName: Borrow, diff --git a/runtimes/core/src/api/cors/mod.rs b/runtimes/core/src/api/cors/mod.rs index 562a493815..98232f6f6c 100644 --- a/runtimes/core/src/api/cors/mod.rs +++ b/runtimes/core/src/api/cors/mod.rs @@ -197,12 +197,16 @@ pub struct MetaHeaders { } impl MetaHeaders { - pub fn from_schema(endpoints: &EndpointMap, auth: Option<&auth::Authenticator>) -> Self { + pub fn from_schema( + gateway_name: &str, + endpoints: &EndpointMap, + auth: Option<&auth::Authenticator>, + ) -> Self { let mut allow_headers = HashSet::new(); let mut expose_headers = HashSet::new(); for ep in endpoints.values() { - if !ep.exposed { + if !ep.exposed.contains(gateway_name) { continue; } for h in ep.request.iter().flat_map(|req| req.header.iter()) { diff --git a/runtimes/core/src/api/endpoint.rs b/runtimes/core/src/api/endpoint.rs index 60ca7e87e4..f3626846ae 100644 --- a/runtimes/core/src/api/endpoint.rs +++ b/runtimes/core/src/api/endpoint.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; @@ -24,11 +24,12 @@ use crate::encore::parser::meta::v1::{self as meta, selector}; use crate::log::LogFromRust; use crate::model::StreamDirection; use crate::names::EndpointName; -use crate::trace; use crate::{model, Hosted}; +use crate::{trace, EncoreName}; use super::pvalue::{PValue, PValues}; use super::reqauth::caller::Caller; +use super::reqauth::platform::ValidationData; #[derive(Debug)] pub struct SuccessResponse { @@ -162,8 +163,8 @@ pub struct Endpoint { /// Whether this is a raw endpoint. pub raw: bool, - /// Whether the service is exposed publicly. - pub exposed: bool, + /// Which gateways this endpoint is exposed through. + pub exposed: HashSet, /// Whether the service requires authentication data. pub requires_auth: bool, @@ -331,8 +332,8 @@ pub fn endpoints_from_meta( } let resp_schema = ep.response_schema.build(®istry)?; - // We only support a single gateway right now. - let exposed = ep.ep.expose.contains_key("api-gateway"); + let exposed = ep.ep.expose.keys().map(|gw_name| gw_name.into()).collect(); + let raw = rpc::Protocol::try_from(ep.ep.proto).is_ok_and(|proto| proto == rpc::Protocol::Raw); @@ -444,11 +445,7 @@ impl EndpointHandler { .into_parts(); // Authenticate the request from the platform, if applicable. - #[allow(clippy::manual_unwrap_or_default)] - let platform_seal_of_approval = match self.authenticate_platform(&parts) { - Ok(seal) => seal, - Err(_err) => None, - }; + let platform_seal_of_approval = self.authenticate_platform(&parts).ok(); let meta = CallMeta::parse_with_caller( &self.shared.inbound_svc_auth, @@ -546,8 +543,13 @@ impl EndpointHandler { let internal_caller = request.internal_caller.clone(); + // check if this endpoint is exposed by the calling gateway + let exposed = internal_caller.as_ref().is_some_and(|caller| { + matches!(caller, Caller::Gateway { gateway } if self.endpoint.exposed.contains(gateway)) + }); + // If the endpoint isn't exposed, return a 404. - if !self.endpoint.exposed && !request.allows_private_endpoint_call() { + if !exposed && !request.allows_private_endpoint_call() { return Error { code: ErrCode::NotFound, message: "endpoint not found".into(), @@ -662,32 +664,9 @@ impl EndpointHandler { fn authenticate_platform( &self, req: &axum::http::request::Parts, - ) -> Result, platform::ValidationError> { - let Some(x_encore_auth_header) = req.headers.get("x-encore-auth") else { - return Ok(None); - }; - let x_encore_auth_header = x_encore_auth_header - .to_str() - .map_err(|_| platform::ValidationError::InvalidMac)?; - - let Some(date_header) = req.headers.get("Date") else { - return Err(platform::ValidationError::InvalidDateHeader); - }; - let date_header = date_header - .to_str() - .map_err(|_| platform::ValidationError::InvalidDateHeader)?; - - let request_path = req.uri.path(); - let req = platform::ValidationData { - request_path, - date_header, - x_encore_auth_header, - }; - - self.shared - .platform_auth - .validate_platform_request(&req) - .map(Some) + ) -> Result { + let data = ValidationData::from_req(req)?; + self.shared.platform_auth.validate_platform_request(&data) } } diff --git a/runtimes/core/src/api/gateway/mod.rs b/runtimes/core/src/api/gateway/mod.rs index 16ed767656..8f03a38af7 100644 --- a/runtimes/core/src/api/gateway/mod.rs +++ b/runtimes/core/src/api/gateway/mod.rs @@ -2,10 +2,12 @@ mod router; mod websocket; use std::borrow::Cow; +use std::collections::BTreeMap; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use anyhow::bail; use anyhow::Context; use axum::async_trait; use bytes::{BufMut, Bytes, BytesMut}; @@ -17,6 +19,7 @@ use pingora::server::configuration::{Opt, ServerConf}; use pingora::services::Service; use pingora::upstreams::peer::HttpPeer; use pingora::{Error, ErrorSource, ErrorType, OkOrErr, OrErr}; +use router::Router; use router::Target; use tokio::sync::watch; use url::Url; @@ -25,90 +28,126 @@ use crate::api::auth; use crate::api::call::{CallDesc, ServiceRegistry}; use crate::api::paths::PathSet; use crate::api::reqauth::caller::Caller; +use crate::api::reqauth::platform; use crate::api::reqauth::{svcauth, CallMeta}; use crate::{api, model, EncoreName}; +use super::auth::InboundRequest; use super::cors::cors_headers_config::CorsHeadersConfig; use super::encore_routes::healthz; -#[derive(Clone)] -pub struct Gateway { - inner: Arc, -} - -struct Inner { - shared: Arc, - service_registry: Arc, - router: router::Router, - cors_config: CorsHeadersConfig, - healthz: healthz::Handler, - own_api_address: Option, - proxied_push_subs: HashMap, -} +const INTERNAL_ROUTE_HEADER: &str = "x-encore-internal-route"; pub struct GatewayCtx { upstream_service_name: EncoreName, - upstream_base_path: String, + upstream_path: String, upstream_host: Option, upstream_require_auth: bool, + gateway: Arc, } impl GatewayCtx { - fn prepend_base_path(&self, uri: &http::Uri) -> anyhow::Result { + fn upstream_uri(&self, req: &RequestHeader) -> anyhow::Result { let mut builder = http::Uri::builder(); - if let Some(scheme) = uri.scheme() { + if let Some(scheme) = req.uri.scheme() { builder = builder.scheme(scheme.clone()); } - if let Some(authority) = uri.authority() { + if let Some(authority) = req.uri.authority() { builder = builder.authority(authority.clone()); } - let base_path = self.upstream_base_path.trim_end_matches('/'); - builder = builder.path_and_query(format!( - "{}{}", - base_path, - uri.path_and_query().map_or("", |pq| pq.as_str()) - )); + if let Some(query) = req.uri.query() { + builder = builder.path_and_query(format!("{}?{}", self.upstream_path, query)); + } else { + builder = builder.path_and_query(&self.upstream_path); + }; builder.build().context("failed to build uri") } } +pub struct Gateway { + name: EncoreName, + auth_handler: Option, + router: router::Router, + internal_router: router::Router, + cors_config: CorsHeadersConfig, +} + impl Gateway { - #[allow(clippy::too_many_arguments)] pub fn new( name: EncoreName, service_registry: Arc, service_routes: PathSet>, auth_handler: Option, cors_config: CorsHeadersConfig, - healthz: healthz::Handler, - own_api_address: Option, - proxied_push_subs: HashMap, ) -> anyhow::Result { - let shared = Arc::new(SharedGatewayData { - name, - auth: auth_handler, - }); + let router = service_routes.try_into()?; - let mut router = router::Router::new(); - router.add_routes(&service_routes)?; + let services = service_registry.service_names(); + let internal_router = Router::new_internal(services)?; Ok(Gateway { - inner: Arc::new(Inner { - shared, - service_registry, - router, - cors_config, - healthz, - own_api_address, - proxied_push_subs, - }), + name, + auth_handler, + router, + internal_router, + cors_config, }) } pub fn auth_handler(&self) -> Option<&auth::Authenticator> { - self.inner.shared.auth.as_ref() + self.auth_handler.as_ref() + } +} + +#[derive(Clone)] +pub struct GatewayServer { + gateways: BTreeMap>, + service_registry: Arc, + healthz: healthz::Handler, + own_api_address: Option, + proxied_push_subs: HashMap, + platform_validator: Arc, +} + +impl GatewayServer { + pub fn new( + service_registry: Arc, + healthz: healthz::Handler, + own_api_address: Option, + proxied_push_subs: HashMap, + platform_validator: Arc, + ) -> Self { + GatewayServer { + gateways: BTreeMap::new(), + service_registry, + healthz, + own_api_address, + proxied_push_subs, + platform_validator, + } + } + + pub fn has_configurations(&self) -> bool { + !self.gateways.is_empty() + } + + pub fn get_gateway(&self, name: &str) -> Option<&Arc> { + self.gateways.get(name) + } + + pub fn add_gateway(&mut self, gateway: Gateway) -> anyhow::Result<()> { + let name = gateway.name.clone(); + if self + .gateways + .insert(name.clone(), Arc::new(gateway)) + .is_some() + { + bail!("gateway {} already registered", &name) + } + + Ok(()) } pub async fn serve(self, listen_addr: &str) -> anyhow::Result<()> { @@ -137,10 +176,23 @@ impl Gateway { Ok(()) } + + fn target(&self, req: &RequestHeader) -> Option<&Arc> { + // TODO lookup the correct gateway via configured rules + // for testing purposes, look at `x-encore-gateway-name` header + if let Some(name) = req.headers().get("x-encore-gateway-name") { + if let Ok(name) = name.to_str() { + return self.get_gateway(name); + } + } + + // fallback to legacy behaviour + self.get_gateway("api-gateway") + } } #[async_trait] -impl ProxyHttp for Gateway { +impl ProxyHttp for GatewayServer { type CTX = Option; fn new_ctx(&self) -> Self::CTX { @@ -153,13 +205,13 @@ impl ProxyHttp for Gateway { async fn request_filter( &self, session: &mut Session, - _ctx: &mut Self::CTX, + ctx: &mut Self::CTX, ) -> pingora::Result where Self::CTX: Send + Sync, { if session.req_header().uri.path() == "/__encore/healthz" { - let healthz_resp = self.inner.healthz.clone().health_check(); + let healthz_resp = self.healthz.clone().health_check(); let healthz_bytes: Vec = serde_json::to_vec(&healthz_resp) .or_err(ErrorType::HTTPStatus(500), "could not encode response")?; @@ -176,16 +228,16 @@ impl ProxyHttp for Gateway { return Ok(true); } - // preflight request, return early with cors headers - if axum::http::Method::OPTIONS == session.req_header().method { - let mut resp = ResponseHeader::build(200, None)?; - self.inner - .cors_config - .apply(session.req_header(), &mut resp)?; - resp.insert_header(header::CONTENT_LENGTH, 0)?; - session.write_response_header(Box::new(resp), true).await?; + if let Some(GatewayCtx { gateway, .. }) = ctx { + // preflight request, return early with cors headers + if axum::http::Method::OPTIONS == session.req_header().method { + let mut resp = ResponseHeader::build(200, None)?; + gateway.cors_config.apply(session.req_header(), &mut resp)?; + resp.insert_header(header::CONTENT_LENGTH, 0)?; + session.write_response_header(Box::new(resp), true).await?; - return Ok(true); + return Ok(true); + } } Ok(false) @@ -196,55 +248,83 @@ impl ProxyHttp for Gateway { session: &mut Session, ctx: &mut Self::CTX, ) -> pingora::Result> { + let target_gateway = self + .target(session.req_header()) + .ok_or_else(|| api::Error::not_found("gateway not found"))?; + let path = session.req_header().uri.path(); // Check if this is a pubsub push request and if we need to proxy it to another service let push_proxy_svc = path .strip_prefix("/__encore/pubsub/push/") - .and_then(|sub_id| self.inner.proxied_push_subs.get(sub_id)) + .and_then(|sub_id| self.proxied_push_subs.get(sub_id)) .map(|svc| Target { service_name: svc.clone(), requires_auth: false, }); - if let Some(own_api_addr) = &self.inner.own_api_address { + if let Some(own_api_addr) = &self.own_api_address { if push_proxy_svc.is_none() && path.starts_with("/__encore/") { return Ok(Box::new(HttpPeer::new(own_api_addr, false, "".to_string()))); } } - let target = push_proxy_svc.map_or_else( - || { - // Find which service handles the path route - session - .req_header() - .method - .as_ref() - .try_into() - .map_err(|e: anyhow::Error| api::Error { + + let mut upstream_path = session.req_header().uri.path(); + let target = + if let Some(ref target) = push_proxy_svc { + target + } else { + let method = session.req_header().method.as_ref().try_into().map_err( + |e: anyhow::Error| api::Error { code: api::ErrCode::InvalidArgument, message: "invalid method".to_string(), internal_message: Some(e.to_string()), stack: None, details: None, - }) - .and_then(|method| self.inner.router.route_to_service(method, path)) - .cloned() - }, - Ok, - )?; + }, + )?; - let upstream = self - .inner + if session.get_header(INTERNAL_ROUTE_HEADER).is_some() { + platform::ValidationData::from_req(session.req_header()) + .and_then(|data| self.platform_validator.validate_platform_request(&data)) + .map_err(|_e| api::Error::unauthenticated())?; + + let target = target_gateway + .internal_router + .route_to_service(method, path)?; + + if let Some(new_path) = + upstream_path.strip_prefix(&format!("/{}", target.service_name)) + { + upstream_path = new_path; + target + } else { + return Err(api::Error::internal(anyhow::anyhow!( + "path was not prefixed with target service name" + )) + .into()); + } + } else { + target_gateway.router.route_to_service(method, path)? + } + }; + + let upstream_base_url = self .service_registry .service_base_url(&target.service_name) - .or_err(ErrorType::InternalError, "couldn't find upstream")?; - - let upstream_url: Url = upstream - .parse() + .or_err(ErrorType::InternalError, "couldn't find upstream")? + .parse::() .or_err(ErrorType::InternalError, "upstream not a valid url")?; - let upstream_addrs = upstream_url - .socket_addrs(|| match upstream_url.scheme() { + let upstream_base_path = upstream_base_url.path(); + let upstream_path = format!( + "{}{}", + upstream_base_path, + upstream_path.trim_start_matches('/') + ); + + let upstream_addrs = upstream_base_url + .socket_addrs(|| match upstream_base_url.scheme() { "https" => Some(443), "http" => Some(80), _ => None, @@ -259,15 +339,16 @@ impl ProxyHttp for Gateway { "didn't find any upstream ip addresses", )?; - let tls = upstream_url.scheme() == "https"; - let host = upstream_url.host().map(|h| h.to_string()); + let tls = upstream_base_url.scheme() == "https"; + let host = upstream_base_url.host().map(|h| h.to_string()); let peer = HttpPeer::new(upstream_addr, tls, host.clone().unwrap_or_default()); ctx.replace(GatewayCtx { - upstream_base_path: upstream_url.path().to_string(), + upstream_path: upstream_path.to_string(), upstream_host: host, upstream_service_name: target.service_name.clone(), upstream_require_auth: target.requires_auth, + gateway: target_gateway.clone(), }); Ok(Box::new(peer)) @@ -282,8 +363,8 @@ impl ProxyHttp for Gateway { where Self::CTX: Send + Sync, { - if ctx.is_some() { - self.inner + if let Some(GatewayCtx { gateway, .. }) = ctx { + gateway .cors_config .apply(session.req_header(), upstream_response)?; } @@ -302,11 +383,8 @@ impl ProxyHttp for Gateway { { if let Some(gateway_ctx) = ctx.as_ref() { let new_uri = gateway_ctx - .prepend_base_path(&upstream_request.uri) - .or_err( - ErrorType::InternalError, - "failed to prepend upstream base path", - )?; + .upstream_uri(upstream_request) + .or_err(ErrorType::InternalError, "failed to set upstream path")?; upstream_request.set_uri(new_uri); @@ -320,12 +398,11 @@ impl ProxyHttp for Gateway { if session.is_upgrade_req() { websocket::update_headers_from_websocket_protocol(upstream_request).or_err( ErrorType::HTTPStatus(400), - "invalid auth data passed in websocket protocol header", + "invalid data passed in websocket protocol header", )?; } let svc_auth_method = self - .inner .service_registry .service_auth_method(&gateway_ctx.upstream_service_name) .unwrap_or_else(|| Arc::new(svcauth::Noop)); @@ -341,7 +418,7 @@ impl ProxyHttp for Gateway { } let caller = Caller::Gateway { - gateway: self.inner.shared.name.clone(), + gateway: gateway_ctx.gateway.name.clone(), }; let mut desc = CallDesc { caller: &caller, @@ -358,7 +435,7 @@ impl ProxyHttp for Gateway { svc_auth_method: svc_auth_method.as_ref(), }; - if let Some(auth_handler) = &self.inner.shared.auth { + if let Some(auth_handler) = &gateway_ctx.gateway.auth_handler { let auth_response = auth_handler .authenticate(upstream_request, call_meta.clone()) .await @@ -387,7 +464,7 @@ impl ProxyHttp for Gateway { Ok(()) } - async fn fail_to_proxy(&self, session: &mut Session, e: &Error, _ctx: &mut Self::CTX) -> u16 + async fn fail_to_proxy(&self, session: &mut Session, e: &Error, ctx: &mut Self::CTX) -> u16 where Self::CTX: Send + Sync, { @@ -430,13 +507,12 @@ impl ProxyHttp for Gateway { ) }; - if let Err(e) = self - .inner - .cors_config - .apply(session.req_header(), &mut resp) - { - log::error!("failed setting cors header in error response: {e}"); + if let Some(GatewayCtx { gateway, .. }) = ctx { + if let Err(e) = gateway.cors_config.apply(session.req_header(), &mut resp) { + log::error!("failed setting cors header in error response: {e}"); + } } + session.set_keepalive(None); session .write_response_header(Box::new(resp), false) @@ -486,8 +562,3 @@ impl crate::api::auth::InboundRequest for RequestHeader { self.uri.query() } } - -struct SharedGatewayData { - name: EncoreName, - auth: Option, -} diff --git a/runtimes/core/src/api/gateway/router.rs b/runtimes/core/src/api/gateway/router.rs index 8cbeaf6849..399f4b01fa 100644 --- a/runtimes/core/src/api/gateway/router.rs +++ b/runtimes/core/src/api/gateway/router.rs @@ -18,13 +18,78 @@ impl Router { Router { main, fallback } } - pub fn add_routes( - &mut self, - routes: &PathSet>, - ) -> anyhow::Result<()> { + pub fn new_internal(services: Vec<&EncoreName>) -> anyhow::Result { + let mut router = Router::new(); + + for service in services { + let target = Some(Target { + service_name: service.clone(), + requires_auth: false, + }); + + router.main.insert( + format!("/{service}/*path"), + MethodRoute { + get: target.clone(), + head: target.clone(), + post: target.clone(), + put: target.clone(), + delete: target.clone(), + option: target.clone(), + trace: target.clone(), + patch: target.clone(), + }, + )?; + } + + Ok(router) + } + + pub fn route_to_service( + &self, + method: api::schema::Method, + path: &str, + ) -> Result<&Target, api::Error> { + let mut found_path_match = false; + for router in [&self.main, &self.fallback] { + if let Ok(service) = router.at(path) { + found_path_match = true; + let service = service.value.for_method(method); + if let Some(service) = service { + return Ok(service); + } + } + } + + // We couldn't find a matching route. + Err(if found_path_match { + api::Error { + code: api::ErrCode::NotFound, + message: "no route for method".to_string(), + internal_message: Some(format!("no route for method {:?}: {}", method, path)), + stack: None, + details: None, + } + } else { + api::Error { + code: api::ErrCode::NotFound, + message: "endpoint not found".to_string(), + internal_message: Some(format!("no such endpoint exists: {}", path)), + stack: None, + details: None, + } + }) + } +} + +impl TryFrom>> for Router { + type Error = anyhow::Error; + + fn try_from(routes: PathSet>) -> Result { + let mut result = Router::new(); for (router, routes) in [ - (&mut self.main, &routes.main), - (&mut self.fallback, &routes.fallback), + (&mut result.main, &routes.main), + (&mut result.fallback, &routes.fallback), ] { fn register_methods( mr: &mut MethodRoute, @@ -75,43 +140,7 @@ impl Router { } } - Ok(()) - } - - pub fn route_to_service( - &self, - method: api::schema::Method, - path: &str, - ) -> Result<&Target, api::Error> { - let mut found_path_match = false; - for router in [&self.main, &self.fallback] { - if let Ok(service) = router.at(path) { - found_path_match = true; - let service = service.value.for_method(method); - if let Some(service) = service { - return Ok(service); - } - } - } - - // We couldn't find a matching route. - Err(if found_path_match { - api::Error { - code: api::ErrCode::NotFound, - message: "no route for method".to_string(), - internal_message: Some(format!("no route for method {:?}: {}", method, path)), - stack: None, - details: None, - } - } else { - api::Error { - code: api::ErrCode::NotFound, - message: "endpoint not found".to_string(), - internal_message: Some(format!("no such endpoint exists: {}", path)), - stack: None, - details: None, - } - }) + Ok(result) } } diff --git a/runtimes/core/src/api/manager.rs b/runtimes/core/src/api/manager.rs index 0bb7f43606..513a9d8eff 100644 --- a/runtimes/core/src/api/manager.rs +++ b/runtimes/core/src/api/manager.rs @@ -6,7 +6,7 @@ use anyhow::Context; use crate::api::auth::{LocalAuthHandler, RemoteAuthHandler}; use crate::api::call::ServiceRegistry; -use crate::api::gateway::Gateway; +use crate::api::gateway::GatewayServer; use crate::api::http_server::HttpServer; use crate::api::paths::Pather; use crate::api::reqauth::platform; @@ -22,6 +22,8 @@ use crate::trace::Tracer; use crate::{api, model, pubsub, secrets, EncoreName, EndpointName, Hosted}; use super::encore_routes::healthz; +use super::gateway::Gateway; +use super::paths::PathSet; use super::websocket_client::WebSocketClient; use super::ResponsePayload; @@ -55,7 +57,7 @@ pub struct Manager { api_server: Option, runtime: tokio::runtime::Handle, - gateways: HashMap, + gateway_server: Option, testing: bool, } @@ -123,7 +125,7 @@ impl ManagerConfig<'_> { self.secrets, endpoints.clone(), self.environment, - self.service_discovery, + self.service_discovery.clone(), own_api_address .as_ref() .map(|addr| addr.to_string()) @@ -146,54 +148,43 @@ impl ManagerConfig<'_> { .get(rid) .map(|gw| (gw.encore_name.as_str(), gw)) })); - let mut gateways = HashMap::new(); - let routes = paths::compute( - endpoints - .iter() - .map(|(_, ep)| RoutePerService(ep.to_owned())), - ); let mut auth_data_schemas = HashMap::new(); - for gw in &self.meta.gateways { - let Some(gw_cfg) = hosted_gateways.get(gw.encore_name.as_str()) else { - continue; - }; - let Some(cors_cfg) = &gw_cfg.cors else { - anyhow::bail!("missing CORS configuration for gateway {}", gw.encore_name); - }; + let mut gateway_server = GatewayServer::new( + service_registry.clone(), + healthz_handler.clone(), + own_api_address, + self.proxied_push_subs.clone(), + self.platform_validator.clone(), + ); - let auth_handler = build_auth_handler( + for (name, gw_cfg) in hosted_gateways { + let routes = paths::compute( + endpoints + .iter() + // TODO(fredr): we should filter out only public routes here + // .filter(|(_, ep)| ep.exposed.contains(name)) + .map(|(_, ep)| RoutePerService(ep.to_owned())), + ); + + let gw = build_gateway( self.meta, - gw, - &service_registry, + gw_cfg, + service_registry.clone(), + endpoints.clone(), + routes, self.http_client.clone(), self.tracer.clone(), - ) - .context("unable to build authenticator")?; - - let meta_headers = cors::MetaHeaders::from_schema(&endpoints, auth_handler.as_ref()); - let cors_config = cors::config(cors_cfg, meta_headers) - .context("failed to parse CORS configuration")?; + )?; auth_data_schemas.insert( - gw.encore_name.clone(), - auth_handler.as_ref().map(|ah| ah.auth_data().clone()), + name.to_string(), + gw.auth_handler().map(|ah| ah.auth_data().clone()), ); - gateways.insert( - gw.encore_name.clone().into(), - Gateway::new( - gw.encore_name.clone().into(), - service_registry.clone(), - routes.clone(), - auth_handler, - cors_config, - healthz_handler.clone(), - own_api_address, - self.proxied_push_subs.clone(), - ) - .context("couldn't create gateway")?, - ); + gateway_server + .add_gateway(gw) + .context("couldn't create gateway")?; } let api_server = if !hosted_services.is_empty() { @@ -211,12 +202,18 @@ impl ManagerConfig<'_> { None }; + let gateway_server = if self.meta.gateways.is_empty() { + None + } else { + Some(gateway_server) + }; + Ok(Manager { gateway_listen_addr, api_listener: Mutex::new(api_listener), service_registry, api_server, - gateways, + gateway_server, pubsub_push_registry: self.pubsub_push_registry, runtime: self.runtime, healthz: healthz_handler, @@ -243,6 +240,50 @@ impl Pather for RoutePerService { } } +fn build_gateway( + meta: &meta::Data, + gw_cfg: &runtime::Gateway, + service_registry: Arc, + endpoints: Arc>>, + routes: PathSet>, + http_client: reqwest::Client, + tracer: Tracer, +) -> anyhow::Result { + let gw_meta = meta + .gateways + .iter() + .find(|gw| gw.encore_name == gw_cfg.encore_name) + .ok_or_else(|| { + anyhow::anyhow!( + "missing meta configuration for gateway {}", + gw_cfg.encore_name + ) + })?; + + let cors_cfg = gw_cfg.cors.as_ref().ok_or_else(|| { + anyhow::anyhow!( + "missing CORS configuration for gateway {}", + gw_cfg.encore_name + ) + })?; + + let auth_handler = build_auth_handler(meta, gw_meta, &service_registry, http_client, tracer) + .context("unable to build authenticator")?; + + let meta_headers = + cors::MetaHeaders::from_schema(&gw_cfg.encore_name, &endpoints, auth_handler.as_ref()); + let cors_config = + cors::config(cors_cfg, meta_headers).context("failed to parse CORS configuration")?; + + Gateway::new( + gw_cfg.encore_name.clone().into(), + service_registry.clone(), + routes, + auth_handler, + cors_config, + ) +} + fn build_auth_handler( meta: &meta::Data, gw: &meta::Gateway, @@ -311,8 +352,10 @@ fn build_auth_handler( } impl Manager { - pub fn gateway(&self, name: &EncoreName) -> Option<&Gateway> { - self.gateways.get(name) + pub fn gateway(&self, name: &EncoreName) -> Option<&Arc> { + self.gateway_server + .as_ref() + .and_then(|gws| gws.get_gateway(name)) } pub fn server(&self) -> Option<&server::Server> { @@ -365,42 +408,43 @@ impl Manager { let server = HttpServer::new(encore_routes, api, fallback); let api_listener = self.api_listener.lock().unwrap().take(); - let gateway_listener = self.gateway_listen_addr.clone(); - // TODO handle multiple gateways - let gateway = self.gateways.values().next().cloned(); let testing = self.testing; + let gateway_server = self.gateway_server.clone(); + let gateway_listener = self.gateway_listen_addr.clone(); self.runtime.spawn(async move { - let gateway_parts = (gateway, gateway_listener); + let gateway_parts = (gateway_server, gateway_listener); let gateway_fut = match gateway_parts { - (Some(gw), Some(ref ln)) => { - if !testing { - log::debug!(addr=ln; "gateway listening for incoming requests"); - Some(gw.serve(ln)) - } else { - // No need running the gateway in tests + (Some(gws), Some(ref ln)) => { + if testing { + // No need to run gateway server in tests None + } else { + log::debug!(addr = ln; "gateway listening for incoming requests"); + Some(gws.serve(ln)) } }, (Some(_), None) => { - ::log::error!("internal encore error: misconfigured api gateway (missing listener), skipping"); + log::error!("internal encore error: misconfigured gateway server (missing listener), skipping"); None - } + }, (None, Some(_)) => { - ::log::error!("internal encore error: misconfigured api gateway (missing gateway config), skipping"); + log::error!("internal encore error: misconfigured gateway server (missing gateway config), skipping"); None - } + }, (None, None) => None, }; let api_fut = match api_listener { Some(ln) => { - let addr = ln.local_addr().map(|addr| addr.to_string()).unwrap_or_default(); + let addr = ln + .local_addr() + .map(|addr| addr.to_string()) + .unwrap_or_default(); log::debug!(addr = addr; "api server listening for incoming requests"); - ln - .set_nonblocking(true) + ln.set_nonblocking(true) .context("unable to set nonblocking")?; let axum_listener = tokio::net::TcpListener::from_std(ln) .context("unable to convert listener to tokio")?; @@ -412,7 +456,7 @@ impl Manager { tokio::select! { res = async { gateway_fut.unwrap().await }, if gateway_fut.is_some() => { - res.context("serve gateway").inspect_err(|err| log::error!("api gateway failed: {:?}", err))?; + res.context("serve gateway").inspect_err(|err| log::error!("gateway server failed: {:?}", err))?; }, res = async { api_fut.unwrap().await }, if api_fut.is_some() => { res.context("serve api").inspect_err(|err| log::error!("api server failed: {:?}", err))?; diff --git a/runtimes/core/src/api/reqauth/platform.rs b/runtimes/core/src/api/reqauth/platform.rs index ce132acad8..6dd967e49d 100644 --- a/runtimes/core/src/api/reqauth/platform.rs +++ b/runtimes/core/src/api/reqauth/platform.rs @@ -23,6 +23,32 @@ pub struct ValidationData<'a> { pub x_encore_auth_header: &'a str, } +impl<'a> ValidationData<'a> { + pub fn from_req(req: &'a axum::http::request::Parts) -> Result { + let Some(x_encore_auth_header) = req.headers.get("x-encore-auth") else { + return Err(ValidationError::MissingAuthHeader); + }; + let x_encore_auth_header = x_encore_auth_header + .to_str() + .map_err(|_| ValidationError::InvalidMac)?; + + let Some(date_header) = req.headers.get("Date") else { + return Err(ValidationError::InvalidDateHeader); + }; + let date_header = date_header + .to_str() + .map_err(|_| ValidationError::InvalidDateHeader)?; + + let request_path = req.uri.path(); + + Ok(ValidationData { + request_path, + date_header, + x_encore_auth_header, + }) + } +} + /// A seal of approval is a record that the request originated from the Encore Platform. #[derive(Debug)] pub struct SealOfApproval; @@ -141,6 +167,7 @@ pub enum ValidationError { UnknownMacKey, InvalidMacKey, InvalidDateHeader, + MissingAuthHeader, TimeSkew, SecretResolve(secrets::ResolveError), } @@ -152,6 +179,7 @@ impl Display for ValidationError { ValidationError::UnknownMacKey => write!(f, "unknown mac key"), ValidationError::InvalidMacKey => write!(f, "invalid mac key"), ValidationError::InvalidDateHeader => write!(f, "invalid or missing date header"), + ValidationError::MissingAuthHeader => write!(f, "missing auth header"), ValidationError::TimeSkew => write!(f, "time skew"), ValidationError::SecretResolve(e) => { write!(f, "resolve secret: {}", e) diff --git a/runtimes/core/src/names.rs b/runtimes/core/src/names.rs index d9232d38f3..434b6d524b 100644 --- a/runtimes/core/src/names.rs +++ b/runtimes/core/src/names.rs @@ -4,7 +4,7 @@ use std::fmt::Display; use std::hash::Hash; use std::ops::Deref; -#[derive(Debug, Clone, Eq, Hash, PartialEq)] +#[derive(Debug, Clone, Eq, Hash, PartialEq, PartialOrd, Ord)] pub struct EncoreName(String); impl Deref for EncoreName { diff --git a/runtimes/js/src/gateway.rs b/runtimes/js/src/gateway.rs index c743987768..cd59c64f26 100644 --- a/runtimes/js/src/gateway.rs +++ b/runtimes/js/src/gateway.rs @@ -16,13 +16,13 @@ use std::sync::Arc; #[napi] pub struct Gateway { #[allow(dead_code)] - gateway: Option, + gateway: Option>, } impl Gateway { pub fn new( env: Env, - gateway: Option, + gateway: Option>, cfg: GatewayConfig, ) -> napi::Result { if let Some(gw) = &gateway { diff --git a/tsparser/src/legacymeta/mod.rs b/tsparser/src/legacymeta/mod.rs index e8dec5b11b..ce3491aa0a 100644 --- a/tsparser/src/legacymeta/mod.rs +++ b/tsparser/src/legacymeta/mod.rs @@ -12,8 +12,7 @@ use crate::parser::resources::apis::{authhandler, gateway}; use crate::parser::resources::infra::cron::CronJobSchedule; use crate::parser::resources::infra::{cron, objects, pubsub_subscription, pubsub_topic, sqldb}; use crate::parser::resources::Resource; -use crate::parser::types::validation; -use crate::parser::types::{Object, ObjectId}; +use crate::parser::types::{validation, Object, ObjectId}; use crate::parser::usageparser::Usage; use crate::parser::{respath, FilePath, Range}; use litparser::{ParseResult as PResult, ToParseErr}; @@ -573,7 +572,7 @@ impl MetaBuilder<'_> { // If there is no gateway, add a default one. if self.data.gateways.is_empty() { self.data.gateways.push(v1::Gateway { - encore_name: "api-gateway".to_string(), + encore_name: DEFAULT_API_GATEWAY_NAME.to_string(), explicit: None, }); }