Skip to content

Commit

Permalink
feat: direct node access logs/portforward/exec/attach (#1428)
Browse files Browse the repository at this point in the history
* feat: direct node access logs/portforward/exec/attach

Signed-off-by: Adrien <[email protected]>

* remove doc as it's not compiling

Signed-off-by: Adrien <[email protected]>

* add k3d 10250 kubelet port

Signed-off-by: Adrien <[email protected]>

* Add new example: pod_log_node_proxy + pr comments

Signed-off-by: Adrien <[email protected]>

* revert unwanted change

Signed-off-by: Adrien <[email protected]>

* fix ci

Signed-off-by: Adrien <[email protected]>

* add in coverage too

Signed-off-by: Adrien <[email protected]>

* pr comments

Signed-off-by: Adrien <[email protected]>

* last fix

Signed-off-by: Adrien <[email protected]>

* fix typo on feature flag

Signed-off-by: Adrien <[email protected]>

* fix examples

Signed-off-by: Adrien <[email protected]>

* documentation fixes, renames and caveats to kubelet interface

since it shows up in `Client::` impls it should have caveats.
have linked to alternatives, prefixed the names with kubelet_
(which is a little redundant, but it helps here).

also documented and split up a few related things i found with just doc.

Signed-off-by: clux <[email protected]>

---------

Signed-off-by: Adrien <[email protected]>
Signed-off-by: clux <[email protected]>
Co-authored-by: clux <[email protected]>
  • Loading branch information
XciD and clux authored Mar 22, 2024
1 parent 88b8941 commit e9094ca
Show file tree
Hide file tree
Showing 16 changed files with 596 additions and 5 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ jobs:
# Used to avoid rate limits when fetching the releases from k3s repo.
# Anonymous access is limited to 60 requests / hour / worker
# github-token: ${{ secrets.GITHUB_TOKEN }}
k3d-args: "--no-lb --no-rollback --k3s-arg --disable=traefik,servicelb,metrics-server@server:*"
k3d-args: "-p 10250:10250 --no-rollback --k3s-arg --disable=traefik,servicelb,metrics-server@server:*"

# Real CI work starts here
- name: Build workspace
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ jobs:
with:
cluster-name: "test-cluster-1"
args: >-
--agents 1
--image docker.io/rancher/k3s:v1.24.4-k3s1
-p 10250:10250
--k3s-arg "--no-deploy=traefik,servicelb,metrics-server@server:*"
- name: Run cargo-tarpaulin
uses: actions-rs/[email protected]
Expand Down
6 changes: 6 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ rustls-tls = ["kube/client", "kube/rustls-tls", "kube/unstable-client"]
runtime = ["kube/runtime", "kube/unstable-runtime"]
socks5 = ["kube/socks5"]
refresh = ["kube/oauth", "kube/oidc"]
kubelet-debug = ["kube/kubelet-debug"]
ws = ["kube/ws"]
latest = ["k8s-openapi/latest"]

Expand Down Expand Up @@ -141,6 +142,11 @@ path = "pod_attach.rs"
name = "pod_exec"
path = "pod_exec.rs"

[[example]]
name = "pod_log_kubelet_debug"
path = "pod_log_kubelet_debug.rs"
required-features = ["kubelet-debug"]

[[example]]
name = "pod_paged"
path = "pod_paged.rs"
Expand Down
81 changes: 81 additions & 0 deletions examples/pod_log_kubelet_debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Pod;
use tracing::*;

use futures::AsyncBufReadExt;
use hyper::Uri;
use kube::{
api::{Api, DeleteParams, ResourceExt},
core::{kubelet_debug::KubeletDebugParams, subresource::LogParams},
Client, Config,
};
use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let client = Client::try_default().await?;
let pods: Api<Pod> = Api::namespaced(client, "default");

// create busybox pod that's alive for at most 30s
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "example",
"labels": { "app": "kube-rs-test" },
},
"spec": {
"terminationGracePeriodSeconds": 1,
"restartPolicy": "Never",
"containers": [{
"name": "busybox",
"image": "busybox:1.34.1",
"command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
}],
}
}))?;

match pods.create(&Default::default(), &p).await {
Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
Err(kube::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
Err(e) => return Err(e.into()), // any other case if a failure
}

// wait for container to finish
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// Grab logs directly via the kubelet_debug interface
kubelet_log().await?;

// Delete it
info!("deleting");
let _ = pods.delete("example", &DeleteParams::default()).await?;

Ok(())
}

async fn kubelet_log() -> anyhow::Result<()> {
// Create a client for node proxy
// This uses an insecure configuration to talk to the kubelet directly
// and assumes 10250 is a reachable kubelet port (k3d default)
let mut config = Config::infer().await?;
config.accept_invalid_certs = true;
config.cluster_url = "https://localhost:10250".to_string().parse::<Uri>().unwrap();
let client: Client = config.try_into()?;

// Get logs directly from the node, bypassing the kube-apiserver
let kp = KubeletDebugParams {
name: "example",
namespace: "default",
..Default::default()
};
let lp = LogParams::default();
let mut logs_stream = client.kubelet_node_logs(&kp, "busybox", &lp).await?.lines();

while let Some(line) = logs_stream.try_next().await? {
println!("{line}");
}
Ok(())
}
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ e2e-job-musl features:
k3d:
k3d cluster create main --servers 1 --registry-create main --image rancher/k3s:v1.27.3-k3s1 \
--no-lb --no-rollback \
-p 10250:10250 --no-rollback \
--k3s-arg "--disable=traefik,servicelb,metrics-server@server:*" \
--k3s-arg '--kubelet-arg=eviction-hard=imagefs.available<1%,nodefs.available<1%@agent:*' \
--k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' \
Expand Down
1 change: 1 addition & 0 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ default = ["client"]
rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls"]
openssl-tls = ["openssl", "hyper-openssl"]
ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws", "tokio/macros"]
kubelet-debug = ["ws"]
oauth = ["client", "tame-oauth"]
oidc = ["client", "form_urlencoded"]
gzip = ["client", "tower-http/decompression-gzip"]
Expand Down
12 changes: 12 additions & 0 deletions kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ where
// ----------------------------------------------------------------------------

/// Marker trait for objects that support the ephemeral containers sub resource.
///
/// See [`Api::get_ephemeral_containers`] et al.
pub trait Ephemeral {}

impl Ephemeral for k8s_openapi::api::core::v1::Pod {}
Expand Down Expand Up @@ -384,6 +386,8 @@ fn log_path() {
}

/// Marker trait for objects that has logs
///
/// See [`Api::logs`] and [`Api::log_stream`] for usage.
pub trait Log {}

impl Log for k8s_openapi::api::core::v1::Pod {}
Expand Down Expand Up @@ -446,6 +450,8 @@ fn evict_path() {
}

/// Marker trait for objects that can be evicted
///
/// See [`Api::evic`] for usage
pub trait Evict {}

impl Evict for k8s_openapi::api::core::v1::Pod {}
Expand Down Expand Up @@ -484,6 +490,8 @@ fn attach_path() {
}

/// Marker trait for objects that has attach
///
/// See [`Api::attach`] for usage
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub trait Attach {}
Expand Down Expand Up @@ -530,6 +538,8 @@ fn exec_path() {
}

/// Marker trait for objects that has exec
///
/// See [`Api::exec`] for usage.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub trait Execute {}
Expand Down Expand Up @@ -577,6 +587,8 @@ fn portforward_path() {
}

/// Marker trait for objects that has portforward
///
/// See [`Api::portforward`] for usage.
#[cfg(feature = "ws")]
pub trait Portforward {}

Expand Down
1 change: 0 additions & 1 deletion kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use hyper::{
client::{connect::Connection, HttpConnector},
};
use hyper_timeout::TimeoutConnector;
pub use kube_core::response::Status;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
Expand Down
91 changes: 91 additions & 0 deletions kube-client/src/client/kubelet_debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::{
api::{AttachParams, AttachedProcess, LogParams, Portforwarder},
client::AsyncBufRead,
Client, Error, Result,
};
use kube_core::{kubelet_debug::KubeletDebugParams, Request};
use std::fmt::Debug;

/// Methods to access debug endpoints directly on `kubelet`
///
/// These provide analogous methods to the `Pod` api methods for [`Execute`](crate::api::Exec), [`Attach`](crate::api::Attach), and [`Portforward`](crate::api::Portforward).
/// Service account must have `nodes/proxy` access, and
/// the debug handlers must be enabled either via `--enable-debugging-handlers ` or in the [kubelet config](https://kubernetes.io/docs/reference/config-api/kubelet-config.v1beta1/#kubelet-config-k8s-io-v1beta1-KubeletConfiguration).
/// See the [kubelet source](https://github.com/kubernetes/kubernetes/blob/b3926d137cd2964cd3a04088ded30845910547b1/pkg/kubelet/server/server.go#L454), and [kubelet reference](https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet/) for more info.
///
/// ## Warning
/// These methods require direct, and **insecure access** to `kubelet` and is only available under the `kubelet_debug` feature.
/// End-to-end usage is explored in the [pod_log_kubelet_debug](https://github.com/kube-rs/kube/blob/main/examples/pod_log_kubelet_debug.rs) example.
#[cfg(feature = "kubelet-debug")]
impl Client {
/// Attach to pod directly from the node
///
/// ## Warning
/// This method uses the insecure `kubelet_debug` interface. See [`Api::attach`](crate::Api::attach) for the normal interface.
pub async fn kubelet_node_attach(
&self,
kubelet_params: &KubeletDebugParams<'_>,
container: &str,
ap: &AttachParams,
) -> Result<AttachedProcess> {
let mut req =
Request::kubelet_node_attach(kubelet_params, container, ap).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_attach");
let stream = self.connect(req).await?;
Ok(AttachedProcess::new(stream, ap))
}

/// Execute a command in a pod directly from the node
///
/// ## Warning
/// This method uses the insecure `kubelet_debug` interface. See [`Api::exec`](crate::Api::exec) for the normal interface.
pub async fn kubelet_node_exec<I, T>(
&self,
kubelet_params: &KubeletDebugParams<'_>,
container: &str,
command: I,
ap: &AttachParams,
) -> Result<AttachedProcess>
where
I: IntoIterator<Item = T> + Debug,
T: Into<String>,
{
let mut req = Request::kubelet_node_exec(kubelet_params, container, command, ap)
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_exec");
let stream = self.connect(req).await?;
Ok(AttachedProcess::new(stream, ap))
}

/// Forward ports of a pod directly from the node
///
/// ## Warning
/// This method uses the insecure `kubelet_debug` interface. See [`Api::portforward`](crate::Api::portforward) for the normal interface.
pub async fn kubelet_node_portforward(
&self,
kubelet_params: &KubeletDebugParams<'_>,
ports: &[u16],
) -> Result<Portforwarder> {
let mut req =
Request::kubelet_node_portforward(kubelet_params, ports).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_portforward");
let stream = self.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
}

/// Stream logs directly from node
///
/// ## Warning
/// This method uses the insecure `kubelet_debug` interface. See [`Api::log_stream`](crate::Api::log_stream) for the normal interface.
pub async fn kubelet_node_logs(
&self,
kubelet_params: &KubeletDebugParams<'_>,
container: &str,
lp: &LogParams,
) -> Result<impl AsyncBufRead> {
let mut req =
Request::kubelet_node_logs(kubelet_params, container, lp).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_log");
self.request_stream(req).await
}
}
5 changes: 5 additions & 0 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod config_ext;
pub use auth::Error as AuthError;
pub use config_ext::ConfigExt;
pub mod middleware;

#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;

#[cfg(feature = "openssl-tls")]
Expand All @@ -58,6 +59,10 @@ pub use auth::oidc_errors;

#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;

#[cfg(feature = "kubelet-debug")]
#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
mod kubelet_debug;

pub use builder::{ClientBuilder, DynBody};

/// Client for connecting with a Kubernetes cluster.
Expand Down
Loading

0 comments on commit e9094ca

Please sign in to comment.