Skip to content

Commit

Permalink
Use bytes::Bytes as the HTTP request body in HttpClient.
Browse files Browse the repository at this point in the history
This will allow pooling of buffers on caller side compared to usage of Vec<u8>.
  • Loading branch information
mstyura committed Jan 2, 2025
1 parent 42b4f2f commit e0156e6
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 16 deletions.
1 change: 1 addition & 0 deletions opentelemetry-http/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Bump msrv to 1.75.0.
- Add "internal-logs" feature flag (enabled by default), and emit internal logs.
- Add `HttpClient::send_bytes` with `bytes::Bytes` request payload and deprecate old `HttpClient::send` function.

## 0.27.0

Expand Down
21 changes: 16 additions & 5 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,24 @@ pub type HttpError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// users to bring their choice of HTTP client.
#[async_trait]
pub trait HttpClient: Debug + Send + Sync {
/// Send the specified HTTP request
/// Send the specified HTTP request with `Vec<u8>` payload
///
/// Returns the HTTP response including the status code and body.
///
/// Returns an error if it can't connect to the server or the request could not be completed,
/// e.g. because of a timeout, infinite redirects, or a loss of connection.
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError>;
#[deprecated(note = "Use `send_bytes` with `Bytes` payload instead.")]
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
self.send_bytes(request.map(Into::into)).await
}

Check warning on line 67 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L65-L67

Added lines #L65 - L67 were not covered by tests

/// Send the specified HTTP request with `Bytes` payload.
///
/// Returns the HTTP response including the status code and body.
///
/// Returns an error if it can't connect to the server or the request could not be completed,
/// e.g. because of a timeout, infinite redirects, or a loss of connection.
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError>;
}

#[cfg(feature = "reqwest")]
Expand All @@ -72,7 +83,7 @@ mod reqwest {

#[async_trait]
impl HttpClient for reqwest::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {

Check warning on line 86 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L86

Added line #L86 was not covered by tests
otel_debug!(name: "ReqwestClient.Send");
let request = request.try_into()?;
let mut response = self.execute(request).await?.error_for_status()?;
Expand All @@ -89,7 +100,7 @@ mod reqwest {
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl HttpClient for reqwest::blocking::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {

Check warning on line 103 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L103

Added line #L103 was not covered by tests
otel_debug!(name: "ReqwestBlockingClient.Send");
let request = request.try_into()?;
let mut response = self.execute(request)?.error_for_status()?;
Expand Down Expand Up @@ -159,7 +170,7 @@ pub mod hyper {

#[async_trait]
impl HttpClient for HyperClient {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
async fn send_bytes(&self, request: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {

Check warning on line 173 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L173

Added line #L173 was not covered by tests
otel_debug!(name: "HyperClient.Send");
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, Body(Full::from(body)));
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl LogExporter for OtlpHttpClient {
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.body(body.into())

Check warning on line 31 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L31

Added line #L31 was not covered by tests
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
Expand All @@ -37,7 +37,7 @@ impl LogExporter for OtlpHttpClient {

let request_uri = request.uri().to_string();
otel_debug!(name: "HttpLogsClient.CallingExport");
let response = client.send(request).await?;
let response = client.send_bytes(request).await?;

Check warning on line 40 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L40

Added line #L40 was not covered by tests

if !response.status().is_success() {
let error = format!(
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl MetricsClient for OtlpHttpClient {
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.body(body.into())

Check warning on line 30 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L30

Added line #L30 was not covered by tests
.map_err(|e| crate::Error::RequestFailed(Box::new(e)))?;

for (k, v) in &self.headers {
Expand All @@ -36,7 +36,7 @@ impl MetricsClient for OtlpHttpClient {

otel_debug!(name: "HttpMetricsClient.CallingExport");
client
.send(request)
.send_bytes(request)

Check warning on line 39 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L39

Added line #L39 was not covered by tests
.await
.map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?;

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl SpanExporter for OtlpHttpClient {
.method(Method::POST)
.uri(&self.collector_endpoint)
.header(CONTENT_TYPE, content_type)
.body(body)
.body(body.into())

Check warning on line 33 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L33

Added line #L33 was not covered by tests
{
Ok(req) => req,
Err(e) => {
Expand All @@ -48,7 +48,7 @@ impl SpanExporter for OtlpHttpClient {
Box::pin(async move {
let request_uri = request.uri().to_string();
otel_debug!(name: "HttpTracesClient.CallingExport");
let response = client.send(request).await?;
let response = client.send_bytes(request).await?;

Check warning on line 51 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L51

Added line #L51 was not covered by tests

if !response.status().is_success() {
let error = format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl JaegerRemoteSampler {
{
let request = http::Request::get(endpoint)
.header("Content-Type", "application/json")
.body(Vec::new())
.body(Default::default())

Check warning on line 231 in opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs#L231

Added line #L231 was not covered by tests
.unwrap();

let resp = client
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-zipkin/src/exporter/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ impl JsonV2Client {
.method(Method::POST)
.uri(self.collector_endpoint.clone())
.header(CONTENT_TYPE, "application/json")
.body(serde_json::to_vec(&spans).unwrap_or_default())
.body(serde_json::to_vec(&spans).unwrap_or_default().into())

Check warning on line 44 in opentelemetry-zipkin/src/exporter/uploader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-zipkin/src/exporter/uploader.rs#L44

Added line #L44 was not covered by tests
.map_err::<Error, _>(Into::into)?;
let _ = self.client.send(req).await?.error_for_status()?;
let _ = self.client.send_bytes(req).await?.error_for_status()?;

Check warning on line 46 in opentelemetry-zipkin/src/exporter/uploader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-zipkin/src/exporter/uploader.rs#L46

Added line #L46 was not covered by tests
Ok(())
}
}
4 changes: 2 additions & 2 deletions opentelemetry-zipkin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@
//!
//! #[async_trait]
//! impl HttpClient for HyperClient {
//! async fn send(&self, req: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
//! async fn send_bytes(&self, req: Request<Bytes>) -> Result<Response<Bytes>, HttpError> {
//! let resp = self
//! .0
//! .request(req.map(|v| Full::new(Bytes::from(v))))
//! .request(req.map(|v| Full::new(v)))
//! .await?;
//!
//! let response = Response::builder()
Expand Down

0 comments on commit e0156e6

Please sign in to comment.