Skip to content

Commit

Permalink
chore: modify LogExporter and SpanExporter interfaces to support retu…
Browse files Browse the repository at this point in the history
…rning failure
  • Loading branch information
scottgerring committed Dec 5, 2024
1 parent 957659f commit 2df2353
Show file tree
Hide file tree
Showing 16 changed files with 67 additions and 32 deletions.
3 changes: 3 additions & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## vNext

- `OtlpHttpClient.shutdown` `TonicLogsClient.shutdown`, and `TonicTracesClient.shutdown` now explicitly return a result. The
semantics of the method have not changed, but you will have a new lint encouraging you to consume these results.

## 0.27.0

Released 2024-Nov-11
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ impl LogExporter for OtlpHttpClient {
Ok(())
}

fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
fn shutdown(&mut self) -> LogResult<()> {
let _ = self.client.lock()?.take();
Ok(())

Check warning on line 52 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L50-L52

Added lines #L50 - L52 were not covered by tests
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
Expand Down
7 changes: 4 additions & 3 deletions opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use futures_core::future::BoxFuture;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::trace::TraceError;
use opentelemetry::trace::{TraceError, TraceResult};
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};

use super::OtlpHttpClient;
Expand Down Expand Up @@ -63,8 +63,9 @@ impl SpanExporter for OtlpHttpClient {
})
}

fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
fn shutdown(&mut self) -> TraceResult<()> {
let _ = self.client.lock()?.take();
Ok(())

Check warning on line 68 in opentelemetry-otlp/src/exporter/http/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/trace.rs#L66-L68

Added lines #L66 - L68 were not covered by tests
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ impl LogExporter for TonicLogsClient {
Ok(())
}

fn shutdown(&mut self) {
fn shutdown(&mut self) -> LogResult<()> {

Check warning on line 84 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L84

Added line #L84 was not covered by tests
let _ = self.inner.take();
Ok(())

Check warning on line 86 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L86

Added line #L86 was not covered by tests
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::fmt;

use futures_core::future::BoxFuture;
use opentelemetry::trace::TraceError;
use opentelemetry::trace::{TraceError, TraceResult};
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
Expand Down Expand Up @@ -88,8 +88,9 @@ impl SpanExporter for TonicTracesClient {
})
}

fn shutdown(&mut self) {
fn shutdown(&mut self) -> TraceResult<()> {

Check warning on line 91 in opentelemetry-otlp/src/exporter/tonic/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/trace.rs#L91

Added line #L91 was not covered by tests
let _ = self.inner.take();
Ok(())

Check warning on line 93 in opentelemetry-otlp/src/exporter/tonic/trace.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/trace.rs#L93

Added line #L93 was not covered by tests
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
[#2338](https://github.com/open-telemetry/opentelemetry-rust/pull/2338)
- `ResourceDetector.detect()` no longer supports timeout option.
- `opentelemetry::global::shutdown_tracer_provider()` Removed from the API, should now use `tracer_provider.shutdown()` see [#2369](https://github.com/open-telemetry/opentelemetry-rust/pull/2369) for a migration example. "Tracer provider" is cheaply cloneable, so users are encouraged to set a clone of it as the global (ex: `global::set_tracer_provider(provider.clone()))`, so that instrumentations and other components can obtain tracers from `global::tracer()`. The tracer_provider must be kept around to call shutdown on it at the end of application (ex: `tracer_provider.shutdown()`)
- The trait functions `LogExporter.shutdown` and `TraceExporter.shutdown` now explicitly return a result. The
semantics of the method have not changed, but you will have a new lint encouraging you to consume these results.

## 0.27.1

Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,12 @@ pub trait LogExporter: Send + Sync + Debug {
/// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed.
///
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>;

/// Shuts down the exporter.
fn shutdown(&mut self) {}
fn shutdown(&mut self) -> LogResult<()> {
Ok(())
}

Check warning on line 89 in opentelemetry-sdk/src/export/logs/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/export/logs/mod.rs#L87-L89

Added lines #L87 - L89 were not covered by tests

#[cfg(feature = "spec_unstable_logs_enabled")]
/// Chek if logs are enabled.
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
Expand Down
7 changes: 6 additions & 1 deletion opentelemetry-sdk/src/export/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use std::time::SystemTime;
/// Describes the result of an export.
pub type ExportResult = Result<(), TraceError>;

/// Describes the results of other operations on the trace API.
pub type TraceResult<T> = Result<T, TraceError>;

/// `SpanExporter` defines the interface that protocol-specific exporters must
/// implement so that they can be plugged into OpenTelemetry SDK and support
/// sending of telemetry data.
Expand Down Expand Up @@ -43,7 +46,9 @@ pub trait SpanExporter: Send + Sync + Debug {
/// flush the data and the destination is unavailable). SDK authors
/// can decide if they want to make the shutdown timeout
/// configurable.
fn shutdown(&mut self) {}
fn shutdown(&mut self) -> TraceResult<()> {
Ok(())
}

/// This is a hint to ensure that the export of any Spans the exporter
/// has received prior to the call to this function SHOULD be completed
Expand Down
14 changes: 11 additions & 3 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
exporter.shutdown()?;
Ok(())
} else {
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))
Expand Down Expand Up @@ -299,7 +299,13 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
)
.await;

exporter.shutdown();
if let Err(e) = exporter.shutdown() {
otel_warn!(

Check warning on line 303 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L303

Added line #L303 was not covered by tests
name: "BatchLogProcessor.Shutdown.Failed",
message = "failed shutting down exporter cleanly",
error = format!("{:?}", e)

Check warning on line 306 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L306

Added line #L306 was not covered by tests
);
};

if let Err(send_error) = ch.send(result) {
otel_debug!(
Expand Down Expand Up @@ -590,7 +596,9 @@ mod tests {
Ok(())
}

fn shutdown(&mut self) {}
fn shutdown(&mut self) -> LogResult<()> {
Ok(())
}

fn set_resource(&mut self, resource: &Resource) {
self.resource
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,11 @@ impl LogExporter for InMemoryLogExporter {
Ok(())
}

fn shutdown(&mut self) {
fn shutdown(&mut self) -> LogResult<()> {
if self.should_reset_on_shutdown {
self.reset();
}
Ok(())
}

fn set_resource(&mut self, resource: &Resource) {
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ impl SpanExporter for InMemorySpanExporter {
Box::pin(std::future::ready(Ok(())))
}

fn shutdown(&mut self) {
self.reset()
fn shutdown(&mut self) -> TraceResult<()> {
self.reset();
Ok(())
}

fn set_resource(&mut self, resource: &Resource) {
Expand Down
11 changes: 7 additions & 4 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::{
export::trace::{ExportResult, SpanData, SpanExporter},
export::trace::{ExportResult, SpanData, SpanExporter, TraceResult},
trace::{SpanEvents, SpanLinks},
};
use futures_util::future::BoxFuture;
pub use opentelemetry::testing::trace::TestSpan;
use opentelemetry::{
trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
trace::{SpanContext, SpanId, SpanKind, Status, TraceError, TraceFlags, TraceId, TraceState},
InstrumentationScope,
};
use std::fmt::{Display, Formatter};
Expand Down Expand Up @@ -53,8 +53,11 @@ impl SpanExporter for TokioSpanExporter {
Box::pin(std::future::ready(Ok(())))
}

fn shutdown(&mut self) {
self.tx_shutdown.send(()).unwrap();
fn shutdown(&mut self) -> TraceResult<()> {
self.tx_shutdown
.send(())
.map_err::<TraceError, _>(|err| TraceError::Other(Box::new(err)))?;
Ok(())
}
}

Expand Down
16 changes: 7 additions & 9 deletions opentelemetry-sdk/src/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,7 @@ impl SpanProcessor for SimpleSpanProcessor {
}

fn shutdown(&self) -> TraceResult<()> {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other(
"SimpleSpanProcessor mutex poison at shutdown".into(),
))
}
self.exporter.lock()?.shutdown()
}

fn set_resource(&mut self, resource: &Resource) {
Expand Down Expand Up @@ -432,7 +425,12 @@ impl<R: RuntimeChannel> BatchSpanProcessorInternal<R> {
// Stream has terminated or processor is shutdown, return to finish execution.
BatchMessage::Shutdown(ch) => {
self.flush(Some(ch)).await;
self.exporter.shutdown();
if let Err(e) = self.exporter.shutdown() {
otel_warn!(

Check warning on line 429 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L429

Added line #L429 was not covered by tests
name: "SpanProcessor.Shutdown.Failed",
message = "failed shutting down exporter cleanly",
error = format!("{:?}", e));

Check warning on line 432 in opentelemetry-sdk/src/trace/span_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/trace/span_processor.rs#L432

Added line #L432 was not covered by tests
}
return false;
}
// propagate the resource
Expand Down
4 changes: 4 additions & 0 deletions opentelemetry-stdout/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## vNext

- `LogExporter.shutdown` and `SpanExporter.shutdown` now explicitly return a result. The
semantics of the method have not changed, but you will have a new lint encouraging you to consume these results.


## 0.27.0

Released 2024-Nov-11
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
}
}

fn shutdown(&mut self) {
fn shutdown(&mut self) -> LogResult<()> {

Check warning on line 59 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L59

Added line #L59 was not covered by tests
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
Ok(())
}

Check warning on line 62 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L61-L62

Added lines #L61 - L62 were not covered by tests

fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) {
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-stdout/src/trace/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::{DateTime, Utc};
use core::fmt;
use futures_util::future::BoxFuture;
use opentelemetry::trace::TraceError;
use opentelemetry::trace::{TraceError, TraceResult};
use opentelemetry_sdk::export::{self, trace::ExportResult};
use std::sync::atomic;

Expand Down Expand Up @@ -59,8 +59,9 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
}
}

fn shutdown(&mut self) {
fn shutdown(&mut self) -> TraceResult<()> {
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
Ok(())

Check warning on line 64 in opentelemetry-stdout/src/trace/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/trace/exporter.rs#L64

Added line #L64 was not covered by tests
}

fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) {
Expand Down

0 comments on commit 2df2353

Please sign in to comment.