diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 96514c1fb..3f64dde5e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -14,7 +14,7 @@ default = ["rustls-tls", "kubederive", "ws", "latest", "socks5", "runtime", "ref kubederive = ["kube/derive"] openssl-tls = ["kube/client", "kube/openssl-tls", "kube/unstable-client"] rustls-tls = ["kube/client", "kube/rustls-tls", "kube/unstable-client"] -runtime = ["kube/runtime", "kube/unstable-runtime"] +runtime = ["kube/runtime", "kube/unstable-runtime", "kube/unstable-metrics"] socks5 = ["kube/socks5"] refresh = ["kube/oauth", "kube/oidc"] kubelet-debug = ["kube/kubelet-debug"] diff --git a/examples/configmapgen_controller.rs b/examples/configmapgen_controller.rs index 41b1ad1e4..71508996f 100644 --- a/examples/configmapgen_controller.rs +++ b/examples/configmapgen_controller.rs @@ -103,7 +103,19 @@ async fn main() -> Result<()> { }); // limit the controller to running a maximum of two concurrent reconciliations - let config = Config::default().concurrency(2); + let metrics = Arc::new(kube::runtime::metrics::Metrics::default()); + let config = Config::default() + .concurrency(2) + .metrics(metrics.clone()) + .debounce(Duration::from_secs(3)); + tokio::spawn(async move { + // Show metric state every 5 seconds + loop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let state = metrics.scheduler.read(); + info!("Current metrics: {state:?}"); + } + }); Controller::new(cmgs, watcher::Config::default()) .owns(cms, watcher::Config::default()) diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index debf8e5aa..e5ef3cde6 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -17,6 +17,7 @@ unstable-runtime-subscribe = [] unstable-runtime-predicates = [] unstable-runtime-stream-control = [] unstable-runtime-reconcile-on = [] +unstable-metrics = [] [package.metadata.docs.rs] features = ["k8s-openapi/latest", "unstable-runtime"] diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 3880d83fe..362582a62 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -353,6 +353,11 @@ where channel::mpsc::channel::>>(APPLIER_REQUEUE_BUF_SIZE); let error_policy = Arc::new(error_policy); let delay_store = store.clone(); + + #[cfg(feature = "unstable-metrics")] + let metrics = config.metrics.clone(); + + // TODO: how to share Metrics with debounced_scheduler? // Create a stream of ObjectRefs that need to be reconciled trystream_try_via( // input: stream combining scheduled tasks and user specified inputs event @@ -378,6 +383,9 @@ where // all the Oks from the select gets passed through the scheduler stream, and are then executed move |s| { Runner::new( + #[cfg(feature = "unstable-metrics")] + debounced_scheduler(s, config.debounce).with_metrics(metrics.scheduler.clone()), + #[cfg(not(feature = "unstable-metrics"))] debounced_scheduler(s, config.debounce), config.concurrency, move |request| { @@ -515,6 +523,8 @@ where pub struct Config { debounce: Duration, concurrency: u16, + #[cfg(feature = "unstable-metrics")] + metrics: Arc, } impl Config { @@ -548,6 +558,13 @@ impl Config { self.concurrency = concurrency; self } + + /// A loose idea of exposing metrics... + #[cfg(feature = "unstable-metrics")] + pub fn metrics(mut self, metrics: Arc) -> Self { + self.metrics = metrics; + self + } } /// Controller for a Resource `K` diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index 553fca629..cc1a2c5b9 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -22,6 +22,7 @@ pub mod controller; pub mod events; pub mod finalizer; +#[cfg(feature = "unstable-metrics")] pub mod metrics; pub mod reflector; pub mod scheduler; pub mod utils; diff --git a/kube-runtime/src/metrics.rs b/kube-runtime/src/metrics.rs new file mode 100644 index 000000000..1ba0e20d3 --- /dev/null +++ b/kube-runtime/src/metrics.rs @@ -0,0 +1,28 @@ +//! Optional metrics exposed by the runtime +use parking_lot::RwLock; +use std::sync::Arc; + +/// Metrics relating to the `Scheduler` +#[derive(Default, Debug)] +pub struct SchedulerMetrics { + /// Current size of the scheduler queue + pub queue_depth: usize, +} + +/// All metrics +#[derive(Default, Debug)] +pub struct Metrics { + /// kube build info + pub build_info: String, + /// Metrics from the scheduler + pub scheduler: Arc>, +} + +impl Metrics { + fn new() -> Self { + Self { + build_info: env!("CARGO_PKG_VERSION").to_string(), + ..Default::default() + } + } +} diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index 195b7a4ec..8c2967c42 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -1,12 +1,15 @@ //! Delays and deduplicates [`Stream`](futures::stream::Stream) items +#[cfg(feature = "unstable-metrics")] use crate::metrics::SchedulerMetrics; use futures::{stream::Fuse, Stream, StreamExt}; use hashbrown::{hash_map::Entry, HashMap}; +use parking_lot::RwLock; use pin_project::pin_project; use std::{ collections::HashSet, hash::Hash, pin::Pin, + sync::Arc, task::{Context, Poll}, time::Duration, }; @@ -51,6 +54,8 @@ pub struct Scheduler { /// for a request to be emitted, if the scheduler is "uninterrupted" for the configured /// debounce period. Its primary purpose to deduplicate requests that expire instantly. debounce: Duration, + #[cfg(feature = "unstable-metrics")] + metrics: Arc>, } impl Scheduler { @@ -61,8 +66,16 @@ impl Scheduler { pending: HashSet::new(), requests: requests.fuse(), debounce, + #[cfg(feature = "unstable-metrics")] + metrics: Default::default(), } } + + #[cfg(feature = "unstable-metrics")] + pub(crate) fn with_metrics(mut self, metrics: Arc>) -> Self { + self.metrics = metrics; + self + } } impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { @@ -74,6 +87,9 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { // Message is already pending, so we can't even expedite it return; } + #[cfg(feature = "unstable-metrics")] + self.update_metrics(); + let next_time = request .run_at .checked_add(*self.debounce) @@ -140,6 +156,12 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { self.pending.insert(msg); } } + + /// Update metrics when configured + #[cfg(feature = "unstable-metrics")] + pub(crate) fn update_metrics(&mut self) { + self.metrics.write().queue_depth = self.queue.len(); + } } /// See [`Scheduler::hold`] @@ -167,6 +189,8 @@ where } scheduler.pop_queue_message_into_pending(cx); + #[cfg(feature = "unstable-metrics")] + scheduler.update_metrics(); Poll::Pending } } diff --git a/kube/Cargo.toml b/kube/Cargo.toml index b2c218deb..00c487919 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -33,6 +33,7 @@ admission = ["kube-core/admission"] derive = ["kube-derive", "kube-core/schema"] runtime = ["kube-runtime"] unstable-runtime = ["kube-runtime/unstable-runtime"] +unstable-metrics = ["kube-runtime/unstable-metrics"] unstable-client = ["kube-client/unstable-client"] socks5 = ["kube-client/socks5"] http-proxy = ["kube-client/http-proxy"]