From e0156e669a3ad3a3862a3d1b2e1b8f614ae6a686 Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Fri, 27 Dec 2024 19:12:15 +0100 Subject: [PATCH] Use bytes::Bytes as the HTTP request body in HttpClient. This will allow pooling of buffers on caller side compared to usage of Vec. --- opentelemetry-http/CHANGELOG.md | 1 + opentelemetry-http/src/lib.rs | 21 ++++++++++++++----- opentelemetry-otlp/src/exporter/http/logs.rs | 4 ++-- .../src/exporter/http/metrics.rs | 4 ++-- opentelemetry-otlp/src/exporter/http/trace.rs | 4 ++-- .../trace/sampler/jaeger_remote/sampler.rs | 2 +- opentelemetry-zipkin/src/exporter/uploader.rs | 4 ++-- opentelemetry-zipkin/src/lib.rs | 4 ++-- 8 files changed, 28 insertions(+), 16 deletions(-) diff --git a/opentelemetry-http/CHANGELOG.md b/opentelemetry-http/CHANGELOG.md index 18f06ca63d..8a23c0bf00 100644 --- a/opentelemetry-http/CHANGELOG.md +++ b/opentelemetry-http/CHANGELOG.md @@ -4,6 +4,7 @@ - Bump msrv to 1.75.0. - Add "internal-logs" feature flag (enabled by default), and emit internal logs. +- Add `HttpClient::send_bytes` with `bytes::Bytes` request payload and deprecate old `HttpClient::send` function. ## 0.27.0 diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index f272d8d4c5..301979f85f 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -55,13 +55,24 @@ pub type HttpError = Box; /// users to bring their choice of HTTP client. #[async_trait] pub trait HttpClient: Debug + Send + Sync { - /// Send the specified HTTP request + /// Send the specified HTTP request with `Vec` payload /// /// Returns the HTTP response including the status code and body. /// /// Returns an error if it can't connect to the server or the request could not be completed, /// e.g. because of a timeout, infinite redirects, or a loss of connection. - async fn send(&self, request: Request>) -> Result, HttpError>; + #[deprecated(note = "Use `send_bytes` with `Bytes` payload instead.")] + async fn send(&self, request: Request>) -> Result, HttpError> { + self.send_bytes(request.map(Into::into)).await + } + + /// Send the specified HTTP request with `Bytes` payload. + /// + /// Returns the HTTP response including the status code and body. + /// + /// Returns an error if it can't connect to the server or the request could not be completed, + /// e.g. because of a timeout, infinite redirects, or a loss of connection. + async fn send_bytes(&self, request: Request) -> Result, HttpError>; } #[cfg(feature = "reqwest")] @@ -72,7 +83,7 @@ mod reqwest { #[async_trait] impl HttpClient for reqwest::Client { - async fn send(&self, request: Request>) -> Result, HttpError> { + async fn send_bytes(&self, request: Request) -> Result, HttpError> { otel_debug!(name: "ReqwestClient.Send"); let request = request.try_into()?; let mut response = self.execute(request).await?.error_for_status()?; @@ -89,7 +100,7 @@ mod reqwest { #[cfg(not(target_arch = "wasm32"))] #[async_trait] impl HttpClient for reqwest::blocking::Client { - async fn send(&self, request: Request>) -> Result, HttpError> { + async fn send_bytes(&self, request: Request) -> Result, HttpError> { otel_debug!(name: "ReqwestBlockingClient.Send"); let request = request.try_into()?; let mut response = self.execute(request)?.error_for_status()?; @@ -159,7 +170,7 @@ pub mod hyper { #[async_trait] impl HttpClient for HyperClient { - async fn send(&self, request: Request>) -> Result, HttpError> { + async fn send_bytes(&self, request: Request) -> Result, HttpError> { otel_debug!(name: "HyperClient.Send"); let (parts, body) = request.into_parts(); let mut request = Request::from_parts(parts, Body(Full::from(body))); diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 9d00602eed..eaf7f5f63c 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -28,7 +28,7 @@ impl LogExporter for OtlpHttpClient { .method(Method::POST) .uri(&self.collector_endpoint) .header(CONTENT_TYPE, content_type) - .body(body) + .body(body.into()) .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; for (k, v) in &self.headers { @@ -37,7 +37,7 @@ impl LogExporter for OtlpHttpClient { let request_uri = request.uri().to_string(); otel_debug!(name: "HttpLogsClient.CallingExport"); - let response = client.send(request).await?; + let response = client.send_bytes(request).await?; if !response.status().is_success() { let error = format!( diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index 7b96a7c5ce..aac771a54a 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -27,7 +27,7 @@ impl MetricsClient for OtlpHttpClient { .method(Method::POST) .uri(&self.collector_endpoint) .header(CONTENT_TYPE, content_type) - .body(body) + .body(body.into()) .map_err(|e| crate::Error::RequestFailed(Box::new(e)))?; for (k, v) in &self.headers { @@ -36,7 +36,7 @@ impl MetricsClient for OtlpHttpClient { otel_debug!(name: "HttpMetricsClient.CallingExport"); client - .send(request) + .send_bytes(request) .await .map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?; diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index d188dc8911..4ec74581a9 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -30,7 +30,7 @@ impl SpanExporter for OtlpHttpClient { .method(Method::POST) .uri(&self.collector_endpoint) .header(CONTENT_TYPE, content_type) - .body(body) + .body(body.into()) { Ok(req) => req, Err(e) => { @@ -48,7 +48,7 @@ impl SpanExporter for OtlpHttpClient { Box::pin(async move { let request_uri = request.uri().to_string(); otel_debug!(name: "HttpTracesClient.CallingExport"); - let response = client.send(request).await?; + let response = client.send_bytes(request).await?; if !response.status().is_success() { let error = format!( diff --git a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs index 1b7909f760..815abc828e 100644 --- a/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs +++ b/opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs @@ -228,7 +228,7 @@ impl JaegerRemoteSampler { { let request = http::Request::get(endpoint) .header("Content-Type", "application/json") - .body(Vec::new()) + .body(Default::default()) .unwrap(); let resp = client diff --git a/opentelemetry-zipkin/src/exporter/uploader.rs b/opentelemetry-zipkin/src/exporter/uploader.rs index 84f0581dc5..56c61f2137 100644 --- a/opentelemetry-zipkin/src/exporter/uploader.rs +++ b/opentelemetry-zipkin/src/exporter/uploader.rs @@ -41,9 +41,9 @@ impl JsonV2Client { .method(Method::POST) .uri(self.collector_endpoint.clone()) .header(CONTENT_TYPE, "application/json") - .body(serde_json::to_vec(&spans).unwrap_or_default()) + .body(serde_json::to_vec(&spans).unwrap_or_default().into()) .map_err::(Into::into)?; - let _ = self.client.send(req).await?.error_for_status()?; + let _ = self.client.send_bytes(req).await?.error_for_status()?; Ok(()) } } diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index 8d414db8a8..1eea3a232a 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -109,10 +109,10 @@ //! //! #[async_trait] //! impl HttpClient for HyperClient { -//! async fn send(&self, req: Request>) -> Result, HttpError> { +//! async fn send_bytes(&self, req: Request) -> Result, HttpError> { //! let resp = self //! .0 -//! .request(req.map(|v| Full::new(Bytes::from(v)))) +//! .request(req.map(|v| Full::new(v))) //! .await?; //! //! let response = Response::builder()