Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Direct node2 #1434

Closed
wants to merge 13 commits into from
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ jobs:
# Used to avoid rate limits when fetching the releases from k3s repo.
# Anonymous access is limited to 60 requests / hour / worker
# github-token: ${{ secrets.GITHUB_TOKEN }}
k3d-args: "--no-lb --no-rollback --k3s-arg --disable=traefik,servicelb,metrics-server@server:*"
k3d-args: "-p 10250:10250 --no-rollback --k3s-arg --disable=traefik,servicelb,metrics-server@server:*"

# Real CI work starts here
- name: Build workspace
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ jobs:
with:
cluster-name: "test-cluster-1"
args: >-
--agents 1
--image docker.io/rancher/k3s:v1.24.4-k3s1
-p 10250:10250
--k3s-arg "--no-deploy=traefik,servicelb,metrics-server@server:*"
- name: Run cargo-tarpaulin
uses: actions-rs/[email protected]
Expand Down
6 changes: 6 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ rustls-tls = ["kube/client", "kube/rustls-tls", "kube/unstable-client"]
runtime = ["kube/runtime", "kube/unstable-runtime"]
socks5 = ["kube/socks5"]
refresh = ["kube/oauth", "kube/oidc"]
kubelet-debug = ["kube/kubelet-debug"]
ws = ["kube/ws"]
latest = ["k8s-openapi/latest"]

Expand Down Expand Up @@ -141,6 +142,11 @@ path = "pod_attach.rs"
name = "pod_exec"
path = "pod_exec.rs"

[[example]]
name = "pod_log_kubelet_debug"
path = "pod_log_kubelet_debug.rs"
required-features = ["kubelet-debug"]

[[example]]
name = "pod_paged"
path = "pod_paged.rs"
Expand Down
81 changes: 81 additions & 0 deletions examples/pod_log_kubelet_debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Pod;
use tracing::*;

use futures::AsyncBufReadExt;
use hyper::Uri;
use kube::{
api::{Api, DeleteParams, ResourceExt},
core::{kubelet_debug::KubeletDebugParams, subresource::LogParams},
Client, Config,
};
use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let client = Client::try_default().await?;
let pods: Api<Pod> = Api::namespaced(client, "default");

// create busybox pod that's alive for at most 30s
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "example",
"labels": { "app": "kube-rs-test" },
},
"spec": {
"terminationGracePeriodSeconds": 1,
"restartPolicy": "Never",
"containers": [{
"name": "busybox",
"image": "busybox:1.34.1",
"command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
}],
}
}))?;

match pods.create(&Default::default(), &p).await {
Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
Err(kube::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
Err(e) => return Err(e.into()), // any other case if a failure
}

// wait for container to finish
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// Grab logs directly via the kubelet_debug interface
kubelet_log().await?;

// Delete it
info!("deleting");
let _ = pods.delete("example", &DeleteParams::default()).await?;

Ok(())
}

async fn kubelet_log() -> anyhow::Result<()> {
// Create a client for node proxy
// This uses an insecure configuration to talk to the kubelet directly
// and assumes 10250 is a reachable kubelet port (k3d default)
let mut config = Config::infer().await?;
config.accept_invalid_certs = true;
config.cluster_url = "https://localhost:10250".to_string().parse::<Uri>().unwrap();
let client: Client = config.try_into()?;

// Get logs directly from the node, bypassing the kube-apiserver
let kp = KubeletDebugParams {
name: "example",
namespace: "default",
..Default::default()
};
let lp = LogParams::default();
let mut logs_stream = client.kubelet_node_logs(&kp, "busybox", &lp).await?.lines();

while let Some(line) = logs_stream.try_next().await? {
println!("{line}");
}
Ok(())
}
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ e2e-job-musl features:
k3d:
k3d cluster create main --servers 1 --registry-create main --image rancher/k3s:v1.27.3-k3s1 \
--no-lb --no-rollback \
-p 10250:10250 --no-rollback \
--k3s-arg "--disable=traefik,servicelb,metrics-server@server:*" \
--k3s-arg '--kubelet-arg=eviction-hard=imagefs.available<1%,nodefs.available<1%@agent:*' \
--k3s-arg '--kubelet-arg=eviction-minimum-reclaim=imagefs.available=1%,nodefs.available=1%@agent:*' \
Expand Down
1 change: 1 addition & 0 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ default = ["client"]
rustls-tls = ["rustls", "rustls-pemfile", "hyper-rustls"]
openssl-tls = ["openssl", "hyper-openssl"]
ws = ["client", "tokio-tungstenite", "rand", "kube-core/ws", "tokio/macros"]
kubelet-debug = ["ws"]
oauth = ["client", "tame-oauth"]
oidc = ["client", "form_urlencoded"]
gzip = ["client", "tower-http/decompression-gzip"]
Expand Down
12 changes: 12 additions & 0 deletions kube-client/src/api/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ where
// ----------------------------------------------------------------------------

/// Marker trait for objects that support the ephemeral containers sub resource.
///
/// See [`Api::get_ephemeral_containers`] et al.
pub trait Ephemeral {}

impl Ephemeral for k8s_openapi::api::core::v1::Pod {}
Expand Down Expand Up @@ -384,6 +386,8 @@ fn log_path() {
}

/// Marker trait for objects that has logs
///
/// See [`Api::logs`] and [`Api::log_stream`] for usage.
pub trait Log {}

impl Log for k8s_openapi::api::core::v1::Pod {}
Expand Down Expand Up @@ -446,6 +450,8 @@ fn evict_path() {
}

/// Marker trait for objects that can be evicted
///
/// See [`Api::evic`] for usage
pub trait Evict {}

impl Evict for k8s_openapi::api::core::v1::Pod {}
Expand Down Expand Up @@ -484,6 +490,8 @@ fn attach_path() {
}

/// Marker trait for objects that has attach
///
/// See [`Api::attach`] for usage
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub trait Attach {}
Expand Down Expand Up @@ -530,6 +538,8 @@ fn exec_path() {
}

/// Marker trait for objects that has exec
///
/// See [`Api::exec`] for usage.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub trait Execute {}
Expand Down Expand Up @@ -577,6 +587,8 @@ fn portforward_path() {
}

/// Marker trait for objects that has portforward
///
/// See [`Api::portforward`] for usage.
#[cfg(feature = "ws")]
pub trait Portforward {}

Expand Down
1 change: 0 additions & 1 deletion kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use hyper::{
client::{connect::Connection, HttpConnector},
};
use hyper_timeout::TimeoutConnector;
pub use kube_core::response::Status;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
Expand Down
91 changes: 91 additions & 0 deletions kube-client/src/client/kubelet_debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use crate::{
api::{AttachParams, AttachedProcess, LogParams, Portforwarder},
client::AsyncBufRead,
Client, Error, Result,
};
use kube_core::{kubelet_debug::KubeletDebugParams, Request};
use std::fmt::Debug;

/// Methods to access debug endpoints directly on `kubelet`
///
/// These provide analogous methods to the `Pod` api methods for [`Execute`](crate::api::Exec), [`Attach`](crate::api::Attach), and [`Portforward`](crate::api::Portforward).
/// Service account must have `nodes/proxy` access, and
/// the debug handlers must be enabled either via `--enable-debugging-handlers ` or in the [kubelet config](https://kubernetes.io/docs/reference/config-api/kubelet-config.v1beta1/#kubelet-config-k8s-io-v1beta1-KubeletConfiguration).
/// See the [kubelet source](https://github.com/kubernetes/kubernetes/blob/b3926d137cd2964cd3a04088ded30845910547b1/pkg/kubelet/server/server.go#L454), and [kubelet reference](https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet/) for more info.
///
/// ## Warning
/// These methods require direct, and **insecure access** to `kubelet` and is only available under the `kubelet_debug` feature.
/// End-to-end usage is explored in the [pod_log_kubelet_debug](https://github.com/kube-rs/kube/blob/main/examples/pod_log_kubelet_debug.rs) example.
#[cfg(feature = "kubelet-debug")]
impl Client {
/// Attach to pod directly from the node
///
/// ## Warning
/// This method uses the insecure `kubelet_debug` interface. See [`Api::attach`](crate::Api::attach) for the normal interface.
pub async fn kubelet_node_attach(
&self,
kubelet_params: &KubeletDebugParams<'_>,
container: &str,
ap: &AttachParams,
) -> Result<AttachedProcess> {
let mut req =
Request::kubelet_node_attach(kubelet_params, container, ap).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_attach");
let stream = self.connect(req).await?;
Ok(AttachedProcess::new(stream, ap))
}

/// Execute a command in a pod directly from the node
///
/// ## Warning
/// This method uses the insecure `kubelet_debug` interface. See [`Api::exec`](crate::Api::exec) for the normal interface.
pub async fn kubelet_node_exec<I, T>(
&self,
kubelet_params: &KubeletDebugParams<'_>,
container: &str,
command: I,
ap: &AttachParams,
) -> Result<AttachedProcess>
where
I: IntoIterator<Item = T> + Debug,
T: Into<String>,
{
let mut req = Request::kubelet_node_exec(kubelet_params, container, command, ap)
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_exec");
let stream = self.connect(req).await?;
Ok(AttachedProcess::new(stream, ap))

Check warning on line 57 in kube-client/src/client/kubelet_debug.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/kubelet_debug.rs#L53-L57

Added lines #L53 - L57 were not covered by tests
}

/// Forward ports of a pod directly from the node
///
/// ## Warning
/// This method uses the insecure `kubelet_debug` interface. See [`Api::portforward`](crate::Api::portforward) for the normal interface.
pub async fn kubelet_node_portforward(
&self,
kubelet_params: &KubeletDebugParams<'_>,
ports: &[u16],
) -> Result<Portforwarder> {
let mut req =
Request::kubelet_node_portforward(kubelet_params, ports).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_portforward");
let stream = self.connect(req).await?;
Ok(Portforwarder::new(stream, ports))
}

/// Stream logs directly from node
///
/// ## Warning
/// This method uses the insecure `kubelet_debug` interface. See [`Api::log_stream`](crate::Api::log_stream) for the normal interface.
pub async fn kubelet_node_logs(
&self,
kubelet_params: &KubeletDebugParams<'_>,
container: &str,
lp: &LogParams,
) -> Result<impl AsyncBufRead> {
let mut req =
Request::kubelet_node_logs(kubelet_params, container, lp).map_err(Error::BuildRequest)?;
req.extensions_mut().insert("kubelet_node_log");
self.request_stream(req).await
}
}
5 changes: 5 additions & 0 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! retrieve the resources served by the kubernetes API.
use either::{Either, Left, Right};
use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response, StatusCode};

Check warning on line 12 in kube-client/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `StatusCode`

warning: unused import: `StatusCode` --> kube-client/src/client/mod.rs:12:37 | 12 | use http::{self, Request, Response, StatusCode}; | ^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default

Check warning on line 12 in kube-client/src/client/mod.rs

View workflow job for this annotation

GitHub Actions / msrv

unused import: `StatusCode`
use hyper::Body;
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
pub use kube_core::response::Status;
Expand Down Expand Up @@ -41,6 +41,7 @@
pub use auth::Error as AuthError;
pub use config_ext::ConfigExt;
pub mod middleware;

#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;

#[cfg(feature = "openssl-tls")]
Expand All @@ -58,6 +59,10 @@

#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;

#[cfg(feature = "kubelet-debug")]
#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
mod kubelet_debug;

pub use builder::{ClientBuilder, DynBody};

/// Client for connecting with a Kubernetes cluster.
Expand Down
Loading
Loading