Skip to content

Commit

Permalink
Wtf am I doing?
Browse files Browse the repository at this point in the history
Enter imposter syndrome
  • Loading branch information
rakshith-ravi committed Jan 31, 2024
1 parent d449353 commit f816f83
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 38 deletions.
15 changes: 8 additions & 7 deletions kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ home = { version = "0.5.4", optional = true }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.68"
serde_yaml = { version = "0.9.19", optional = true }
http = "0.2.5"
http-body = { version = "0.4.2", optional = true }
http = "1.0.0"
http-body = { version = "1.0.0", optional = true }
either = { version = "1.6.1", optional = true }
thiserror = "1.0.29"
futures = { version = "0.3.17", optional = true }
Expand All @@ -58,19 +58,20 @@ tokio = { version = "1.14.0", features = ["time", "signal", "sync"], optional =
kube-core = { path = "../kube-core", version = "=0.88.1" }
jsonpath-rust = { version = "0.4.0", optional = true }
tokio-util = { version = "0.7.0", optional = true, features = ["io", "codec"] }
hyper = { version = "0.14.13", optional = true, features = ["client", "http1", "stream", "tcp"] }
hyper = { version = "1.0.1", optional = true, features = ["client", "http1"] }
hyper-rustls = { version = "0.24.0", optional = true }
hyper-socks2 = { version = "0.8.0", optional = true, default-features = false }
tokio-tungstenite = { version = "0.20.0", optional = true }
tokio-tungstenite = { version = "0.21.0", optional = true }
tower = { version = "0.4.13", optional = true, features = ["buffer", "filter", "util"] }
tower-http = { version = "0.4.0", optional = true, features = ["auth", "map-response-body", "trace"] }
hyper-timeout = {version = "0.4.1", optional = true }
tower-http = { version = "0.5.0", optional = true, features = ["auth", "map-response-body", "trace"] }
hyper-timeout = {version = "0.5.0", optional = true }
tame-oauth = { version = "0.9.1", features = ["gcp"], optional = true }
pin-project = { version = "1.0.4", optional = true }
rand = { version = "0.8.3", optional = true }
secrecy = { version = "0.8.0", features = ["alloc", "serde"] }
tracing = { version = "0.1.36", features = ["log"], optional = true }
hyper-openssl = { version = "0.9.2", optional = true }
hyper-openssl = { version = "0.10.1", optional = true }
http-body-util = {version = "0.1.0" }
form_urlencoded = { version = "1.2.0", optional = true }

[dependencies.k8s-openapi]
Expand Down
6 changes: 4 additions & 2 deletions kube-client/src/client/auth/oauth.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use bytes::Bytes;
use http_body_util::Full;
use tame_oauth::{
gcp::{TokenOrRequest, TokenProvider, TokenProviderWrapper},
Token,
Expand Down Expand Up @@ -123,12 +125,12 @@ impl Gcp {
let client = hyper::Client::builder().build::<_, hyper::Body>(https);

Check failure on line 125 in kube-client/src/client/auth/oauth.rs

View workflow job for this annotation

GitHub Actions / msrv

failed to resolve: could not find `Client` in `hyper`

Check failure on line 125 in kube-client/src/client/auth/oauth.rs

View workflow job for this annotation

GitHub Actions / msrv

cannot find type `Body` in crate `hyper`

let res = client
.request(request.map(hyper::Body::from))
.request(request.map(Full::<Bytes>::new))
.await
.map_err(Error::RequestToken)?;
// Convert response body to `Vec<u8>` for parsing.
let (parts, body) = res.into_parts();
let bytes = hyper::body::to_bytes(body).await.map_err(Error::ConcatBuffers)?;
let bytes = body.await; // TODO figure this out after the client stuff is figured out
let response = http::Response::from_parts(parts, bytes.to_vec());
match self.provider.parse_token_response(scope_hash, response) {
Ok(token) => Ok(token),
Expand Down
6 changes: 4 additions & 2 deletions kube-client/src/client/auth/oidc.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::collections::HashMap;

use bytes::Bytes;
use chrono::{Duration, TimeZone, Utc};
use form_urlencoded::Serializer;
use http::{
header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE},
Method, Version,
};
use http_body_util::Full;
use hyper::{body, client::HttpConnector, http::Uri, Client, Request};

Check failure on line 11 in kube-client/src/client/auth/oidc.rs

View workflow job for this annotation

GitHub Actions / msrv

unresolved imports `hyper::client::HttpConnector`, `hyper::Client`

Check warning on line 11 in kube-client/src/client/auth/oidc.rs

View workflow job for this annotation

GitHub Actions / msrv

unused import: `body`
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Deserializer};
Expand Down Expand Up @@ -328,7 +330,7 @@ impl Refresher {
let response = self.https_client.get(discovery).await?;

if response.status().is_success() {
let body = body::to_bytes(response.into_body()).await?;
let body = Full::<Bytes>::new(response.into_body());
let metadata = serde_json::from_slice::<Metadata>(body.as_ref())
.map_err(errors::RefreshError::InvalidMetadata)?;

Expand Down Expand Up @@ -416,7 +418,7 @@ impl Refresher {
return Err(errors::RefreshError::RequestFailed(response.status()));
}

let body = body::to_bytes(response.into_body()).await?;
let body = Full::<Bytes>::new(response.into_body());
let token_response = serde_json::from_slice::<TokenResponse>(body.as_ref())
.map_err(errors::RefreshError::InvalidTokenResponse)?;

Expand Down
17 changes: 9 additions & 8 deletions kube-client/src/client/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bytes::Bytes;
use http::{header::HeaderMap, Request, Response};
use http_body_util::Full;
use hyper::{
self,
client::{connect::Connection, HttpConnector},

Check failure on line 6 in kube-client/src/client/builder.rs

View workflow job for this annotation

GitHub Actions / msrv

unresolved imports `hyper::client::connect`, `hyper::client::HttpConnector`
Expand All @@ -18,7 +19,7 @@ use crate::{client::ConfigExt, Client, Config, Error, Result};

/// HTTP body of a dynamic backing type.
///
/// The suggested implementation type is [`hyper::Body`].
/// The suggested implementation type is [`Full<Bytes>`].
pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;

/// Builder for [`Client`] instances with customized [tower](`Service`) middleware.
Expand All @@ -34,7 +35,7 @@ impl<Svc> ClientBuilder<Svc> {
/// which provides a default stack as a starting point.
pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
where
Svc: Service<Request<hyper::Body>>,
Svc: Service<Request<Full<Bytes>>>,
{
Self {
service,
Expand All @@ -57,7 +58,7 @@ impl<Svc> ClientBuilder<Svc> {
/// Build a [`Client`] instance with the current [`Service`] stack.
pub fn build<B>(self) -> Client
where
Svc: Service<Request<hyper::Body>, Response = Response<B>> + Send + 'static,
Svc: Service<Request<Full<Bytes>>, Response = Response<B>> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
Expand All @@ -67,7 +68,7 @@ impl<Svc> ClientBuilder<Svc> {
}
}

pub type GenericService = BoxService<Request<hyper::Body>, Response<Box<DynBody>>, BoxError>;
pub type GenericService = BoxService<Request<Full<Bytes>>, Response<Box<DynBody>>, BoxError>;

impl TryFrom<Config> for ClientBuilder<GenericService> {
type Error = Error;
Expand Down Expand Up @@ -104,7 +105,7 @@ where
let default_ns = config.default_namespace.clone();
let auth_layer = config.auth_layer()?;

let client: hyper::Client<_, hyper::Body> = {
let client: hyper::Client<_, Full<Bytes>> = {

Check failure on line 108 in kube-client/src/client/builder.rs

View workflow job for this annotation

GitHub Actions / msrv

cannot find type `Client` in crate `hyper`
// Current TLS feature precedence when more than one are set:
// 1. rustls-tls
// 2. openssl-tls
Expand Down Expand Up @@ -145,7 +146,7 @@ where
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
TraceLayer::new_for_http()
.make_span_with(|req: &Request<hyper::Body>| {
.make_span_with(|req: &Request<Full<Bytes>>| {
tracing::debug_span!(
"HTTP",
http.method = %req.method(),
Expand All @@ -156,10 +157,10 @@ where
otel.status_code = tracing::field::Empty,
)
})
.on_request(|_req: &Request<hyper::Body>, _span: &Span| {
.on_request(|_req: &Request<Full<Bytes>>, _span: &Span| {
tracing::debug!("requesting");
})
.on_response(|res: &Response<hyper::Body>, _latency: Duration, span: &Span| {
.on_response(|res: &Response<Full<Bytes>>, _latency: Duration, span: &Span| {
let status = res.status();
span.record("http.status_code", status.as_u16());
if status.is_client_error() || status.is_server_error() {
Expand Down
23 changes: 11 additions & 12 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
//!
//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
//! retrieve the resources served by the kubernetes API.
use bytes::Bytes;
use either::{Either, Left, Right};
use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response, StatusCode};
use hyper::Body;
use http_body_util::{Full, StreamBody};
use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
pub use kube_core::response::Status;
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -65,7 +66,7 @@ pub use builder::{ClientBuilder, DynBody};
pub struct Client {
// - `Buffer` for cheap clone
// - `BoxService` for dynamic response future type
inner: Buffer<BoxService<Request<Body>, Response<Body>, BoxError>, Request<Body>>,
inner: Buffer<BoxService<Request<Full<Bytes>>, Response<Full<Bytes>>, BoxError>, Request<Full<Bytes>>>,
default_ns: String,
}

Expand Down Expand Up @@ -99,15 +100,15 @@ impl Client {
/// ```
pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
where
S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
S: Service<Request<B>, Response = Response<B>> + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
T: Into<String>,
{
// Transform response body to `hyper::Body` and use type erased error to avoid type parameters.
let service = MapResponseBodyLayer::new(|b: B| Body::wrap_stream(b.into_stream()))
let service = MapResponseBodyLayer::new(|b: B| StreamBody::new(b.into_stream()))
.layer(service)
.map_err(|e| e.into());
Self {
Expand Down Expand Up @@ -141,7 +142,7 @@ impl Client {
/// Perform a raw HTTP request against the API and return the raw response back.
/// This method can be used to get raw access to the API which may be used to, for example,
/// create a proxy server or application-level gateway between localhost and the API server.
pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
pub async fn send(&self, request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>> {
let mut svc = self.inner.clone();
let res = svc
.ready()
Expand Down Expand Up @@ -195,7 +196,7 @@ impl Client {
HeaderValue::from_static(upgrade::WS_PROTOCOL),
);

let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
let res = self.send(Request::from_parts(parts, Full::new(body))).await?;
upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
match hyper::upgrade::on(res).await {
Ok(upgraded) => {
Expand Down Expand Up @@ -225,12 +226,10 @@ impl Client {
/// Perform a raw HTTP request against the API and get back the response
/// as a string
pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
let res = self.send(request.map(Body::from)).await?;
let res = self.send(request.map(Full::new)).await?;
let status = res.status();
// trace!("Status = {:?} for {}", status, res.url());
let body_bytes = hyper::body::to_bytes(res.into_body())
.await
.map_err(Error::HyperError)?;
let body_bytes = Full::from(res.into_body()).await.map_err(Error::HyperError)?;
let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
handle_api_errors(&text, status)?;

Expand All @@ -242,7 +241,7 @@ impl Client {
/// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt)
/// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead> {
let res = self.send(request.map(Body::from)).await?;
let res = self.send(request.map(Full::new)).await?;
// Map the error, since we want to convert this into an `AsyncBufReader` using
// `into_async_read` which specifies `std::io::Error` as the stream's error type.
let body = res
Expand Down Expand Up @@ -282,7 +281,7 @@ impl Client {
where
T: Clone + DeserializeOwned,
{
let res = self.send(request.map(Body::from)).await?;
let res = self.send(request.map(Full::new)).await?;
// trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
tracing::trace!("headers: {:?}", res.headers());

Expand Down
5 changes: 3 additions & 2 deletions kube-client/src/client/upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bytes::Bytes;
use http::{self, Response, StatusCode};
use hyper::Body;
use http_body_util::StreamBody;
use thiserror::Error;
use tokio_tungstenite::tungstenite as ws;

Expand Down Expand Up @@ -41,7 +42,7 @@ pub enum UpgradeConnectionError {

// Verify upgrade response according to RFC6455.
// Based on `tungstenite` and added subprotocol verification.
pub fn verify_response(res: &Response<Body>, key: &str) -> Result<(), UpgradeConnectionError> {
pub fn verify_response(res: &Response<StreamBody<Bytes>>, key: &str) -> Result<(), UpgradeConnectionError> {
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(UpgradeConnectionError::ProtocolSwitch(res.status()));
}
Expand Down
2 changes: 1 addition & 1 deletion kube-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.68"
thiserror = "1.0.29"
form_urlencoded = "1.0.1"
http = "0.2.5"
http = "1.0.0"
json-patch = { version = "1.0.0", optional = true }
once_cell = "1.8.0"
chrono = { version = "0.4.19", default-features = false, features = ["clock"] }
Expand Down
4 changes: 2 additions & 2 deletions kube-derive/tests/ui/fail_with_suggestion.stderr
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
error: Unknown field: `shortnames`. Did you mean `shortname`?
--> $DIR/fail_with_suggestion.rs:6:58
--> tests/ui/fail_with_suggestion.rs:6:58
|
6 | #[kube(group = "clux.dev", version = "v1", kind = "Foo", shortnames = "foo")]
| ^^^^^^^^^^
| ^^^^^^^^^^^^^^^^^^
4 changes: 2 additions & 2 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ futures = "0.3.17"
serde_json = "1.0.68"
serde = { version = "1.0.130", features = ["derive"] }
schemars = "0.8.6"
hyper = "0.14.27"
http = "0.2.9"
hyper = "1.0.1"
http = "1.0.0"
tower-test = "0.4.0"
anyhow = "1.0.71"

Expand Down

0 comments on commit f816f83

Please sign in to comment.