diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 55261bc73..26c7d10ca 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -164,7 +164,7 @@ jobs: # Used to avoid rate limits when fetching the releases from k3s repo. # Anonymous access is limited to 60 requests / hour / worker # github-token: ${{ secrets.GITHUB_TOKEN }} - k3d-args: "--no-lb --no-rollback --k3s-arg --disable=traefik,servicelb,metrics-server@server:*" + k3d-args: "-p 10250:10250 --no-rollback --k3s-arg --disable=traefik,servicelb,metrics-server@server:*" # Real CI work starts here - name: Build workspace diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 1545c32f7..c9c71bc21 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -27,8 +27,8 @@ jobs: with: cluster-name: "test-cluster-1" args: >- - --agents 1 --image docker.io/rancher/k3s:v1.24.4-k3s1 + -p 10250:10250 --k3s-arg "--no-deploy=traefik,servicelb,metrics-server@server:*" - name: Run cargo-tarpaulin uses: actions-rs/tarpaulin@v0.1 diff --git a/examples/Cargo.toml b/examples/Cargo.toml index baf14a47f..2e817fe3f 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -21,6 +21,7 @@ rustls-tls = ["kube/client", "kube/rustls-tls", "kube/unstable-client"] runtime = ["kube/runtime", "kube/unstable-runtime"] socks5 = ["kube/socks5"] refresh = ["kube/oauth", "kube/oidc"] +kubelet-debug = ["kube/kubelet-debug"] ws = ["kube/ws"] latest = ["k8s-openapi/latest"] @@ -141,6 +142,11 @@ path = "pod_attach.rs" name = "pod_exec" path = "pod_exec.rs" +[[example]] +name = "pod_log_kubelet_debug" +path = "pod_log_kubelet_debug.rs" +required-features = ["kubelet-debug"] + [[example]] name = "pod_paged" path = "pod_paged.rs" diff --git a/examples/pod_log_kubelet_debug.rs b/examples/pod_log_kubelet_debug.rs new file mode 100644 index 000000000..3925dabb9 --- /dev/null +++ b/examples/pod_log_kubelet_debug.rs @@ -0,0 +1,81 @@ +use futures::TryStreamExt; +use k8s_openapi::api::core::v1::Pod; +use tracing::*; + +use futures::AsyncBufReadExt; +use hyper::Uri; +use kube::{ + api::{Api, DeleteParams, ResourceExt}, + core::{kubelet_debug::KubeletDebugParams, subresource::LogParams}, + Client, Config, +}; +use serde_json::json; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + + let client = Client::try_default().await?; + let pods: Api = Api::namespaced(client, "default"); + + // create busybox pod that's alive for at most 30s + let p: Pod = serde_json::from_value(json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "example", + "labels": { "app": "kube-rs-test" }, + }, + "spec": { + "terminationGracePeriodSeconds": 1, + "restartPolicy": "Never", + "containers": [{ + "name": "busybox", + "image": "busybox:1.34.1", + "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"], + }], + } + }))?; + + match pods.create(&Default::default(), &p).await { + Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()), + Err(kube::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up + Err(e) => return Err(e.into()), // any other case if a failure + } + + // wait for container to finish + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Grab logs directly via the kubelet_debug interface + kubelet_log().await?; + + // Delete it + info!("deleting"); + let _ = pods.delete("example", &DeleteParams::default()).await?; + + Ok(()) +} + +async fn kubelet_log() -> anyhow::Result<()> { + // Create a client for node proxy + // This uses an insecure configuration to talk to the kubelet directly + // and assumes 10250 is a reachable kubelet port (k3d default) + let mut config = Config::infer().await?; + config.accept_invalid_certs = true; + config.cluster_url = "https://localhost:10250".to_string().parse::().unwrap(); + let client: Client = config.try_into()?; + + // Get logs directly from the node, bypassing the kube-apiserver + let kp = KubeletDebugParams { + name: "example", + namespace: "default", + ..Default::default() + }; + let lp = LogParams::default(); + let mut logs_stream = client.kubelet_node_logs(&kp, "busybox", &lp).await?.lines(); + + while let Some(line) = logs_stream.try_next().await? { + println!("{line}"); + } + Ok(()) +} diff --git a/justfile b/justfile index b8d8479b7..3cb269bf3 100644 --- a/justfile +++ b/justfile @@ -84,7 +84,7 @@ e2e-job-musl features: k3d: k3d cluster create main --servers 1 --registry-create main --image rancher/k3s:v1.27.3-k3s1 \ - --no-lb --no-rollback \ + -p 10250:10250 --no-rollback \ --k3s-arg "--disable=traefik,servicelb,metrics-server@server:*" \ --k3s-arg '--kubelet-arg=eviction-hard=imagefs.available<1%,nodefs.available<1%@agent:*' \ --k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' \ diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index 7656bd2e8..fbaf7b0dc 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -20,6 +20,7 @@ default = ["client"] rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls"] openssl-tls = ["openssl", "hyper-openssl"] ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws", "tokio/macros"] +kubelet-debug = ["ws"] oauth = ["client", "tame-oauth"] oidc = ["client", "form_urlencoded"] gzip = ["client", "tower-http/decompression-gzip"] diff --git a/kube-client/src/api/subresource.rs b/kube-client/src/api/subresource.rs index 996f0ea26..c62681365 100644 --- a/kube-client/src/api/subresource.rs +++ b/kube-client/src/api/subresource.rs @@ -132,6 +132,8 @@ where // ---------------------------------------------------------------------------- /// Marker trait for objects that support the ephemeral containers sub resource. +/// +/// See [`Api::get_ephemeral_containers`] et al. pub trait Ephemeral {} impl Ephemeral for k8s_openapi::api::core::v1::Pod {} @@ -384,6 +386,8 @@ fn log_path() { } /// Marker trait for objects that has logs +/// +/// See [`Api::logs`] and [`Api::log_stream`] for usage. pub trait Log {} impl Log for k8s_openapi::api::core::v1::Pod {} @@ -446,6 +450,8 @@ fn evict_path() { } /// Marker trait for objects that can be evicted +/// +/// See [`Api::evic`] for usage pub trait Evict {} impl Evict for k8s_openapi::api::core::v1::Pod {} @@ -484,6 +490,8 @@ fn attach_path() { } /// Marker trait for objects that has attach +/// +/// See [`Api::attach`] for usage #[cfg(feature = "ws")] #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] pub trait Attach {} @@ -530,6 +538,8 @@ fn exec_path() { } /// Marker trait for objects that has exec +/// +/// See [`Api::exec`] for usage. #[cfg(feature = "ws")] #[cfg_attr(docsrs, doc(cfg(feature = "ws")))] pub trait Execute {} @@ -577,6 +587,8 @@ fn portforward_path() { } /// Marker trait for objects that has portforward +/// +/// See [`Api::portforward`] for usage. #[cfg(feature = "ws")] pub trait Portforward {} diff --git a/kube-client/src/client/builder.rs b/kube-client/src/client/builder.rs index e1ae4871c..71a14ca79 100644 --- a/kube-client/src/client/builder.rs +++ b/kube-client/src/client/builder.rs @@ -5,7 +5,6 @@ use hyper::{ client::{connect::Connection, HttpConnector}, }; use hyper_timeout::TimeoutConnector; -pub use kube_core::response::Status; use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder}; diff --git a/kube-client/src/client/kubelet_debug.rs b/kube-client/src/client/kubelet_debug.rs new file mode 100644 index 000000000..9c465bc12 --- /dev/null +++ b/kube-client/src/client/kubelet_debug.rs @@ -0,0 +1,91 @@ +use crate::{ + api::{AttachParams, AttachedProcess, LogParams, Portforwarder}, + client::AsyncBufRead, + Client, Error, Result, +}; +use kube_core::{kubelet_debug::KubeletDebugParams, Request}; +use std::fmt::Debug; + +/// Methods to access debug endpoints directly on `kubelet` +/// +/// These provide analogous methods to the `Pod` api methods for [`Execute`](crate::api::Exec), [`Attach`](crate::api::Attach), and [`Portforward`](crate::api::Portforward). +/// Service account must have `nodes/proxy` access, and +/// the debug handlers must be enabled either via `--enable-debugging-handlers ` or in the [kubelet config](https://kubernetes.io/docs/reference/config-api/kubelet-config.v1beta1/#kubelet-config-k8s-io-v1beta1-KubeletConfiguration). +/// See the [kubelet source](https://github.com/kubernetes/kubernetes/blob/b3926d137cd2964cd3a04088ded30845910547b1/pkg/kubelet/server/server.go#L454), and [kubelet reference](https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet/) for more info. +/// +/// ## Warning +/// These methods require direct, and **insecure access** to `kubelet` and is only available under the `kubelet_debug` feature. +/// End-to-end usage is explored in the [pod_log_kubelet_debug](https://github.com/kube-rs/kube/blob/main/examples/pod_log_kubelet_debug.rs) example. +#[cfg(feature = "kubelet-debug")] +impl Client { + /// Attach to pod directly from the node + /// + /// ## Warning + /// This method uses the insecure `kubelet_debug` interface. See [`Api::attach`](crate::Api::attach) for the normal interface. + pub async fn kubelet_node_attach( + &self, + kubelet_params: &KubeletDebugParams<'_>, + container: &str, + ap: &AttachParams, + ) -> Result { + let mut req = + Request::kubelet_node_attach(kubelet_params, container, ap).map_err(Error::BuildRequest)?; + req.extensions_mut().insert("kubelet_node_attach"); + let stream = self.connect(req).await?; + Ok(AttachedProcess::new(stream, ap)) + } + + /// Execute a command in a pod directly from the node + /// + /// ## Warning + /// This method uses the insecure `kubelet_debug` interface. See [`Api::exec`](crate::Api::exec) for the normal interface. + pub async fn kubelet_node_exec( + &self, + kubelet_params: &KubeletDebugParams<'_>, + container: &str, + command: I, + ap: &AttachParams, + ) -> Result + where + I: IntoIterator + Debug, + T: Into, + { + let mut req = Request::kubelet_node_exec(kubelet_params, container, command, ap) + .map_err(Error::BuildRequest)?; + req.extensions_mut().insert("kubelet_node_exec"); + let stream = self.connect(req).await?; + Ok(AttachedProcess::new(stream, ap)) + } + + /// Forward ports of a pod directly from the node + /// + /// ## Warning + /// This method uses the insecure `kubelet_debug` interface. See [`Api::portforward`](crate::Api::portforward) for the normal interface. + pub async fn kubelet_node_portforward( + &self, + kubelet_params: &KubeletDebugParams<'_>, + ports: &[u16], + ) -> Result { + let mut req = + Request::kubelet_node_portforward(kubelet_params, ports).map_err(Error::BuildRequest)?; + req.extensions_mut().insert("kubelet_node_portforward"); + let stream = self.connect(req).await?; + Ok(Portforwarder::new(stream, ports)) + } + + /// Stream logs directly from node + /// + /// ## Warning + /// This method uses the insecure `kubelet_debug` interface. See [`Api::log_stream`](crate::Api::log_stream) for the normal interface. + pub async fn kubelet_node_logs( + &self, + kubelet_params: &KubeletDebugParams<'_>, + container: &str, + lp: &LogParams, + ) -> Result { + let mut req = + Request::kubelet_node_logs(kubelet_params, container, lp).map_err(Error::BuildRequest)?; + req.extensions_mut().insert("kubelet_node_log"); + self.request_stream(req).await + } +} diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index 69e765803..3b615697d 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -41,6 +41,7 @@ mod config_ext; pub use auth::Error as AuthError; pub use config_ext::ConfigExt; pub mod middleware; + #[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls; #[cfg(feature = "openssl-tls")] @@ -58,6 +59,10 @@ pub use auth::oidc_errors; #[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError; +#[cfg(feature = "kubelet-debug")] +#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))] +mod kubelet_debug; + pub use builder::{ClientBuilder, DynBody}; /// Client for connecting with a Kubernetes cluster. diff --git a/kube-client/src/lib.rs b/kube-client/src/lib.rs index 427b4e25f..fac0dbe38 100644 --- a/kube-client/src/lib.rs +++ b/kube-client/src/lib.rs @@ -137,6 +137,7 @@ mod test { Api, Client, Config, ResourceExt, }; use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt}; + use hyper::Uri; use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec}; use kube_core::{ params::{DeleteParams, Patch, PatchParams, PostParams, WatchParams}, @@ -759,4 +760,134 @@ mod test { Ok(()) } + + #[tokio::test] + #[ignore = "needs kubelet debug methods"] + #[cfg(feature = "kubelet-debug")] + async fn pod_can_exec_and_write_to_stdin_from_node_proxy() -> Result<(), Box> { + use crate::{ + api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent}, + core::kubelet_debug::KubeletDebugParams, + }; + + let client = Client::try_default().await?; + let pods: Api = Api::default_namespaced(client); + + // create busybox pod that's alive for at most 30s + let p: Pod = serde_json::from_value(json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "busybox-kube2", + "labels": { "app": "kube-rs-test" }, + }, + "spec": { + "terminationGracePeriodSeconds": 1, + "restartPolicy": "Never", + "containers": [{ + "name": "busybox", + "image": "busybox:1.34.1", + "command": ["sh", "-c", "sleep 30"], + }], + } + }))?; + + match pods.create(&Default::default(), &p).await { + Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()), + Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up + Err(e) => return Err(e.into()), // any other case if a failure + } + + // Manual watch-api for it to become ready + // NB: don't do this; using conditions (see pod_api example) is easier and less error prone + let wp = WatchParams::default() + .fields(&format!("metadata.name={}", "busybox-kube2")) + .timeout(15); + let mut stream = pods.watch(&wp, "0").await?.boxed(); + while let Some(ev) = stream.try_next().await? { + match ev { + WatchEvent::Modified(o) => { + let s = o.status.as_ref().expect("status exists on pod"); + let phase = s.phase.clone().unwrap_or_default(); + if phase == "Running" { + break; + } + } + WatchEvent::Error(e) => panic!("watch error: {e}"), + _ => {} + } + } + + let mut config = Config::infer().await?; + config.accept_invalid_certs = true; + config.cluster_url = "https://localhost:10250".to_string().parse::().unwrap(); + let kubelet_client: Client = config.try_into()?; + + // Verify exec works and we can get the output + { + let mut attached = kubelet_client + .kubelet_node_exec( + &KubeletDebugParams { + name: "busybox-kube2", + namespace: "default", + ..Default::default() + }, + "busybox", + vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"], + &AttachParams::default().stderr(false), + ) + .await?; + let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); + let out = stdout + .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) }) + .collect::>() + .await + .join(""); + attached.join().await.unwrap(); + assert_eq!(out.lines().count(), 3); + assert_eq!(out, "1\n2\n3\n"); + } + + // Verify we can write to Stdin + { + use tokio::io::AsyncWriteExt; + let mut attached = kubelet_client + .kubelet_node_exec( + &KubeletDebugParams { + name: "busybox-kube2", + namespace: "default", + ..Default::default() + }, + "busybox", + vec!["sh"], + &AttachParams::default().stdin(true).stderr(false), + ) + .await?; + let mut stdin_writer = attached.stdin().unwrap(); + let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); + let next_stdout = stdout_stream.next(); + stdin_writer.write_all(b"echo test string 1\n").await?; + let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap(); + println!("{stdout}"); + assert_eq!(stdout, "test string 1\n"); + + // AttachedProcess resolves with status object. + // Send `exit 1` to get a failure status. + stdin_writer.write_all(b"exit 1\n").await?; + let status = attached.take_status().unwrap(); + if let Some(status) = status.await { + println!("{status:?}"); + assert_eq!(status.status, Some("Failure".to_owned())); + assert_eq!(status.reason, Some("NonZeroExitCode".to_owned())); + } + } + + // Delete it + let dp = DeleteParams::default(); + pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| { + assert_eq!(pdel.name_unchecked(), "busybox-kube2"); + }); + + Ok(()) + } } diff --git a/kube-core/Cargo.toml b/kube-core/Cargo.toml index eec4cb89c..cba219362 100644 --- a/kube-core/Cargo.toml +++ b/kube-core/Cargo.toml @@ -23,6 +23,7 @@ ws = [] admission = ["json-patch"] jsonpatch = ["json-patch"] schema = ["schemars"] +kubelet-debug = ["ws"] [dependencies] serde = { version = "1.0.130", features = ["derive"] } diff --git a/kube-core/src/kubelet_debug.rs b/kube-core/src/kubelet_debug.rs new file mode 100644 index 000000000..cbb3a24c3 --- /dev/null +++ b/kube-core/src/kubelet_debug.rs @@ -0,0 +1,244 @@ +//! Node proxy methods +use crate::{ + request::Error, + subresource::{AttachParams, LogParams}, + Request, +}; +use std::fmt::Debug; + +/// Struct that hold all required parameters to call specific pod methods from node +#[derive(Default)] +pub struct KubeletDebugParams<'a> { + /// Name of the pod + pub name: &'a str, + /// Namespace of the pod + pub namespace: &'a str, + /// Pod uid used to ensure that the pod name matches the pod uid + pub pod_uid: Option<&'a str>, +} + +impl KubeletDebugParams<'_> { + fn with_uid(&self) -> String { + if let Some(uid) = &self.pod_uid { + format!("{}/{}/{}", self.namespace, self.name, uid) + } else { + self.without_uid() + } + } + + fn without_uid(&self) -> String { + format!("{}/{}", self.namespace, self.name) + } +} + +impl Request { + /// Attach to pod directly from the node + pub fn kubelet_node_attach( + kubelet_debug_params: &KubeletDebugParams<'_>, + container: &str, + ap: &AttachParams, + ) -> Result>, Error> { + ap.validate()?; + + let target = format!("/attach/{}/{container}?", kubelet_debug_params.with_uid()); + let mut qp = form_urlencoded::Serializer::new(target); + ap.append_to_url_serializer_local(&mut qp); + + let req = http::Request::get(qp.finish()); + req.body(vec![]).map_err(Error::BuildRequest) + } + + /// Execute a command in a pod directly from the node + pub fn kubelet_node_exec( + kubelet_debug_params: &KubeletDebugParams<'_>, + container: &str, + command: I, + ap: &AttachParams, + ) -> Result>, Error> + where + I: IntoIterator + Debug, + T: Into, + { + ap.validate()?; + + let target = format!("/exec/{}/{container}?", kubelet_debug_params.with_uid()); + let mut qp = form_urlencoded::Serializer::new(target); + ap.append_to_url_serializer_local(&mut qp); + + for c in command.into_iter() { + qp.append_pair("command", &c.into()); + } + + let req = http::Request::get(qp.finish()); + req.body(vec![]).map_err(Error::BuildRequest) + } + + /// Forward ports of a pod directly from the node + pub fn kubelet_node_portforward( + kubelet_debug_params: &KubeletDebugParams<'_>, + ports: &[u16], + ) -> Result>, Error> { + if ports.is_empty() { + return Err(Error::Validation("ports cannot be empty".into())); + } + if ports.len() > 128 { + return Err(Error::Validation( + "the number of ports cannot be more than 128".into(), + )); + } + + if ports.len() > 1 { + let mut seen = std::collections::HashSet::with_capacity(ports.len()); + for port in ports.iter() { + if seen.contains(port) { + return Err(Error::Validation(format!( + "ports must be unique, found multiple {port}" + ))); + } + seen.insert(port); + } + } + + let base_url = format!("/portForward/{}?", kubelet_debug_params.with_uid()); + let mut qp = form_urlencoded::Serializer::new(base_url); + qp.append_pair( + "port", + &ports.iter().map(|p| p.to_string()).collect::>().join(","), + ); + let req = http::Request::get(qp.finish()); + req.body(vec![]).map_err(Error::BuildRequest) + } + + /// Stream logs directly from node + pub fn kubelet_node_logs( + kubelet_debug_params: &KubeletDebugParams<'_>, + container: &str, + lp: &LogParams, + ) -> Result>, Error> { + // Node logs is the only one that doesn't accept an uid for pod + let target = format!( + "/containerLogs/{}/{container}?", + kubelet_debug_params.without_uid() + ); + + let mut qp = form_urlencoded::Serializer::new(target); + + if lp.follow { + qp.append_pair("follow", "true"); + } + + if let Some(lb) = &lp.limit_bytes { + qp.append_pair("limitBytes", &lb.to_string()); + } + + if lp.pretty { + qp.append_pair("pretty", "true"); + } + + if lp.previous { + qp.append_pair("previous", "true"); + } + + if let Some(ss) = &lp.since_seconds { + qp.append_pair("sinceSeconds", &ss.to_string()); + } else if let Some(st) = &lp.since_time { + let ser_since = st.to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + qp.append_pair("sinceTime", &ser_since); + } + + if let Some(tl) = &lp.tail_lines { + qp.append_pair("tailLines", &tl.to_string()); + } + + if lp.timestamps { + qp.append_pair("timestamps", "true"); + } + + let urlstr = qp.finish(); + let req = http::Request::get(urlstr); + req.body(vec![]).map_err(Error::BuildRequest) + } +} + +#[cfg(test)] +mod test { + use crate::{ + kubelet_debug::KubeletDebugParams, + subresource::{AttachParams, LogParams}, + Request, + }; + #[test] + fn node_attach_test() { + let req = Request::kubelet_node_attach( + &KubeletDebugParams { + name: "some-name", + namespace: "some-namespace", + pod_uid: Some("some-uid"), + }, + "some-container", + &AttachParams::default().stdin(true).stderr(true).stdout(true), + ) + .unwrap(); + assert_eq!( + req.uri(), + "/attach/some-namespace/some-name/some-uid/some-container?&input=1&output=1&error=1" + ); + } + + #[test] + fn node_exec_test() { + let req = Request::kubelet_node_exec( + &KubeletDebugParams { + name: "some-name", + namespace: "some-namespace", + pod_uid: None, + }, + "some-container", + "ls -l".split_whitespace(), + &AttachParams::interactive_tty(), + ) + .unwrap(); + assert_eq!( + req.uri(), + "/exec/some-namespace/some-name/some-container?&input=1&output=1&tty=1&command=ls&command=-l" + ); + } + + #[test] + fn node_logs_test() { + let lp = LogParams { + tail_lines: Some(10), + follow: true, + timestamps: true, + ..Default::default() + }; + let req = Request::kubelet_node_logs( + &KubeletDebugParams { + name: "some-name", + namespace: "some-namespace", + pod_uid: None, + }, + "some-container", + &lp, + ) + .unwrap(); + assert_eq!( + req.uri(), + "/containerLogs/some-namespace/some-name/some-container?&follow=true&tailLines=10×tamps=true" + ); + } + + #[test] + fn node_portforward_test() { + let req = Request::kubelet_node_portforward( + &KubeletDebugParams { + name: "some-name", + namespace: "some-namespace", + pod_uid: None, + }, + &[1204], + ) + .unwrap(); + assert_eq!(req.uri(), "/portForward/some-namespace/some-name?&port=1204"); + } +} diff --git a/kube-core/src/lib.rs b/kube-core/src/lib.rs index a0371c169..94cd2f2ea 100644 --- a/kube-core/src/lib.rs +++ b/kube-core/src/lib.rs @@ -33,6 +33,8 @@ pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource}; pub mod metadata; pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta}; +#[cfg(feature = "kubelet-debug")] pub mod kubelet_debug; + pub mod object; pub use object::{NotUsed, Object, ObjectList}; diff --git a/kube-core/src/subresource.rs b/kube-core/src/subresource.rs index 6687b86f9..96934a6a6 100644 --- a/kube-core/src/subresource.rs +++ b/kube-core/src/subresource.rs @@ -263,7 +263,7 @@ impl AttachParams { self } - fn validate(&self) -> Result<(), Error> { + pub(crate) fn validate(&self) -> Result<(), Error> { if !self.stdin && !self.stdout && !self.stderr { return Err(Error::Validation( "AttachParams: one of stdin, stdout, or stderr must be true".into(), @@ -297,6 +297,23 @@ impl AttachParams { qp.append_pair("container", container); } } + + #[cfg(feature = "kubelet-debug")] + // https://github.com/kubernetes/kubernetes/blob/466d9378dbb0a185df9680657f5cd96d5e5aab57/pkg/apis/core/types.go#L6005-L6013 + pub(crate) fn append_to_url_serializer_local(&self, qp: &mut form_urlencoded::Serializer) { + if self.stdin { + qp.append_pair("input", "1"); + } + if self.stdout { + qp.append_pair("output", "1"); + } + if self.stderr { + qp.append_pair("error", "1"); + } + if self.tty { + qp.append_pair("tty", "1"); + } + } } #[cfg(feature = "ws")] diff --git a/kube/Cargo.toml b/kube/Cargo.toml index 9bd2df168..62f9f1fd2 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -28,6 +28,7 @@ openssl-tls = ["kube-client/openssl-tls"] # auxiliary features ws = ["kube-client/ws", "kube-core/ws"] +kubelet-debug = ["kube-client/kubelet-debug", "kube-core/kubelet-debug"] oauth = ["kube-client/oauth"] oidc = ["kube-client/oidc"] gzip = ["kube-client/gzip"]