diff --git a/examples/tracing-grpc/src/client.rs b/examples/tracing-grpc/src/client.rs index 18c42cec6f..2247aa846f 100644 --- a/examples/tracing-grpc/src/client.rs +++ b/examples/tracing-grpc/src/client.rs @@ -1,7 +1,7 @@ use hello_world::greeter_client::GreeterClient; use hello_world::HelloRequest; use opentelemetry::{global, propagation::Injector}; -use opentelemetry_sdk::{propagation::TraceContextPropagator, runtime::Tokio, trace as sdktrace}; +use opentelemetry_sdk::{propagation::TraceContextPropagator, trace as sdktrace}; use opentelemetry_stdout::SpanExporter; use opentelemetry::{ diff --git a/examples/tracing-grpc/src/server.rs b/examples/tracing-grpc/src/server.rs index 3dbb012321..13d10804ce 100644 --- a/examples/tracing-grpc/src/server.rs +++ b/examples/tracing-grpc/src/server.rs @@ -5,9 +5,7 @@ use opentelemetry::{ propagation::Extractor, trace::{Span, SpanKind, Tracer}, }; -use opentelemetry_sdk::{ - propagation::TraceContextPropagator, runtime::Tokio, trace::TracerProvider, -}; +use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider}; use opentelemetry_stdout::SpanExporter; use tonic::{transport::Server, Request, Response, Status}; diff --git a/examples/tracing-jaeger/src/main.rs b/examples/tracing-jaeger/src/main.rs index 7e06d8e921..f28eddec5e 100644 --- a/examples/tracing-jaeger/src/main.rs +++ b/examples/tracing-jaeger/src/main.rs @@ -4,7 +4,7 @@ use opentelemetry::{ KeyValue, }; use opentelemetry_sdk::trace::TracerProvider; -use opentelemetry_sdk::{runtime, Resource}; +use opentelemetry_sdk::Resource; use std::error::Error; diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index 2d0676d5bb..778b1a31d2 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -1,9 +1,8 @@ # Changelog ## vNext - - Bump msrv to 1.75.0. - +- `OtlpHttpClient.shutdown` `TonicLogsClient.shutdown`, and `TonicTracesClient.shutdown` now explicitly return a result. ## 0.27.0 diff --git a/opentelemetry-otlp/src/exporter/http/logs.rs b/opentelemetry-otlp/src/exporter/http/logs.rs index 9d00602eed..d632b8be20 100644 --- a/opentelemetry-otlp/src/exporter/http/logs.rs +++ b/opentelemetry-otlp/src/exporter/http/logs.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; -use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter, ShutdownResult}; use opentelemetry_sdk::logs::{LogError, LogResult}; use super::OtlpHttpClient; @@ -53,8 +53,9 @@ impl LogExporter for OtlpHttpClient { } } - fn shutdown(&mut self) { - let _ = self.client.lock().map(|mut c| c.take()); + fn shutdown(&mut self) -> ShutdownResult { + let _ = self.client.lock()?.take(); + Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index d188dc8911..83fc6c2a6c 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use futures_core::future::BoxFuture; use http::{header::CONTENT_TYPE, Method}; -use opentelemetry::{otel_debug, trace::TraceError}; -use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; +use opentelemetry::otel_debug; +use opentelemetry::trace::TraceError; +use opentelemetry_sdk::export::trace::{ExportResult, ShutdownResult, SpanData, SpanExporter}; use super::OtlpHttpClient; @@ -64,8 +65,9 @@ impl SpanExporter for OtlpHttpClient { }) } - fn shutdown(&mut self) { - let _ = self.client.lock().map(|mut c| c.take()); + fn shutdown(&mut self) -> ShutdownResult { + let _ = self.client.lock()?.take(); + Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 053331b428..44b171f7b7 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -3,7 +3,7 @@ use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::logs::v1::{ logs_service_client::LogsServiceClient, ExportLogsServiceRequest, }; -use opentelemetry_sdk::export::logs::{LogBatch, LogExporter}; +use opentelemetry_sdk::export::logs::{LogBatch, LogExporter, ShutdownResult}; use opentelemetry_sdk::logs::{LogError, LogResult}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; @@ -92,8 +92,9 @@ impl LogExporter for TonicLogsClient { } } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> ShutdownResult { let _ = self.inner.take(); + Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index 998acafad5..0660c9a8e2 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -5,10 +5,9 @@ use opentelemetry::{otel_debug, trace::TraceError}; use opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; -use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; -use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; - use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope; +use opentelemetry_sdk::export::trace::{ExportResult, ShutdownResult, SpanData, SpanExporter}; +use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; use super::BoxInterceptor; @@ -92,8 +91,9 @@ impl SpanExporter for TonicTracesClient { }) } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> ShutdownResult { let _ = self.inner.take(); + Ok(()) } fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-otlp/tests/integration_test/tests/logs.rs b/opentelemetry-otlp/tests/integration_test/tests/logs.rs index 39eb88a1e4..fe2399fdae 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/logs.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/logs.rs @@ -94,7 +94,7 @@ mod logtests { tokio::time::sleep(Duration::from_secs(10)).await; - assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json"); + let _ = assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json"); Ok(()) } @@ -122,7 +122,7 @@ mod logtests { } let _ = logger_provider.shutdown(); // tokio::time::sleep(Duration::from_secs(10)).await; - assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json"); + let _ = assert_logs_results(test_utils::LOGS_FILE, "expected/logs.json"); Ok(()) } diff --git a/opentelemetry-otlp/tests/integration_test/tests/traces.rs b/opentelemetry-otlp/tests/integration_test/tests/traces.rs index 29bc93d77f..92816c0333 100644 --- a/opentelemetry-otlp/tests/integration_test/tests/traces.rs +++ b/opentelemetry-otlp/tests/integration_test/tests/traces.rs @@ -13,7 +13,7 @@ use anyhow::Result; use ctor::dtor; use integration_test_runner::test_utils; use opentelemetry_proto::tonic::trace::v1::TracesData; -use opentelemetry_sdk::{runtime, trace as sdktrace, Resource}; +use opentelemetry_sdk::{trace as sdktrace, Resource}; use std::fs::File; use std::io::Write; use std::os::unix::fs::MetadataExt; diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 140f41ed1f..5094c72c8c 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,11 @@ ## vNext +- If you are an exporter author, the trait functions `LogExporter.shutdown` and `TraceExporter.shutdown` must now return a result. Note that implementing shutdown is optional as the trait provides a default implementation that returns Ok(()). + +- The trait functions `LogExporter.shutdown` and `TraceExporter.shutdown` now explicitly return a result. The + semantics of the method have not changed, but you will have a new lint encouraging you to consume these results. + - *Breaking(Affects custom metric exporter authors only)* `start_time` and `time` is moved from DataPoints to aggregations (Sum, Gauge, Histogram, ExpoHistogram) see [#2377](https://github.com/open-telemetry/opentelemetry-rust/pull/2377) and [#2411](https://github.com/open-telemetry/opentelemetry-rust/pull/2411), to reduce memory. - *Breaking* `start_time` is no longer optional for `Sum` aggregation, see [#2367](https://github.com/open-telemetry/opentelemetry-rust/pull/2367), but is still optional for `Gauge` aggregation see [#2389](https://github.com/open-telemetry/opentelemetry-rust/pull/2389). @@ -14,6 +19,7 @@ [#2338](https://github.com/open-telemetry/opentelemetry-rust/pull/2338) - `ResourceDetector.detect()` no longer supports timeout option. - `opentelemetry::global::shutdown_tracer_provider()` Removed from the API, should now use `tracer_provider.shutdown()` see [#2369](https://github.com/open-telemetry/opentelemetry-rust/pull/2369) for a migration example. "Tracer provider" is cheaply cloneable, so users are encouraged to set a clone of it as the global (ex: `global::set_tracer_provider(provider.clone()))`, so that instrumentations and other components can obtain tracers from `global::tracer()`. The tracer_provider must be kept around to call shutdown on it at the end of application (ex: `tracer_provider.shutdown()`) + - *Feature*: Add `ResourceBuilder` for an easy way to create new `Resource`s - *Breaking*: Remove `Resource::{new,empty,from_detectors,new_with_defaults,from_schema_url,merge,default}` from public api. To create Resources you should only use `Resource::builder()` or `Resource::builder_empty()`. See [#2322](https://github.com/open-telemetry/opentelemetry-rust/pull/2322) for a migration guide. Example Usage: diff --git a/opentelemetry-sdk/src/error.rs b/opentelemetry-sdk/src/error.rs index 115da17b78..c3ab3f5483 100644 --- a/opentelemetry-sdk/src/error.rs +++ b/opentelemetry-sdk/src/error.rs @@ -1,6 +1,4 @@ //! Wrapper for error from trace, logs and metrics part of open telemetry. -use std::sync::PoisonError; - #[cfg(feature = "logs")] use crate::logs::LogError; #[cfg(feature = "metrics")] @@ -8,8 +6,13 @@ use crate::metrics::MetricError; use opentelemetry::propagation::PropagationError; #[cfg(feature = "trace")] use opentelemetry::trace::TraceError; +use std::sync::PoisonError; +use std::time::Duration; +use thiserror::Error; -/// Wrapper for error from both tracing and metrics part of open telemetry. +/// Wrapper for error from both tracing and metrics part of open telemetry. This +/// gives us a common error type where we _need_ to return errors that may come +/// from various components. #[derive(thiserror::Error, Debug)] #[non_exhaustive] pub enum Error { @@ -34,6 +37,10 @@ pub enum Error { /// Error happens when injecting and extracting information using propagators. Propagation(#[from] PropagationError), + /// Failed to shutdown an exporter + #[error(transparent)] + Shutdown(#[from] ShutdownError), + #[error("{0}")] /// Other types of failures not covered by the variants above. Other(String), @@ -44,3 +51,28 @@ impl From> for Error { Error::Other(err.to_string()) } } + +/// Errors returned by shutdown operations in the Export API. +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum ShutdownError { + /// Shutdown timed out before completing. + #[error("Shutdown timed out after {0:?}")] + Timeout(Duration), + + /// The export client failed while holding the client lock. It is not + /// possible to complete the shutdown and a retry will not help. + /// This is something that should not happen and should likely emit some diagnostic. + #[error("export client failed while holding lock; cannot retry.")] + ClientFailed(String), + + /// An unexpected error occurred during shutdown. + #[error(transparent)] + Other(#[from] Box), +} + +impl From> for ShutdownError { + fn from(err: PoisonError) -> Self { + ShutdownError::ClientFailed(format!("Mutex poisoned during shutdown: {}", err)) + } +} diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 902adb54dd..2312874126 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -1,12 +1,15 @@ //! Log exporters +use crate::logs::LogError; use crate::logs::LogRecord; -use crate::logs::{LogError, LogResult}; use crate::Resource; #[cfg(feature = "spec_unstable_logs_enabled")] use opentelemetry::logs::Severity; use opentelemetry::InstrumentationScope; use std::fmt::Debug; +// Re-export ShutdownError +pub use crate::error::ShutdownError; + /// A batch of log records to be exported by a `LogExporter`. /// /// The `LogBatch` struct holds a collection of log records along with their associated @@ -79,13 +82,15 @@ pub trait LogExporter: Send + Sync + Debug { /// A `LogResult<()>`, which is a result type indicating either a successful export (with /// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed. /// - fn export( - &self, - batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send; + fn export(&self, batch: LogBatch<'_>) + -> impl std::future::Future + Send; + + /// Shuts down the exporter. This function is idempotent; calling it + /// more than once has no additional effect. + fn shutdown(&mut self) -> ShutdownResult { + Ok(()) + } - /// Shuts down the exporter. - fn shutdown(&mut self) {} #[cfg(feature = "spec_unstable_logs_enabled")] /// Chek if logs are enabled. fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { @@ -98,3 +103,6 @@ pub trait LogExporter: Send + Sync + Debug { /// Describes the result of an export. pub type ExportResult = Result<(), LogError>; + +/// Describes the result of a shutdown in the log SDK. +pub type ShutdownResult = Result<(), ShutdownError>; diff --git a/opentelemetry-sdk/src/export/trace.rs b/opentelemetry-sdk/src/export/trace.rs index c606d85b1a..eaf357bbdd 100644 --- a/opentelemetry-sdk/src/export/trace.rs +++ b/opentelemetry-sdk/src/export/trace.rs @@ -7,9 +7,15 @@ use std::borrow::Cow; use std::fmt::Debug; use std::time::SystemTime; -/// Describes the result of an export. +// Re-export ShutdownError +pub use crate::error::ShutdownError; + +/// Results of an export operation pub type ExportResult = Result<(), TraceError>; +/// Result of a shutdown operation +pub type ShutdownResult = Result<(), ShutdownError>; + /// `SpanExporter` defines the interface that protocol-specific exporters must /// implement so that they can be plugged into OpenTelemetry SDK and support /// sending of telemetry data. @@ -30,7 +36,7 @@ pub trait SpanExporter: Send + Sync + Debug { /// /// Any retry logic that is required by the exporter is the responsibility /// of the exporter. - fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult>; + fn export(&mut self, batch: Vec) -> BoxFuture<'static, Result<(), TraceError>>; /// Shuts down the exporter. Called when SDK is shut down. This is an /// opportunity for exporter to do any cleanup required. @@ -43,7 +49,9 @@ pub trait SpanExporter: Send + Sync + Debug { /// flush the data and the destination is unavailable). SDK authors /// can decide if they want to make the shutdown timeout /// configurable. - fn shutdown(&mut self) {} + fn shutdown(&mut self) -> ShutdownResult { + Ok(()) + } /// This is a hint to ensure that the export of any Spans the exporter /// has received prior to the call to this function SHOULD be completed diff --git a/opentelemetry-sdk/src/logs/error.rs b/opentelemetry-sdk/src/logs/error.rs index 4f33ba6dbf..4669edb039 100644 --- a/opentelemetry-sdk/src/logs/error.rs +++ b/opentelemetry-sdk/src/logs/error.rs @@ -1,6 +1,9 @@ +// Re-export ShutdownError +pub use crate::error::ShutdownError; + use crate::export::ExportError; -use std::{sync::PoisonError, time::Duration}; +use std::time::Duration; use thiserror::Error; /// Describe the result of operations in log SDK. @@ -18,14 +21,16 @@ pub enum LogError { #[error("Exporter timed out after {} seconds", .0.as_secs())] ExportTimedOut(Duration), + /// The export client failed while holding the client lock. It is not + /// possible to complete the shutdown and a retry will not help. + /// This is something that should not happen and should likely emit some diagnostic. + #[error("export client failed while holding lock; cannot retry.")] + ClientFailed(String), + /// Processor is already shutdown #[error("{0} already shutdown")] AlreadyShutdown(String), - /// Mutex lock poisoning - #[error("mutex lock poisioning for {0}")] - MutexPoisoned(String), - /// Other errors propagated from log SDK that weren't covered above. #[error(transparent)] Other(#[from] Box), @@ -52,11 +57,6 @@ impl From<&'static str> for LogError { } } -impl From> for LogError { - fn from(err: PoisonError) -> Self { - LogError::Other(err.to_string().into()) - } -} /// Wrap type for string #[derive(Error, Debug)] #[error("{0}")] diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 479ca36dd2..b763bd0e60 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -141,9 +141,9 @@ impl LoggerProviderInner { // which is non-actionable by the user match err { // specific handling for mutex poisioning - LogError::MutexPoisoned(_) => { + LogError::ClientFailed(_) => { otel_debug!( - name: "LoggerProvider.Drop.ShutdownMutexPoisoned", + name: "LoggerProvider.Drop.ShutdownClientFailed", ); } _ => { diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 9d78383e8b..35c7a4fcc1 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -103,14 +103,14 @@ impl LogProcessor for SimpleLogProcessor { let result = self .exporter .lock() - .map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into())) + .map_err(|_| LogError::ClientFailed("SimpleLogProcessor".into())) .and_then(|exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) }); // Handle errors with specific static names match result { - Err(LogError::MutexPoisoned(_)) => { + Err(LogError::ClientFailed(_)) => { // logging as debug as this is not a user error otel_debug!( name: "SimpleLogProcessor.Emit.MutexPoisoning", @@ -134,10 +134,13 @@ impl LogProcessor for SimpleLogProcessor { self.is_shutdown .store(true, std::sync::atomic::Ordering::Relaxed); if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown(); + exporter + .shutdown() + .map_err(|e| LogError::Other(Box::new(e)))?; Ok(()) } else { - Err(LogError::MutexPoisoned("SimpleLogProcessor".into())) + // Failing to get the mutex means the export client failed whilst holding it + Err(LogError::ClientFailed("SimpleLogProcessor".into())) } } @@ -677,8 +680,6 @@ mod tests { async { Ok(()) } } - fn shutdown(&mut self) {} - fn set_resource(&mut self, resource: &Resource) { self.resource .lock() diff --git a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs index e5649d886b..438312fd56 100644 --- a/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs @@ -188,7 +188,12 @@ impl BatchLogProcessor { ) .await; - exporter.shutdown(); + if let Err(shutdown_error) = exporter.shutdown() { + otel_debug!( + name: "BatchLogProcessor.Shutdown.Error", + error = format!("{}", shutdown_error), + ); + } if let Err(send_error) = ch.send(result) { otel_debug!( @@ -282,7 +287,7 @@ where #[cfg(all(test, feature = "testing", feature = "logs"))] mod tests { - use crate::export::logs::{LogBatch, LogExporter}; + use crate::export::logs::{LogBatch, LogExporter, ShutdownResult}; use crate::logs::log_processor::{ OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY, @@ -325,7 +330,9 @@ mod tests { async { Ok(()) } } - fn shutdown(&mut self) {} + fn shutdown(&mut self) -> ShutdownResult { + Ok(()) + } fn set_resource(&mut self, resource: &Resource) { self.resource diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 97ae74ee85..92ba1cbc22 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -4,7 +4,7 @@ mod log_emitter; mod log_processor; pub(crate) mod record; -pub use error::{LogError, LogResult}; +pub use error::{LogError, LogResult, ShutdownError}; pub use log_emitter::{Builder, Logger, LoggerProvider}; pub use log_processor::{ BatchConfig, BatchConfigBuilder, BatchLogProcessor, BatchLogProcessorBuilder, LogProcessor, diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index dff6d93c7e..9aa9842ab4 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -1,6 +1,6 @@ -use crate::export::logs::{LogBatch, LogExporter}; -use crate::logs::LogRecord; +use crate::export::logs::{ExportResult, LogBatch, LogExporter}; use crate::logs::{LogError, LogResult}; +use crate::logs::{LogRecord, ShutdownError}; use crate::Resource; use opentelemetry::InstrumentationScope; use std::borrow::Cow; @@ -147,8 +147,17 @@ impl InMemoryLogExporter { /// ``` /// pub fn get_emitted_logs(&self) -> LogResult> { - let logs_guard = self.logs.lock().map_err(LogError::from)?; - let resource_guard = self.resource.lock().map_err(LogError::from)?; + let logs_guard = self.logs.lock().map_err(|_| { + LogError::from( + "InMemoryLogExporter: log buffer mutex poisoned trying to get_emitted_logs", + ) + })?; + let resource_guard = self.resource.lock().map_err(|_| { + LogError::from( + "InMemoryLogExporter: resource mutex poisoned trying to get_emitted_logs", + ) + })?; + let logs: Vec = logs_guard .iter() .map(|log_data| LogDataWithResource { @@ -171,12 +180,14 @@ impl InMemoryLogExporter { /// exporter.reset(); /// ``` /// - pub fn reset(&self) { - let _ = self - .logs + pub fn reset(&self) -> Result<(), LogError> { + self.logs .lock() - .map(|mut logs_guard| logs_guard.clear()) - .map_err(LogError::from); + .map_err(|_| { + LogError::from("InMemoryLogExporter: log buffer mutex poisoned trying to reset()") + }) + .map(|mut logs_guard| logs_guard.clear())?; + Ok(()) } } @@ -185,9 +196,15 @@ impl LogExporter for InMemoryLogExporter { fn export( &self, batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send { + ) -> impl std::future::Future + Send { async move { - let mut logs_guard = self.logs.lock().map_err(LogError::from)?; + // Lock the logs, returning an error if the lock is poisoned. + let mut logs_guard = self + .logs + .lock() + .map_err(|e| LogError::ClientFailed(e.to_string()))?; + + // Iterate over the log batch and push each log into the guard. for (log_record, instrumentation) in batch.iter() { let owned_log = OwnedLogData { record: (*log_record).clone(), @@ -195,14 +212,18 @@ impl LogExporter for InMemoryLogExporter { }; logs_guard.push(owned_log); } + + // Indicate success. Ok(()) } } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> Result<(), ShutdownError> { if self.should_reset_on_shutdown { - self.reset(); + self.reset() + .map_err(|e| ShutdownError::Other(Box::new(e)))?; } + Ok(()) } fn set_resource(&mut self, resource: &Resource) { diff --git a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs index 3645d9f6c2..0e346b113a 100644 --- a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs @@ -1,4 +1,4 @@ -use crate::export::trace::{ExportResult, SpanData, SpanExporter}; +use crate::export::trace::{ExportResult, ShutdownResult, SpanData, SpanExporter}; use crate::resource::Resource; use futures_util::future::BoxFuture; use opentelemetry::trace::{TraceError, TraceResult}; @@ -142,8 +142,9 @@ impl SpanExporter for InMemorySpanExporter { Box::pin(std::future::ready(Ok(()))) } - fn shutdown(&mut self) { - self.reset() + fn shutdown(&mut self) -> ShutdownResult { + self.reset(); + Ok(()) } fn set_resource(&mut self, resource: &Resource) { diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index e9996e3fc8..d8c8a055c6 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -1,3 +1,4 @@ +use crate::export::trace::{ShutdownError, ShutdownResult}; use crate::{ export::trace::{ExportResult, SpanData, SpanExporter}, trace::{SpanEvents, SpanLinks}, @@ -53,8 +54,11 @@ impl SpanExporter for TokioSpanExporter { Box::pin(std::future::ready(Ok(()))) } - fn shutdown(&mut self) { - self.tx_shutdown.send(()).unwrap(); + fn shutdown(&mut self) -> ShutdownResult { + self.tx_shutdown + .send(()) + .map_err::(|err| ShutdownError::Other(Box::new(err)))?; + Ok(()) } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 12b94d6d47..4466b491a7 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -146,14 +146,10 @@ impl SpanProcessor for SimpleSpanProcessor { } fn shutdown(&self) -> TraceResult<()> { - if let Ok(mut exporter) = self.exporter.lock() { - exporter.shutdown(); - Ok(()) - } else { - Err(TraceError::Other( - "SimpleSpanProcessor mutex poison at shutdown".into(), - )) - } + self.exporter + .lock()? + .shutdown() + .map_err(|e| TraceError::Other(Box::new(e))) } fn set_resource(&mut self, resource: &Resource) { @@ -651,7 +647,7 @@ mod tests { OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY, OTEL_BSP_SCHEDULE_DELAY_DEFAULT, }; - use crate::export::trace::{ExportResult, SpanData, SpanExporter}; + use crate::export::trace::{ExportResult, ShutdownError, SpanData, SpanExporter}; use crate::testing::trace::{new_test_export_span_data, InMemorySpanExporterBuilder}; use crate::trace::span_processor::{ OTEL_BSP_EXPORT_TIMEOUT_DEFAULT, OTEL_BSP_MAX_CONCURRENT_EXPORTS, @@ -856,7 +852,9 @@ mod tests { .boxed() } - fn shutdown(&mut self) {} + fn shutdown(&mut self) -> Result<(), ShutdownError> { + Ok(()) + } fn set_resource(&mut self, resource: &Resource) { let mut exported_resource = self.exported_resource.lock().unwrap(); *exported_resource = Some(resource.clone()); diff --git a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs index c3c241c776..276a33f89f 100644 --- a/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs +++ b/opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs @@ -287,7 +287,12 @@ impl BatchSpanProcessorInternal { // Stream has terminated or processor is shutdown, return to finish execution. BatchMessage::Shutdown(ch) => { self.flush(Some(ch)).await; - self.exporter.shutdown(); + if let Err(shutdown_error) = self.exporter.shutdown() { + otel_debug!( + name: "BatchSpanProcessor.ShutdownError", + msg = format!("{:?}", shutdown_error) + ); + } return false; } // propagate the resource diff --git a/opentelemetry-stdout/CHANGELOG.md b/opentelemetry-stdout/CHANGELOG.md index 046262458d..2f5fd9e220 100644 --- a/opentelemetry-stdout/CHANGELOG.md +++ b/opentelemetry-stdout/CHANGELOG.md @@ -5,6 +5,9 @@ - Bump msrv to 1.75.0. - *Breaking* time fields, `StartTime` and `EndTime` is printed on aggregation (Sum, Gauge, Histogram, ExpoHistogram) with 2 tabs, previously it was on aggregation data point, with 3 tabs, see [#2377](https://github.com/open-telemetry/opentelemetry-rust/pull/2377) and [#2411](https://github.com/open-telemetry/opentelemetry-rust/pull/2411). +- `LogExporter.shutdown` and `SpanExporter.shutdown` now explicitly return a result + + ## 0.27.0 Released 2024-Nov-11 diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index 6313474dd1..e50d76afc0 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; use core::fmt; -use opentelemetry_sdk::export::logs::LogBatch; +use opentelemetry_sdk::export::logs::{LogBatch, ShutdownResult}; use opentelemetry_sdk::logs::LogResult; use opentelemetry_sdk::Resource; use std::sync::atomic; @@ -63,8 +63,9 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { } } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> ShutdownResult { self.is_shutdown.store(true, atomic::Ordering::SeqCst); + Ok(()) } fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index bf00909890..d13b5ad1c8 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -2,10 +2,10 @@ use chrono::{DateTime, Utc}; use core::fmt; use futures_util::future::BoxFuture; use opentelemetry::trace::TraceError; +use opentelemetry_sdk::export::trace::ShutdownResult; use opentelemetry_sdk::export::{self, trace::ExportResult}; -use std::sync::atomic; - use opentelemetry_sdk::resource::Resource; +use std::sync::atomic; /// An OpenTelemetry exporter that writes Spans to stdout on export. pub struct SpanExporter { @@ -59,8 +59,9 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { } } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> ShutdownResult { self.is_shutdown.store(true, atomic::Ordering::SeqCst); + Ok(()) } fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { diff --git a/stress/src/logs.rs b/stress/src/logs.rs index 2242d48eea..ac9cad018b 100644 --- a/stress/src/logs.rs +++ b/stress/src/logs.rs @@ -23,11 +23,8 @@ mod throughput; struct MockLogExporter; impl LogExporter for MockLogExporter { - fn export( - &self, - _batch: LogBatch<'_>, - ) -> impl std::future::Future> + Send { - async { Ok(()) } + async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> { + Ok(()) } }