Skip to content

Commit

Permalink
runtime metric idea
Browse files Browse the repository at this point in the history
kind of horrible test impl
- creates a lock-guarded (`RwLock`) metrics struct and threads it into the controller
- forks off sub-structs into the relevant bits, which self-update (no change)
- used in scheduler to update queue length

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux committed Jul 1, 2024
1 parent f9902f1 commit b306c15
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 2 deletions.
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ default = ["rustls-tls", "kubederive", "ws", "latest", "socks5", "runtime", "ref
kubederive = ["kube/derive"]
openssl-tls = ["kube/client", "kube/openssl-tls", "kube/unstable-client"]
rustls-tls = ["kube/client", "kube/rustls-tls", "kube/unstable-client"]
runtime = ["kube/runtime", "kube/unstable-runtime"]
runtime = ["kube/runtime", "kube/unstable-runtime", "kube/unstable-metrics"]
socks5 = ["kube/socks5"]
refresh = ["kube/oauth", "kube/oidc"]
kubelet-debug = ["kube/kubelet-debug"]
Expand Down
14 changes: 13 additions & 1 deletion examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,19 @@ async fn main() -> Result<()> {
});

// limit the controller to running a maximum of two concurrent reconciliations
let config = Config::default().concurrency(2);
let metrics = Arc::new(kube::runtime::metrics::Metrics::default());
let config = Config::default()
.concurrency(2)
.metrics(metrics.clone())
.debounce(Duration::from_secs(3));
tokio::spawn(async move {
// Show metric state every 5 seconds
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let state = metrics.scheduler.read();
info!("Current metrics: {state:?}");
}
});

Controller::new(cmgs, watcher::Config::default())
.owns(cms, watcher::Config::default())
Expand Down
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ unstable-runtime-subscribe = []
unstable-runtime-predicates = []
unstable-runtime-stream-control = []
unstable-runtime-reconcile-on = []
unstable-metrics = []

[package.metadata.docs.rs]
features = ["k8s-openapi/latest", "unstable-runtime"]
Expand Down
17 changes: 17 additions & 0 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ where
channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
let error_policy = Arc::new(error_policy);
let delay_store = store.clone();

#[cfg(feature = "unstable-metrics")]
let metrics = config.metrics.clone();

// TODO: how to share Metrics with debounced_scheduler?
// Create a stream of ObjectRefs that need to be reconciled
trystream_try_via(
// input: stream combining scheduled tasks and user specified inputs event
Expand All @@ -378,6 +383,9 @@ where
// all the Oks from the select gets passed through the scheduler stream, and are then executed
move |s| {
Runner::new(
#[cfg(feature = "unstable-metrics")]

Check warning on line 386 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L386

Added line #L386 was not covered by tests
debounced_scheduler(s, config.debounce).with_metrics(metrics.scheduler.clone()),
#[cfg(not(feature = "unstable-metrics"))]

Check warning on line 388 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L388

Added line #L388 was not covered by tests
debounced_scheduler(s, config.debounce),
config.concurrency,
move |request| {
Expand Down Expand Up @@ -515,6 +523,8 @@ where
pub struct Config {
debounce: Duration,
concurrency: u16,
#[cfg(feature = "unstable-metrics")]
metrics: Arc<crate::metrics::Metrics>,
}

impl Config {
Expand Down Expand Up @@ -548,6 +558,13 @@ impl Config {
self.concurrency = concurrency;
self
}

/// Share a [`Metrics`](crate::metrics::Metrics) handle with the controller
///
/// The handle is stored on the configuration as-is. Callers that want to
/// observe the metrics should keep their own clone of the `Arc` before
/// passing it in.
///
/// Only available with the `unstable-metrics` feature; the API may change.
#[cfg(feature = "unstable-metrics")]
#[must_use]
pub fn metrics(mut self, metrics: Arc<crate::metrics::Metrics>) -> Self {
    self.metrics = metrics;
    self
}

Check failure on line 567 in kube-runtime/src/controller/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

missing `#[must_use]` attribute on a method returning `Self`

error: missing `#[must_use]` attribute on a method returning `Self` --> kube-runtime/src/controller/mod.rs:564:5 | 564 | / pub fn metrics(mut self, metrics: Arc<crate::metrics::Metrics>) -> Self { 565 | | self.metrics = metrics; 566 | | self 567 | | } | |_____^ | = help: consider adding the `#[must_use]` attribute to the method or directly to the `Self` type = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#return_self_not_must_use = note: `#[deny(clippy::return_self_not_must_use)]` implied by `#[deny(clippy::pedantic)]`
}

/// Controller for a Resource `K`
Expand Down
1 change: 1 addition & 0 deletions kube-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod controller;
pub mod events;

pub mod finalizer;
#[cfg(feature = "unstable-metrics")] pub mod metrics;
pub mod reflector;
pub mod scheduler;
pub mod utils;
Expand Down
28 changes: 28 additions & 0 deletions kube-runtime/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! Optional metrics exposed by the runtime
use parking_lot::RwLock;
use std::sync::Arc;

/// Metrics relating to the `Scheduler`
// The `Metrics` suffix repeats the module name on purpose: the type is imported
// on its own elsewhere in the crate, where a bare `Scheduler` would be ambiguous.
#[allow(clippy::module_name_repetitions)]
#[derive(Default, Debug)]
pub struct SchedulerMetrics {
    /// Current size of the scheduler queue
    pub queue_depth: usize,
}

/// All metrics
///
/// Top-level container for the metrics exposed by the runtime. Per-component
/// metrics are held behind a shared lock so they can be handed out and
/// updated in place while other holders read them.
#[derive(Default, Debug)]
pub struct Metrics {
/// kube build info
// NOTE: the derived `Default` initialises this to an empty string.
pub build_info: String,
/// Metrics from the scheduler
pub scheduler: Arc<RwLock<SchedulerMetrics>>,
}

impl Metrics {
fn new() -> Self {

Check warning on line 22 in kube-runtime/src/metrics.rs

View workflow job for this annotation

GitHub Actions / clippy

associated function `new` is never used

warning: associated function `new` is never used --> kube-runtime/src/metrics.rs:22:8 | 21 | impl Metrics { | ------------ associated function in this implementation 22 | fn new() -> Self { | ^^^ | = note: `#[warn(dead_code)]` on by default

Check warning on line 22 in kube-runtime/src/metrics.rs

View workflow job for this annotation

GitHub Actions / msrv

associated function `new` is never used

Check warning on line 22 in kube-runtime/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/metrics.rs#L22

Added line #L22 was not covered by tests
Self {
build_info: env!("CARGO_PKG_VERSION").to_string(),

Check warning on line 24 in kube-runtime/src/metrics.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/metrics.rs#L24

Added line #L24 was not covered by tests
..Default::default()
}
}
}
24 changes: 24 additions & 0 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
//! Delays and deduplicates [`Stream`](futures::stream::Stream) items
#[cfg(feature = "unstable-metrics")] use crate::metrics::SchedulerMetrics;
use futures::{stream::Fuse, Stream, StreamExt};
use hashbrown::{hash_map::Entry, HashMap};
use parking_lot::RwLock;
use pin_project::pin_project;
use std::{
collections::HashSet,
hash::Hash,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
Expand Down Expand Up @@ -51,6 +54,8 @@ pub struct Scheduler<T, R> {
/// for a request to be emitted, if the scheduler is "uninterrupted" for the configured
/// debounce period. Its primary purpose to deduplicate requests that expire instantly.
debounce: Duration,
#[cfg(feature = "unstable-metrics")]
metrics: Arc<RwLock<SchedulerMetrics>>,
}

impl<T, R: Stream> Scheduler<T, R> {
Expand All @@ -61,8 +66,16 @@ impl<T, R: Stream> Scheduler<T, R> {
pending: HashSet::new(),
requests: requests.fuse(),
debounce,
#[cfg(feature = "unstable-metrics")]
metrics: Default::default(),
}
}

/// Swap the scheduler's metrics handle for a caller-provided shared one
///
/// Consumes and returns the scheduler so the call can be chained onto
/// construction.
#[cfg(feature = "unstable-metrics")]
pub(crate) fn with_metrics(self, metrics: Arc<RwLock<SchedulerMetrics>>) -> Self {
    let mut scheduler = self;
    scheduler.metrics = metrics;
    scheduler
}
}

impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
Expand All @@ -74,6 +87,9 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
// Message is already pending, so we can't even expedite it
return;
}
#[cfg(feature = "unstable-metrics")]
self.update_metrics();

let next_time = request
.run_at
.checked_add(*self.debounce)
Expand Down Expand Up @@ -140,6 +156,12 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
self.pending.insert(msg);
}
}

/// Publish the current queue length to the shared scheduler metrics
///
/// Takes the write lock only for the duration of the single field update.
#[cfg(feature = "unstable-metrics")]
pub(crate) fn update_metrics(&mut self) {
    // Read the length before taking the lock, matching the original evaluation order.
    let depth = self.queue.len();
    let mut snapshot = self.metrics.write();
    snapshot.queue_depth = depth;
}
}

/// See [`Scheduler::hold`]
Expand Down Expand Up @@ -167,6 +189,8 @@ where
}

scheduler.pop_queue_message_into_pending(cx);
#[cfg(feature = "unstable-metrics")]
scheduler.update_metrics();
Poll::Pending
}
}
Expand Down
1 change: 1 addition & 0 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ admission = ["kube-core/admission"]
derive = ["kube-derive", "kube-core/schema"]
runtime = ["kube-runtime"]
unstable-runtime = ["kube-runtime/unstable-runtime"]
unstable-metrics = ["kube-runtime/unstable-metrics"]
unstable-client = ["kube-client/unstable-client"]
socks5 = ["kube-client/socks5"]
http-proxy = ["kube-client/http-proxy"]
Expand Down

0 comments on commit b306c15

Please sign in to comment.