2 changes: 1 addition & 1 deletion examples/Cargo.toml
@@ -14,7 +14,7 @@ default = ["rustls-tls", "kubederive", "ws", "latest", "socks5", "runtime", "ref
kubederive = ["kube/derive"]
openssl-tls = ["kube/client", "kube/openssl-tls", "kube/unstable-client"]
rustls-tls = ["kube/client", "kube/rustls-tls", "kube/unstable-client"]
-runtime = ["kube/runtime", "kube/unstable-runtime"]
+runtime = ["kube/runtime", "kube/unstable-runtime", "kube/unstable-metrics"]
socks5 = ["kube/socks5"]
refresh = ["kube/oauth", "kube/oidc"]
kubelet-debug = ["kube/kubelet-debug"]
14 changes: 13 additions & 1 deletion examples/configmapgen_controller.rs
@@ -103,7 +103,19 @@ async fn main() -> Result<()> {
});

// limit the controller to running a maximum of two concurrent reconciliations
-let config = Config::default().concurrency(2);
+let metrics = Arc::new(kube::runtime::metrics::Metrics::default());
+let config = Config::default()
+    .concurrency(2)
+    .metrics(metrics.clone())
+    .debounce(Duration::from_secs(3));
+tokio::spawn(async move {
+    // Show metric state every 5 seconds
+    loop {
+        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+        let state = metrics.scheduler.read();
+        info!("Current metrics: {state:?}");
+    }
+});

Controller::new(cmgs, watcher::Config::default())
.owns(cms, watcher::Config::default())
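Beyond logging, the same shared handle could back a scrape endpoint. A minimal sketch (assuming `kube` is built with the `runtime` and `unstable-metrics` features and reusing the `metrics` Arc from the example above; the metric name `kube_scheduler_queue_depth` is made up for illustration, not something kube exports):

```rust
fn render_prometheus(metrics: &kube::runtime::metrics::Metrics) -> String {
    // Take a short-lived read lock and render the queue depth as a single gauge
    // in Prometheus text exposition format.
    let scheduler = metrics.scheduler.read();
    format!(
        "# HELP kube_scheduler_queue_depth Current size of the scheduler queue\n\
         # TYPE kube_scheduler_queue_depth gauge\n\
         kube_scheduler_queue_depth {}\n",
        scheduler.queue_depth
    )
}
```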
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
@@ -17,6 +17,7 @@ unstable-runtime-subscribe = []
unstable-runtime-predicates = []
unstable-runtime-stream-control = []
unstable-runtime-reconcile-on = []
unstable-metrics = []

[package.metadata.docs.rs]
features = ["k8s-openapi/latest", "unstable-runtime"]
17 changes: 17 additions & 0 deletions kube-runtime/src/controller/mod.rs
@@ -353,6 +353,11 @@ where
channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
let error_policy = Arc::new(error_policy);
let delay_store = store.clone();

#[cfg(feature = "unstable-metrics")]
let metrics = config.metrics.clone();

// TODO: how to share Metrics with debounced_scheduler?
// Create a stream of ObjectRefs that need to be reconciled
trystream_try_via(
// input: stream combining scheduled tasks and user-specified input events
@@ -378,6 +383,9 @@
// all the Oks from the select get passed through the scheduler stream, and are then executed
move |s| {
Runner::new(
#[cfg(feature = "unstable-metrics")]
debounced_scheduler(s, config.debounce).with_metrics(metrics.scheduler.clone()),
#[cfg(not(feature = "unstable-metrics"))]
debounced_scheduler(s, config.debounce),
config.concurrency,
move |request| {
@@ -515,6 +523,8 @@
pub struct Config {
debounce: Duration,
concurrency: u16,
#[cfg(feature = "unstable-metrics")]
metrics: Arc<crate::metrics::Metrics>,
}

impl Config {
@@ -548,6 +558,13 @@ impl Config {
self.concurrency = concurrency;
self
}

/// Set a shared [`Metrics`](crate::metrics::Metrics) handle for the controller to record runtime state into (unstable, early sketch)
#[cfg(feature = "unstable-metrics")]
pub fn metrics(mut self, metrics: Arc<crate::metrics::Metrics>) -> Self {
self.metrics = metrics;
self
}
}

/// Controller for a Resource `K`
1 change: 1 addition & 0 deletions kube-runtime/src/lib.rs
@@ -22,6 +22,7 @@ pub mod controller;
pub mod events;

pub mod finalizer;
#[cfg(feature = "unstable-metrics")] pub mod metrics;
pub mod reflector;
pub mod scheduler;
pub mod utils;
28 changes: 28 additions & 0 deletions kube-runtime/src/metrics.rs
@@ -0,0 +1,28 @@
//! Optional metrics exposed by the runtime
use parking_lot::RwLock;
use std::sync::Arc;

/// Metrics relating to the `Scheduler`
#[derive(Default, Debug)]
pub struct SchedulerMetrics {
/// Current size of the scheduler queue
pub queue_depth: usize,
}

/// All metrics
#[derive(Default, Debug)]
pub struct Metrics {
/// kube build info
pub build_info: String,
/// Metrics from the scheduler
pub scheduler: Arc<RwLock<SchedulerMetrics>>,
}

impl Metrics {
fn new() -> Self {
Self {
build_info: env!("CARGO_PKG_VERSION").to_string(),
..Default::default()
}
}
}
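Since `scheduler` is an `Arc<RwLock<_>>`, any clone of the handle observes writes made through another clone; that is what lets the `Config` owner keep reading state that the runtime updates internally. A standalone sketch of that sharing (assumes the `runtime` and `unstable-metrics` features; the write here merely stands in for the scheduler's own `update_metrics` call):

```rust
use kube::runtime::metrics::Metrics;
use std::sync::Arc;

fn main() {
    let metrics = Arc::new(Metrics::default());
    // Clone of the inner handle, e.g. handed off to an exporter or logging task.
    let reader = metrics.scheduler.clone();
    // Stand-in for the write the scheduler performs internally.
    metrics.scheduler.write().queue_depth = 3;
    // The cloned handle sees the same state.
    assert_eq!(reader.read().queue_depth, 3);
}
```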
24 changes: 24 additions & 0 deletions kube-runtime/src/scheduler.rs
@@ -1,12 +1,15 @@
//! Delays and deduplicates [`Stream`](futures::stream::Stream) items

#[cfg(feature = "unstable-metrics")] use crate::metrics::SchedulerMetrics;
use futures::{stream::Fuse, Stream, StreamExt};
use hashbrown::{hash_map::Entry, HashMap};
use parking_lot::RwLock;
use pin_project::pin_project;
use std::{
collections::HashSet,
hash::Hash,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
@@ -51,6 +54,8 @@ pub struct Scheduler<T, R> {
/// for a request to be emitted, if the scheduler is "uninterrupted" for the configured
/// debounce period. Its primary purpose is to deduplicate requests that expire instantly.
debounce: Duration,
#[cfg(feature = "unstable-metrics")]
metrics: Arc<RwLock<SchedulerMetrics>>,
}

impl<T, R: Stream> Scheduler<T, R> {
@@ -61,8 +66,16 @@
pending: HashSet::new(),
requests: requests.fuse(),
debounce,
#[cfg(feature = "unstable-metrics")]
metrics: Default::default(),
}
}

#[cfg(feature = "unstable-metrics")]
pub(crate) fn with_metrics(mut self, metrics: Arc<RwLock<SchedulerMetrics>>) -> Self {
self.metrics = metrics;
self
}
}

impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
@@ -74,6 +87,9 @@
// Message is already pending, so we can't even expedite it
return;
}
#[cfg(feature = "unstable-metrics")]
self.update_metrics();

let next_time = request
.run_at
.checked_add(*self.debounce)
@@ -140,6 +156,12 @@
self.pending.insert(msg);
}
}

/// Update the scheduler metrics (compiled in only with the `unstable-metrics` feature)
#[cfg(feature = "unstable-metrics")]
pub(crate) fn update_metrics(&mut self) {
self.metrics.write().queue_depth = self.queue.len();
}
}

/// See [`Scheduler::hold`]
@@ -167,6 +189,8 @@
}

scheduler.pop_queue_message_into_pending(cx);
#[cfg(feature = "unstable-metrics")]
scheduler.update_metrics();
Poll::Pending
}
}
1 change: 1 addition & 0 deletions kube/Cargo.toml
Expand Up @@ -33,6 +33,7 @@ admission = ["kube-core/admission"]
derive = ["kube-derive", "kube-core/schema"]
runtime = ["kube-runtime"]
unstable-runtime = ["kube-runtime/unstable-runtime"]
unstable-metrics = ["kube-runtime/unstable-metrics"]
unstable-client = ["kube-client/unstable-client"]
socks5 = ["kube-client/socks5"]
http-proxy = ["kube-client/http-proxy"]
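Downstream users would opt in through this facade feature; a hypothetical manifest entry for a consumer crate (versions are placeholders, not taken from this PR):

```toml
[dependencies]
# "runtime" pulls in kube-runtime; "unstable-metrics" forwards to kube-runtime/unstable-metrics.
kube = { version = "x.y", features = ["runtime", "unstable-metrics"] }
k8s-openapi = { version = "x.y", features = ["latest"] }
```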