Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1c44459
feat: Add CRD maintainer
Techassi Sep 23, 2025
41f8649
docs: Add/improve various (doc) comments
Techassi Sep 23, 2025
9aa8941
fix: Use default crypto provider for TLS server
Techassi Sep 23, 2025
6d0916c
chore: Merge branch 'main' into feat/crd-maintenance
Techassi Sep 24, 2025
362eb48
chore(stackable-operator): Gate maintainer behind webhook feature
Techassi Sep 24, 2025
c8d0621
fix(stackable-operator): Make options fields public
Techassi Sep 24, 2025
2fc5c28
feat(stackable-operator): Add create_if_missing method to client
Techassi Sep 25, 2025
59a9426
chore(webhook): Add changelog entry
Techassi Sep 25, 2025
77cb05b
chore(operator): Add changelog entry
Techassi Sep 25, 2025
106dd16
chore: Streamline feature gate
Techassi Oct 1, 2025
bbf49f5
docs(operator): Add example in doc comment
Techassi Oct 1, 2025
ac5b9cb
refactor(operator): Adjust oneshot channel handling
Techassi Oct 1, 2025
d51491e
chore: Apply suggestions
Techassi Oct 1, 2025
d0c4a57
chore(webhook): Update dev comment
Techassi Oct 1, 2025
f1b4ed0
chore: Merge branch 'main' into feat/crd-maintenance
Techassi Oct 1, 2025
44a701a
fix(operator): Import ensure! macro
Techassi Oct 1, 2025
434c84a
chore: Merge branch 'main' into feat/crd-maintenance
Techassi Oct 8, 2025
99eae00
feat: Add ConversionWebhookServer::with_maintainer
Techassi Oct 8, 2025
fa55940
chore: Apply suggestion
Techassi Oct 13, 2025
352129e
chore: Adjust changelog entry
Techassi Oct 13, 2025
0508e1b
docs(webhook): Add example for ConversionWebhookServer::with_maintainer
Techassi Oct 13, 2025
16ed410
refactor(webhook): Make field manager a separate field
Techassi Oct 13, 2025
c80d965
chore: Merge branch 'main' into feat/crd-maintenance
Techassi Oct 13, 2025
548783f
test(webhook): Adjust doc tests
Techassi Oct 13, 2025
f9e16d9
refactor(webhook): Rename operator_name to operator_service_name
Techassi Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/stackable-operator/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ All notable changes to this project will be documented in this file.

### Added

- Add `CustomResourceDefinitionMaintainer` which applies and patches CRDs triggered by TLS
certificate rotations of the `ConversionWebhookServer`. It additionally provides a `oneshot`
channel which can for example be used to trigger creation/patching of any custom resources deployed by
the operator ([#1099]).
- Add a `Client::create_if_missing` associated function to create a resource if it doesn't
exist ([#1099]).
- Add CLI argument and env var to disable the end-of-support checker: `EOS_DISABLED` (`--eos-disabled`) ([#1101]).
- Add end-of-support checker ([#1096], [#1103]).
- The EoS checker can be constructed using `EndOfSupportChecker::new()`.
Expand All @@ -34,6 +40,7 @@ All notable changes to this project will be documented in this file.

[#1096]: https://github.com/stackabletech/operator-rs/pull/1096
[#1098]: https://github.com/stackabletech/operator-rs/pull/1098
[#1099]: https://github.com/stackabletech/operator-rs/pull/1099
[#1101]: https://github.com/stackabletech/operator-rs/pull/1101
[#1103]: https://github.com/stackabletech/operator-rs/pull/1103

Expand Down
19 changes: 19 additions & 0 deletions crates/stackable-operator/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,25 @@ impl Client {
})
}

/// Optionally creates a resource if it does not exist yet.
///
/// The name used for lookup is extracted from the resource via [`ResourceExt::name_any()`].
/// This function either returns the existing resource or the newly created one.
pub async fn create_if_missing<T>(&self, resource: &T) -> Result<T>
where
T: Clone + Debug + DeserializeOwned + Resource + Serialize + GetApi,
<T as Resource>::DynamicType: Default,
{
if let Some(r) = self
.get_opt(&resource.name_any(), resource.get_namespace())
.await?
{
return Ok(r);
}

self.create(resource).await
}

/// Patches a resource using the `MERGE` patch strategy described
/// in [JSON Merge Patch](https://tools.ietf.org/html/rfc7386)
/// This will fail for objects that do not exist yet.
Expand Down
15 changes: 15 additions & 0 deletions crates/stackable-webhook/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Changed

- BREAKING: `ConversionWebhookServer::new` now returns a pair of values ([#1099]):
- The conversion webhook server itself
- A `mpsc::Receiver<Certificate>` to provide consumers the newly generated TLS certificate
- BREAKING: Constants for ports, IP addresses and socket addresses are now associated constants on
`(Conversion)WebhookServer` instead of free-standing ones ([#1099]).

### Removed

- BREAKING: The `maintain_crds` and `field_manager` fields in `ConversionWebhookOptions`
are removed ([#1099]).

[#1099]: https://github.com/stackabletech/operator-rs/pull/1099

## [0.6.0] - 2025-09-09

### Added
Expand Down
21 changes: 0 additions & 21 deletions crates/stackable-webhook/src/constants.rs

This file was deleted.

25 changes: 19 additions & 6 deletions crates/stackable-webhook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
//! enable complete control over these details if needed.
//!
//! [1]: crate::servers::ConversionWebhookServer
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use ::x509_cert::Certificate;
use axum::{Router, routing::get};
use futures_util::{FutureExt as _, pin_mut, select};
use snafu::{ResultExt, Snafu};
Expand All @@ -35,19 +38,16 @@ use tokio::{
sync::mpsc,
};
use tower::ServiceBuilder;
use x509_cert::Certificate;

// use tower_http::trace::TraceLayer;
// Selected re-exports
pub use crate::options::WebhookOptions;
use crate::tls::TlsServer;

pub mod constants;
pub mod maintainer;
pub mod options;
pub mod servers;
pub mod tls;

// Selected re-exports
pub use crate::options::WebhookOptions;

/// A generic webhook handler receiving a request and sending back a response.
///
/// This trait is not intended to be implemented by external crates and this
Expand Down Expand Up @@ -86,6 +86,19 @@ pub struct WebhookServer {
}

impl WebhookServer {
/// The default HTTPS port `8443`
pub const DEFAULT_HTTPS_PORT: u16 = 8443;
/// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to,
/// which represents binding on all network addresses.
//
// TODO: We might want to switch to `Ipv6Addr::UNSPECIFIED)` here, as this *normally* binds to IPv4
// and IPv6. However, it's complicated and depends on the underlying system...
// If we do so, we should set `set_only_v6(false)` on the socket to not rely on system defaults.
pub const DEFAULT_LISTEN_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
/// The default socket address `0.0.0.0:8443` the webhook server binds to.
pub const DEFAULT_SOCKET_ADDRESS: SocketAddr =
SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT);

/// Creates a new ready-to-use webhook server.
///
/// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles
Expand Down
254 changes: 254 additions & 0 deletions crates/stackable-webhook/src/maintainer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
use k8s_openapi::{
ByteString,
apiextensions_apiserver::pkg::apis::apiextensions::v1::{
CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig,
WebhookConversion,
},
};
use kube::{
Api, Client, ResourceExt,
api::{Patch, PatchParams},
};
use snafu::{ResultExt, Snafu, ensure};
use tokio::sync::{mpsc, oneshot};
use x509_cert::{
Certificate,
der::{EncodePem, pem::LineEnding},
};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("failed to encode CA certificate as PEM format"))]
EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error },

#[snafu(display("failed to send initial CRD reconcile heartbeat"))]
SendInitialReconcileHeartbeat,

#[snafu(display("failed to patch CRD {crd_name:?}"))]
PatchCrd {
source: kube::Error,
crd_name: String,
},
}

/// Maintains various custom resource definitions.
///
/// When running this, the following operations are done:
///
/// - Apply the CRDs when starting up
/// - Reconcile the CRDs when the conversion webhook certificate is rotated
pub struct CustomResourceDefinitionMaintainer<'a> {
client: Client,
certificate_rx: mpsc::Receiver<Certificate>,

definitions: Vec<CustomResourceDefinition>,
options: CustomResourceDefinitionMaintainerOptions<'a>,

initial_reconcile_tx: oneshot::Sender<()>,
}

impl<'a> CustomResourceDefinitionMaintainer<'a> {
/// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more
/// custom resource definitions.
///
/// ## Parameters
///
/// This function expects four parameters:
///
/// - `client`: A [`Client`] to interact with the Kubernetes API server. It continuously patches
/// the CRDs when the TLS certificate is rotated.
/// - `certificate_rx`: A [`mpsc::Receiver`] to receive newly generated TLS certificates. The
/// certificate data sent through the channel is used to set the caBundle in the conversion
/// section of the CRD.
/// - `definitions`: An iterator of [`CustomResourceDefinition`]s which should be maintained
/// by this maintainer. If the iterator is empty, the maintainer returns early without doing
/// any work. As such, a polling mechanism which waits for all futures should be used to
/// prevent premature termination of the operator.
/// - `options`: Provides [`CustomResourceDefinitionMaintainerOptions`] to customize various
/// parts of the maintainer. In the future, this will be converted to a builder, to enable a
/// cleaner API interface.
///
/// ## Return Values
///
/// This function returns a 2-tuple (pair) of values:
///
/// - The [`CustomResourceDefinitionMaintainer`] itself. This is used to run the maintainer.
/// See [`CustomResourceDefinitionMaintainer::run`] for more details.
/// - The [`oneshot::Receiver`] which will be used to send out a message once the initial
/// CRD reconciliation ran. This signal can be used to trigger the deployment of custom
/// resources defined by the maintained CRDs.
///
/// ## Example
///
/// ```no_run
/// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion};
/// # use tokio::sync::mpsc::channel;
/// # use x509_cert::Certificate;
/// # use kube::Client;
/// use stackable_webhook::maintainer::{
/// CustomResourceDefinitionMaintainerOptions,
/// CustomResourceDefinitionMaintainer,
/// };
///
/// # #[tokio::main]
/// # async fn main() {
/// # let (certificate_tx, certificate_rx) = channel(1);
/// let options = CustomResourceDefinitionMaintainerOptions {
/// operator_name: "my-service-name",
/// operator_namespace: "my-namespace",
/// field_manager: "my-field-manager",
/// webhook_https_port: 8443,
/// disabled: true,
/// };
///
/// let client = Client::try_default().await.unwrap();
///
/// let definitions = vec![
/// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(),
/// S3Bucket::merged_crd(S3BucketVersion::V1Alpha1).unwrap(),
/// ];
///
/// let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new(
/// client,
/// certificate_rx,
/// definitions,
/// options,
/// );
/// # }
/// ```
pub fn new(
client: Client,
certificate_rx: mpsc::Receiver<Certificate>,
definitions: impl IntoIterator<Item = CustomResourceDefinition>,
options: CustomResourceDefinitionMaintainerOptions<'a>,
) -> (Self, oneshot::Receiver<()>) {
let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel();

let maintainer = Self {
definitions: definitions.into_iter().collect(),
initial_reconcile_tx,
certificate_rx,
options,
client,
};

(maintainer, initial_reconcile_rx)
}

/// Runs the [`CustomResourceDefinitionMaintainer`] asynchronously.
///
/// This needs to be polled in parallel with other parts of an operator, like controllers or
/// webhook servers. If it is disabled, the returned future immediately resolves to
/// [`std::task::Poll::Ready`] and thus doesn't consume any resources.
pub async fn run(mut self) -> Result<(), Error> {
let CustomResourceDefinitionMaintainerOptions {
operator_namespace,
webhook_https_port,
operator_name,
field_manager,
disabled,
} = self.options;

// If the maintainer is disabled or there are no custom resource definitions, immediately
// return without doing any work.
if disabled || self.definitions.is_empty() {
return Ok(());
}

// This channel can only be used exactly once. The sender's send method consumes self, and
// as such, the sender is wrapped in an Option to be able to call take to consume the inner
// value.
let mut initial_reconcile_tx = Some(self.initial_reconcile_tx);

// This get's polled by the async runtime on a regular basis (or when woken up). Once we
// receive a message containing the newly generated TLS certificate for the conversion
// webhook, we need to update the caBundle in the CRD.
while let Some(certificate) = self.certificate_rx.recv().await {
tracing::info!(
k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::<Vec<_>>(),
"reconciling custom resource definitions"
);

// The caBundle needs to be provided as a base64-encoded PEM envelope.
let ca_bundle = certificate
.to_pem(LineEnding::LF)
.context(EncodeCertificateAuthorityAsPemSnafu)?;

let crd_api: Api<CustomResourceDefinition> = Api::all(self.client.clone());

for crd in self.definitions.iter_mut() {
let crd_kind = &crd.spec.names.kind;
let crd_name = crd.name_any();

tracing::debug!(
k8s.crd.kind = crd_kind,
k8s.crd.name = crd_name,
"reconciling custom resource definition"
);

crd.spec.conversion = Some(CustomResourceConversion {
strategy: "Webhook".to_owned(),
webhook: Some(WebhookConversion {
// conversionReviewVersions indicates what ConversionReview versions are
// supported by the webhook. The first version in the list understood by the
// API server is sent to the webhook. The webhook must respond with a
// ConversionReview object in the same version it received. We only support
// the stable v1 ConversionReview to keep the implementation as simple as
// possible.
conversion_review_versions: vec!["v1".to_owned()],
client_config: Some(WebhookClientConfig {
service: Some(ServiceReference {
name: operator_name.to_owned(),
namespace: operator_namespace.to_owned(),
path: Some(format!("/convert/{crd_name}")),
port: Some(webhook_https_port.into()),
}),
// Here, ByteString takes care of encoding the provided content as
// base64.
ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())),
url: None,
}),
}),
});

// Deploy the updated CRDs using a server-side apply.
let patch = Patch::Apply(&crd);
let patch_params = PatchParams::apply(field_manager);
crd_api
.patch(&crd_name, &patch_params, &patch)
.await
.with_context(|_| PatchCrdSnafu { crd_name })?;
}

// After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out
// via the oneshot channel.
if let Some(initial_reconcile_tx) = initial_reconcile_tx.take() {
ensure!(
initial_reconcile_tx.send(()).is_ok(),
SendInitialReconcileHeartbeatSnafu
);
}
}

Ok(())
}
}

// TODO (@Techassi): Make this a builder instead
/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`].
pub struct CustomResourceDefinitionMaintainerOptions<'a> {
/// The service name used by the operator/conversion webhook and as a field manager.
pub operator_name: &'a str,

/// The namespace the operator/conversion webhook runs in.
pub operator_namespace: &'a str,

/// The field manager used when maintaining the CRDs.
pub field_manager: &'a str,

/// The HTTPS port the conversion webhook listens on.
pub webhook_https_port: u16,

/// Indicates if the maintainer should be disabled.
pub disabled: bool,
}
Loading