Skip to content
Open
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
91fd3cb
feat!: Add support for mutating webhooks
sbernauer Nov 14, 2025
886efcc
Add state to the mutating webhook
sbernauer Nov 17, 2025
f48447c
Improve docs
sbernauer Nov 18, 2025
caa81fc
Add some docs
sbernauer Nov 18, 2025
587f296
refactor: Rename traits and structs
sbernauer Nov 19, 2025
8b518a0
Add some docs
sbernauer Nov 19, 2025
ef0d166
More docs
sbernauer Nov 19, 2025
16045e4
Apply suggestions from code review
sbernauer Nov 20, 2025
25c1e80
Update crates/stackable-webhook/src/lib.rs
sbernauer Nov 20, 2025
516f94d
formatiing
sbernauer Nov 20, 2025
ade4a1b
Rename operator to webhook
sbernauer Nov 20, 2025
7120310
Tkae the WebhookServerOptions by reference
sbernauer Nov 20, 2025
a070857
Remove leftover code
sbernauer Nov 20, 2025
85b73d3
Update crates/stackable-webhook/src/servers/mutating_webhook.rs
sbernauer Nov 20, 2025
6fc8a97
Move tracing layer to after routes
sbernauer Nov 20, 2025
2721d72
Improve tracing
sbernauer Nov 20, 2025
3ff8b2a
Rename servers mdodule to webhooks
sbernauer Nov 20, 2025
92f14b3
Add some docs
sbernauer Nov 20, 2025
6ca52f2
Add some docs
sbernauer Nov 20, 2025
2974ba0
changelog
sbernauer Nov 20, 2025
af9d446
changelog
sbernauer Nov 20, 2025
da6b1dc
Update crates/stackable-webhook/src/lib.rs
sbernauer Nov 20, 2025
00629de
Update crates/stackable-webhook/src/lib.rs
sbernauer Nov 20, 2025
9c20067
fix doc tests
sbernauer Nov 20, 2025
d79e432
Improve shutdown message
sbernauer Nov 20, 2025
dbba557
Add validating webhook as well
sbernauer Nov 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ repository = "https://github.com/stackabletech/operator-rs"
[workspace.dependencies]
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.8.0" }

arc-swap = "1.7"
arc-swap = "1.7.0"
async-trait = "0.1.89"
axum = { version = "0.8.1", features = ["http2"] }
chrono = { version = "0.4.38", default-features = false }
clap = { version = "4.5.17", features = ["derive", "cargo", "env"] }
Expand All @@ -38,7 +39,7 @@ k8s-openapi = { version = "0.26.0", default-features = false, features = ["schem
# We use rustls instead of openssl for easier portability, e.g. so that we can build stackablectl without the need to vendor (build from source) openssl
# We use ring instead of aws-lc-rs, as this currently fails to build in "make run-dev"
# We pin the kube version, as we use a patch for 2.0.1 below
kube = { version = "=2.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "rustls-tls", "ring"] }
kube = { version = "=2.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "admission", "rustls-tls", "ring"] }
opentelemetry = "0.31.0"
opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] }
opentelemetry-appender-tracing = "0.31.0"
Expand Down
11 changes: 11 additions & 0 deletions crates/stackable-webhook/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Added

- Add support for mutating webhooks ([#1119]).

### Changed

- BREAKING: Refactor the entire `WebhookServer` mechanism, so multiple webhooks can run in parallel.
Put individual webhooks (currently `ConversionWebhook` and `MutatingWebhook`) behind the `Webhook` trait ([#1119]).

[#1119]: https://github.com/stackabletech/operator-rs/pull/1119

## [0.7.1] - 2025-10-31

### Fixed
Expand Down
2 changes: 2 additions & 0 deletions crates/stackable-webhook/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ stackable-shared = { path = "../stackable-shared" }
stackable-telemetry = { path = "../stackable-telemetry" }

arc-swap.workspace = true
async-trait.workspace = true
axum.workspace = true
futures-util.workspace = true
hyper-util.workspace = true
Expand All @@ -21,6 +22,7 @@ kube.workspace = true
opentelemetry.workspace = true
opentelemetry-semantic-conventions.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tokio-rustls.workspace = true
Expand Down
232 changes: 125 additions & 107 deletions crates/stackable-webhook/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,92 +1,99 @@
//! Utility types and functions to easily create ready-to-use webhook servers
//! which can handle different tasks, for example CRD conversions. All webhook
//! servers use HTTPS by default. This library is fully compatible with the
//! [`tracing`] crate and emits debug level tracing data.
//! Utility types and functions to easily create ready-to-use webhook servers which can handle
//! different tasks. All webhook servers use HTTPS by default.
//!
//! Most users will only use the top-level exported generic [`WebhookServer`]
//! which enables complete control over the [Router] which handles registering
//! routes and their handler functions.
//! Currently the following webhooks are supported:
//!
//! ```
//! use stackable_webhook::{WebhookServer, WebhookOptions};
//! use axum::Router;
//! * [webhooks::ConversionWebhook]
//! * [webhooks::MutatingWebhook]
//! * In the future validating webhooks wil be added
//!
//! # async fn test() {
//! let router = Router::new();
//! let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default())
//! .await
//! .expect("failed to create WebhookServer");
//! # }
//! ```
//! This library is fully compatible with the [`tracing`] crate and emits debug level tracing data.
//!
//! For some usages, complete end-to-end [`WebhookServer`] implementations
//! exist. One such implementation is the [`ConversionWebhookServer`][1].
//!
//! This library additionally also exposes lower-level structs and functions to
//! enable complete control over these details if needed.
//!
//! [1]: crate::servers::ConversionWebhookServer
//! For usage please look at the [`WebhookServer`] docs as well as the specific [`Webhook`] you are
//! using.
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use ::x509_cert::Certificate;
use axum::{Router, routing::get};
use futures_util::{FutureExt as _, pin_mut, select};
use futures_util::{FutureExt as _, TryFutureExt, select};
use k8s_openapi::ByteString;
use snafu::{ResultExt, Snafu};
use stackable_telemetry::AxumTraceLayer;
use tokio::{
signal::unix::{SignalKind, signal},
sync::mpsc,
try_join,
};
use tower::ServiceBuilder;
use webhooks::{Webhook, WebhookError};
use x509_cert::der::{EncodePem, pem::LineEnding};

// Selected re-exports
pub use crate::options::WebhookOptions;
use crate::tls::TlsServer;

pub mod maintainer;
pub mod options;
pub mod servers;
pub mod tls;

/// A generic webhook handler receiving a request and sending back a response.
///
/// This trait is not intended to be implemented by external crates and this
/// library provides various ready-to-use implementations for it. One such an
/// implementation is part of the [`ConversionWebhookServer`][1].
///
/// [1]: crate::servers::ConversionWebhookServer
pub trait WebhookHandler<Req, Res> {
fn call(self, req: Req) -> Res;
}
pub mod webhooks;

/// A result type alias with the [`WebhookError`] type as the default error type.
pub type Result<T, E = WebhookError> = std::result::Result<T, E>;
pub type Result<T, E = WebhookServerError> = std::result::Result<T, E>;

#[derive(Debug, Snafu)]
pub enum WebhookError {
pub enum WebhookServerError {
#[snafu(display("failed to create TLS server"))]
CreateTlsServer { source: tls::TlsServerError },

#[snafu(display("failed to run TLS server"))]
RunTlsServer { source: tls::TlsServerError },

#[snafu(display("failed to update certificate"))]
UpdateCertificate { source: WebhookError },

#[snafu(display("failed to encode CA certificate as PEM format"))]
EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error },
}

/// A ready-to-use webhook server.
/// An HTTPS server that serves one or more webhooks.
///
/// It also handles TLS certificate rotation.
///
/// This server abstracts away lower-level details like TLS termination
/// and other various configurations, validations or middlewares. The routes
/// and their handlers are completely customizable by bringing your own
/// Axum [`Router`].
/// ### Example usage
///
/// For complete end-to-end implementations, see [`ConversionWebhookServer`][1].
/// ```
/// use stackable_webhook::WebhookServer;
/// use stackable_webhook::WebhookServerOptions;
/// use stackable_webhook::webhooks::Webhook;
///
/// [1]: crate::servers::ConversionWebhookServer
/// # async fn docs() {
/// let mut webhooks: Vec<Box<dyn Webhook>> = vec![];
///
/// let webhook_options = WebhookServerOptions {
/// socket_addr: WebhookServer::DEFAULT_SOCKET_ADDRESS,
/// webhook_namespace: "my-namespace".to_owned(),
/// webhook_service_name: "my-operator".to_owned(),
/// };
/// let webhook_server = WebhookServer::new(webhook_options, webhooks).await.unwrap();
/// # }
/// ```
pub struct WebhookServer {
options: WebhookServerOptions,
webhooks: Vec<Box<dyn Webhook>>,
tls_server: TlsServer,
cert_rx: mpsc::Receiver<Certificate>,
}

#[derive(Clone, Debug)]
pub struct WebhookServerOptions {
/// The HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] binds to.
pub socket_addr: SocketAddr,

/// The namespace the webhook is running in.
pub webhook_namespace: String,

/// The name of the Kubernetes service which points to the webhook.
pub webhook_service_name: String,
}

impl WebhookServer {
/// The default HTTPS port `8443`
/// The default HTTPS port
pub const DEFAULT_HTTPS_PORT: u16 = 8443;
/// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to,
/// which represents binding on all network addresses.
Expand All @@ -99,52 +106,13 @@ impl WebhookServer {
pub const DEFAULT_SOCKET_ADDRESS: SocketAddr =
SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT);

/// Creates a new ready-to-use webhook server.
/// Creates a new webhook server with the given config and list of [`Webhook`]s.
///
/// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles
/// routing based on the provided Axum `router`. Most of the time it is sufficient to use
/// [`WebhookOptions::default()`]. See the documentation for [`WebhookOptions`] for more details
/// on the default values.
///
/// To start the server, use the [`WebhookServer::run()`] function. This will
/// run the server using the Tokio runtime until it is terminated.
///
/// ### Basic Example
///
/// ```
/// use stackable_webhook::{WebhookServer, WebhookOptions};
/// use axum::Router;
///
/// # async fn test() {
/// let router = Router::new();
/// let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default())
/// .await
/// .expect("failed to create WebhookServer");
/// # }
/// ```
///
/// ### Example with Custom Options
///
/// ```
/// use stackable_webhook::{WebhookServer, WebhookOptions};
/// use axum::Router;
///
/// # async fn test() {
/// let options = WebhookOptions::builder()
/// .bind_address([127, 0, 0, 1], 8080)
/// .add_subject_alterative_dns_name("my-san-entry")
/// .build();
///
/// let router = Router::new();
/// let (server, cert_rx) = WebhookServer::new(router, options)
/// .await
/// .expect("failed to create WebhookServer");
/// # }
/// ```
/// Please read their documentation for details.
pub async fn new(
router: Router,
options: WebhookOptions,
) -> Result<(Self, mpsc::Receiver<Certificate>)> {
options: WebhookServerOptions,
webhooks: Vec<Box<dyn Webhook>>,
) -> Result<Self> {
tracing::trace!("create new webhook server");

// TODO (@Techassi): Make opt-in configurable from the outside
Expand All @@ -156,22 +124,33 @@ impl WebhookServer {
// by the Axum project.
//
// See https://docs.rs/axum/latest/axum/middleware/index.html#applying-multiple-middleware
// TODO (@NickLarsenNZ): rename this server_builder and keep it specific to tracing, since it's placement in the chain is important
let service_builder = ServiceBuilder::new().layer(trace_layer);
let trace_service_builder = ServiceBuilder::new().layer(trace_layer);

// Create the root router and merge the provided router into it.
tracing::debug!("create core router and merge provided router");
let mut router = Router::new();
for webhook in &webhooks {
router = webhook.register_routes(router);
}

let router = router
.layer(service_builder)
// Enrich spans for routes added above.
// Routes defined below it will not be instrumented to reduce noise.
.layer(trace_service_builder)
// The health route is below the AxumTraceLayer so as not to be instrumented
.route("/health", get(|| async { "ok" }));

tracing::debug!("create TLS server");
let (tls_server, cert_rx) = TlsServer::new(router, options)
let (tls_server, cert_rx) = TlsServer::new(router, &options)
.await
.context(CreateTlsServerSnafu)?;

Ok((Self { tls_server }, cert_rx))
Ok(Self {
options,
webhooks,
tls_server,
cert_rx,
})
}

/// Runs the Webhook server and sets up signal handlers for shutting down.
Expand Down Expand Up @@ -200,19 +179,58 @@ impl WebhookServer {
};

// select requires Future + Unpin
pin_mut!(future_server);
pin_mut!(future_signal);

futures_util::future::select(future_server, future_signal).await;
tokio::pin!(future_server);
tokio::pin!(future_signal);

tokio::select! {
res = &mut future_server => {
// If the server future errors, propagate the error
res?;
}
_ = &mut future_signal => {
tracing::info!("shutdown signal received, stopping webhook server");
}
}

Ok(())
}

/// Runs the webhook server by creating a TCP listener and binding it to
/// the specified socket address.
async fn run_server(self) -> Result<()> {
tracing::debug!("run webhook server");

self.tls_server.run().await.context(RunTlsServerSnafu)
let Self {
options,
mut webhooks,
tls_server,
mut cert_rx,
} = self;
let tls_server = tls_server
.run()
.map_err(|err| WebhookServerError::RunTlsServer { source: err });

let cert_update_loop = async {
loop {
while let Some(cert) = cert_rx.recv().await {
// The caBundle needs to be provided as a base64-encoded PEM envelope.
let ca_bundle = cert
.to_pem(LineEnding::LF)
.context(EncodeCertificateAuthorityAsPemSnafu)?;
let ca_bundle = ByteString(ca_bundle.as_bytes().to_vec());

for webhook in webhooks.iter_mut() {
webhook
.handle_certificate_rotation(&cert, &ca_bundle, &options)
.await
.context(UpdateCertificateSnafu)?;
}
}
}

// We need to hint the return type to the compiler
#[allow(unreachable_code)]
Ok(())
};

try_join!(cert_update_loop, tls_server).map(|_| ())
}
}
Loading