Skip to content

Commit

Permalink
Merge branch 'main' into direct-node
Browse files Browse the repository at this point in the history
  • Loading branch information
XciD authored Mar 21, 2024
2 parents 806585b + 3d2b464 commit d83bbbe
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Generate release info
run: ./scripts/release-gh.sh
- name: Release
uses: softprops/action-gh-release@v1
uses: softprops/action-gh-release@v2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
Expand Down
6 changes: 3 additions & 3 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ release = false
[features]
default = ["rustls-tls", "kubederive", "ws", "latest", "socks5", "runtime", "refresh"]
kubederive = ["kube/derive"]
openssl-tls = ["kube/client", "kube/openssl-tls"]
rustls-tls = ["kube/client", "kube/rustls-tls"]
openssl-tls = ["kube/client", "kube/openssl-tls", "kube/unstable-client"]
rustls-tls = ["kube/client", "kube/rustls-tls", "kube/unstable-client"]
runtime = ["kube/runtime", "kube/unstable-runtime"]
socks5 = ["kube/socks5"]
refresh = ["kube/oauth", "kube/oidc"]
Expand All @@ -30,7 +30,7 @@ assert-json-diff = "2.0.1"
garde = { version = "0.18.0", default-features = false, features = ["derive"] }
anyhow = "1.0.44"
futures = "0.3.17"
jsonpath-rust = "0.4.0"
jsonpath-rust = "0.5.0"
kube = { path = "../kube", version = "^0.88.1", default-features = false, features = ["admission"] }
kube-derive = { path = "../kube-derive", version = "^0.88.1", default-features = false } # only needed to opt out of schema
k8s-openapi = { version = "0.21.0", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion examples/dynamic_jsonpath.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn main() -> anyhow::Result<()> {

// Use the given JSONPATH to filter the ObjectList
let list_json = serde_json::to_value(&list)?;
for res in jsonpath.find_slice(&list_json) {
for res in jsonpath.find_slice(&list_json, Default::default()) {
info!("\t\t {}", *res);
}
Ok(())
Expand Down
9 changes: 4 additions & 5 deletions examples/node_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
api::{Api, ListParams, ResourceExt},
client::{scope, Client},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let events: Api<Event> = Api::all(client.clone());
let nodes: Api<Node> = Api::all(client.clone());

let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false);
Expand All @@ -25,13 +24,13 @@ async fn main() -> anyhow::Result<()> {

pin_mut!(obs);
while let Some(n) = obs.try_next().await? {
check_for_node_failures(&events, n).await?;
check_for_node_failures(&client, n).await?;
}
Ok(())
}

// A simple node problem detector
async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result<()> {
async fn check_for_node_failures(client: &Client, o: Node) -> anyhow::Result<()> {
let name = o.name_any();
// Nodes often modify a lot - only print broken nodes
if let Some(true) = o.spec.unwrap().unschedulable {
Expand All @@ -52,7 +51,7 @@ async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result
// Find events related to this node
let opts =
ListParams::default().fields(&format!("involvedObject.kind=Node,involvedObject.name={name}"));
let evlist = events.list(&opts).await?;
let evlist = client.list::<Event>(&opts, &scope::Cluster).await?;
for e in evlist {
warn!("Node event: {:?}", serde_json::to_string_pretty(&e)?);
}
Expand Down
7 changes: 4 additions & 3 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@ jsonpatch = ["kube-core/jsonpatch"]
admission = ["kube-core/admission"]
config = ["__non_core", "pem", "home"]
socks5 = ["hyper-socks2"]
unstable-client = []

# private feature sets; do not use
__non_core = ["tracing", "serde_yaml", "base64"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest", "socks5"]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest", "socks5", "unstable-client"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
base64 = { version = "0.21.4", optional = true }
base64 = { version = "0.22.0", optional = true }
chrono = { version = "0.4.23", optional = true, default-features = false }
home = { version = "0.5.4", optional = true }
serde = { version = "1.0.130", features = ["derive"] }
Expand All @@ -57,7 +58,7 @@ rustls-pemfile = { version = "1.0.0", optional = true }
bytes = { version = "1.1.0", optional = true }
tokio = { version = "1.14.0", features = ["time", "signal", "sync"], optional = true }
kube-core = { path = "../kube-core", version = "=0.88.1" }
jsonpath-rust = { version = "0.4.0", optional = true }
jsonpath-rust = { version = "0.5.0", optional = true }
tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] }
hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] }
hyper-rustls = { version = "0.24.0", optional = true }
Expand Down
5 changes: 3 additions & 2 deletions kube-client/src/client/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use http::{
header::{InvalidHeaderValue, AUTHORIZATION},
HeaderValue, Request,
};
use jsonpath_rust::JsonPathInst;
use jsonpath_rust::{path::config::JsonPathConfig, JsonPathInst};
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand Down Expand Up @@ -479,6 +479,7 @@ fn token_from_gcp_provider(provider: &AuthProviderConfig) -> Result<ProviderToke
}

fn extract_value(json: &serde_json::Value, context: &str, path: &str) -> Result<String, Error> {
let cfg = JsonPathConfig::default(); // no need for regex caching here
let parsed_path = path
.trim_matches(|c| c == '"' || c == '{' || c == '}')
.parse::<JsonPathInst>()
Expand All @@ -489,7 +490,7 @@ fn extract_value(json: &serde_json::Value, context: &str, path: &str) -> Result<
))
})?;

let res = parsed_path.find_slice(json);
let res = parsed_path.find_slice(json, cfg);

let Some(res) = res.into_iter().next() else {
return Err(Error::AuthExec(format!(
Expand Down
259 changes: 259 additions & 0 deletions kube-client/src/client/client_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
use crate::{Client, Error, Result};
use k8s_openapi::api::core::v1::Namespace as k8sNs;
use kube_core::{
object::ObjectList,
params::{GetParams, ListParams},
request::Request,
ClusterResourceScope, DynamicResourceScope, NamespaceResourceScope, Resource,
};
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;

/// A marker trait to indicate cluster-wide operations are available
trait ClusterScope {}
/// A marker trait to indicate namespace-scoped operations are available
trait NamespaceScope {}

// k8s_openapi scopes get implementations for free
impl ClusterScope for ClusterResourceScope {}
impl NamespaceScope for NamespaceResourceScope {}
// our DynamicResourceScope can masquerade as either
impl NamespaceScope for DynamicResourceScope {}
impl ClusterScope for DynamicResourceScope {}

/// How to get the url for a collection
///
/// Pick one of `kube::client::Cluster` or `kube::client::Namespace`.
pub trait CollectionUrl<K> {
fn url_path(&self) -> String;
}

/// How to get the url for an object
///
/// Pick one of `kube::client::Cluster` or `kube::client::Namespace`.
pub trait ObjectUrl<K> {
fn url_path(&self) -> String;
}

/// Marker type for cluster level queries
pub struct Cluster;
/// Namespace newtype for namespace level queries
///
/// You can create this directly, or convert `From` a `String` / `&str`, or `TryFrom` an `k8s_openapi::api::core::v1::Namespace`
pub struct Namespace(String);

/// Scopes for `unstable-client` [`Client#impl-Client`] extension methods
pub mod scope {
pub use super::{Cluster, Namespace};
}

// All objects can be listed cluster-wide
impl<K> CollectionUrl<K> for Cluster
where
K: Resource,
K::DynamicType: Default,
{
fn url_path(&self) -> String {
K::url_path(&K::DynamicType::default(), None)
}
}

// Only cluster-scoped objects can be named globally
impl<K> ObjectUrl<K> for Cluster
where
K: Resource,
K::DynamicType: Default,
K::Scope: ClusterScope,
{
fn url_path(&self) -> String {
K::url_path(&K::DynamicType::default(), None)
}
}

// Only namespaced objects can be accessed via namespace
impl<K> CollectionUrl<K> for Namespace
where
K: Resource,
K::DynamicType: Default,
K::Scope: NamespaceScope,
{
fn url_path(&self) -> String {
K::url_path(&K::DynamicType::default(), Some(&self.0))
}
}

impl<K> ObjectUrl<K> for Namespace
where
K: Resource,
K::DynamicType: Default,
K::Scope: NamespaceScope,
{
fn url_path(&self) -> String {
K::url_path(&K::DynamicType::default(), Some(&self.0))
}
}

// can be created from a complete native object
impl TryFrom<&k8sNs> for Namespace {
type Error = NamespaceError;

fn try_from(ns: &k8sNs) -> Result<Namespace, Self::Error> {
if let Some(n) = &ns.meta().name {
Ok(Namespace(n.to_owned()))
} else {
Err(NamespaceError::MissingName)
}
}
}
// and from literals + owned strings
impl From<&str> for Namespace {
fn from(ns: &str) -> Namespace {
Namespace(ns.to_owned())
}
}
impl From<String> for Namespace {
fn from(ns: String) -> Namespace {
Namespace(ns)
}
}

#[derive(thiserror::Error, Debug)]
/// Failures to infer a namespace
pub enum NamespaceError {
/// MissingName
#[error("Missing Namespace Name")]
MissingName,
}

/// Generic client extensions for the `unstable-client` feature
///
/// These methods allow users to query across a wide-array of resources without needing
/// to explicitly create an [`Api`](crate::Api) for each one of them.
///
/// ## Usage
/// 1. Create a [`Client`]
/// 2. Specify the [`scope`] you are querying at via [`Cluster`] or [`Namespace`] as args
/// 3. Specify the resource type you are using for serialization (e.g. a top level k8s-openapi type)
///
/// ## Example
///
/// ```no_run
/// # use k8s_openapi::api::core::v1::Pod;
/// # use k8s_openapi::api::core::v1::Service;
/// # use kube::client::scope::{Cluster, Namespace};
/// # use kube::{ResourceExt, api::ListParams};
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let lp = ListParams::default();
/// // List at Cluster level for Pod resource:
/// for pod in client.list::<Pod>(&lp, &Cluster).await? {
/// println!("Found pod {} in {}", pod.name_any(), pod.namespace().unwrap());
/// }
/// // Namespaced Get for Service resource:
/// let svc = client.get::<Service>("kubernetes", &Namespace::from("default")).await?;
/// assert_eq!(svc.name_unchecked(), "kubernetes");
/// # Ok(())
/// # }
/// ```
impl Client {
/// Get a single instance of a `Resource` implementing type `K` at the specified scope.
///
/// ```no_run
/// # use k8s_openapi::api::rbac::v1::ClusterRole;
/// # use k8s_openapi::api::core::v1::Service;
/// # use kube::client::scope::{Cluster, Namespace};
/// # use kube::{ResourceExt, api::GetParams};
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let cr = client.get::<ClusterRole>("cluster-admin", &Cluster).await?;
/// assert_eq!(cr.name_unchecked(), "cluster-admin");
/// let svc = client.get::<Service>("kubernetes", &Namespace::from("default")).await?;
/// assert_eq!(svc.name_unchecked(), "kubernetes");
/// # Ok(())
/// # }
/// ```
pub async fn get<K>(&self, name: &str, scope: &impl ObjectUrl<K>) -> Result<K>
where
K: Resource + Serialize + DeserializeOwned + Clone + Debug,
<K as Resource>::DynamicType: Default,
{
let mut req = Request::new(scope.url_path())
.get(name, &GetParams::default())
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("get");
self.request::<K>(req).await
}

/// List instances of a `Resource` implementing type `K` at the specified scope.
///
/// ```no_run
/// # use k8s_openapi::api::core::v1::Pod;
/// # use k8s_openapi::api::core::v1::Service;
/// # use kube::client::scope::{Cluster, Namespace};
/// # use kube::{ResourceExt, api::ListParams};
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let lp = ListParams::default();
/// for pod in client.list::<Pod>(&lp, &Cluster).await? {
/// println!("Found pod {} in {}", pod.name_any(), pod.namespace().unwrap());
/// }
/// for svc in client.list::<Service>(&lp, &Namespace::from("default")).await? {
/// println!("Found service {}", svc.name_any());
/// }
/// # Ok(())
/// # }
/// ```
pub async fn list<K>(&self, lp: &ListParams, scope: &impl CollectionUrl<K>) -> Result<ObjectList<K>>
where
K: Resource + Serialize + DeserializeOwned + Clone + Debug,
<K as Resource>::DynamicType: Default,
{
let mut req = Request::new(scope.url_path())
.list(lp)
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("list");
self.request::<ObjectList<K>>(req).await
}
}

#[cfg(test)]
mod test {
use super::{
scope::{Cluster, Namespace},
Client, ListParams,
};
use kube_core::ResourceExt;

#[tokio::test]
#[ignore = "needs cluster (will list/get namespaces, pods, jobs, svcs, clusterroles)"]
async fn client_ext_list_get_pods_svcs() -> Result<(), Box<dyn std::error::Error>> {
use k8s_openapi::api::{
batch::v1::Job,
core::v1::{Namespace as k8sNs, Pod, Service},
rbac::v1::ClusterRole,
};

let client = Client::try_default().await?;
let lp = ListParams::default();
// cluster-scoped list
for ns in client.list::<k8sNs>(&lp, &Cluster).await? {
// namespaced list
for p in client.list::<Pod>(&lp, &Namespace::try_from(&ns)?).await? {
println!("Found pod {} in {}", p.name_any(), ns.name_any());
}
}
// across-namespace list
for j in client.list::<Job>(&lp, &Cluster).await? {
println!("Found job {} in {}", j.name_any(), j.namespace().unwrap());
}
// namespaced get
let default: Namespace = "default".into();
let svc = client.get::<Service>("kubernetes", &default).await?;
assert_eq!(svc.name_unchecked(), "kubernetes");
// global get
let ca = client.get::<ClusterRole>("cluster-admin", &Cluster).await?;
assert_eq!(ca.name_unchecked(), "cluster-admin");

Ok(())
}
}
Loading

0 comments on commit d83bbbe

Please sign in to comment.