diff --git a/opentelemetry-sdk/src/metrics/error.rs b/opentelemetry-sdk/src/metrics/error.rs index cb8afcab0e..3227a95cd7 100644 --- a/opentelemetry-sdk/src/metrics/error.rs +++ b/opentelemetry-sdk/src/metrics/error.rs @@ -17,6 +17,12 @@ pub enum MetricError { /// Invalid configuration #[error("Config error {0}")] Config(String), + /// Shutdown already invoked + #[error("Shutdown already invoked")] + AlreadyShutdown, + /// Shutdown failed due to timeout exceeding + #[error("Shutdown failed due to timeout exceeding")] + ShutdownTimeout, /// Fail to export metrics #[error("Metrics exporter {0} failed with {name}", name = .0.exporter_name())] ExportErr(Box), @@ -27,6 +33,25 @@ pub enum MetricError { InvalidInstrumentConfiguration(&'static str), } +impl PartialEq for MetricError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (MetricError::Other(a), MetricError::Other(b)) => a == b, + (MetricError::Config(a), MetricError::Config(b)) => a == b, + (MetricError::AlreadyShutdown, MetricError::AlreadyShutdown) => true, + (MetricError::ShutdownTimeout, MetricError::ShutdownTimeout) => true, + (MetricError::ExportErr(a), MetricError::ExportErr(b)) => { + a.exporter_name() == b.exporter_name() + } + ( + MetricError::InvalidInstrumentConfiguration(a), + MetricError::InvalidInstrumentConfiguration(b), + ) => a == b, + _ => false, + } + } +} + impl From for MetricError { fn from(err: T) -> Self { MetricError::ExportErr(Box::new(err)) diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs index 011de1f41c..02145ce5d3 100644 --- a/opentelemetry-sdk/src/metrics/meter_provider.rs +++ b/opentelemetry-sdk/src/metrics/meter_provider.rs @@ -123,9 +123,7 @@ impl SdkMeterProviderInner { .shutdown_invoked .load(std::sync::atomic::Ordering::Relaxed) { - Err(MetricError::Other( - "Cannot perform flush as MeterProvider shutdown already invoked.".into(), - )) + Err(MetricError::AlreadyShutdown) } else { self.pipes.force_flush() } @@ -137,9 +135,7 @@ impl SdkMeterProviderInner { .swap(true, std::sync::atomic::Ordering::SeqCst) { // If the previous value was true, shutdown was already invoked. - Err(MetricError::Other( - "MeterProvider shutdown already invoked.".into(), - )) + Err(MetricError::AlreadyShutdown) } else { self.pipes.shutdown() } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 43bfd0912e..327976c0a8 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -408,14 +408,19 @@ impl PeriodicReaderInner { .send(Message::Shutdown(response_tx)) .map_err(|e| MetricError::Other(e.to_string()))?; - if let Ok(response) = response_rx.recv() { - if response { - Ok(()) - } else { + // TODO: accept timeout from caller. + match response_rx.recv_timeout(Duration::from_secs(5)) { + Ok(response) => { + if response { + Ok(()) + } else { + Err(MetricError::Other("Failed to shutdown".into())) + } + } + Err(mpsc::RecvTimeoutError::Timeout) => Err(MetricError::ShutdownTimeout), + Err(mpsc::RecvTimeoutError::Disconnected) => { Err(MetricError::Other("Failed to shutdown".into())) } - } else { - Err(MetricError::Other("Failed to shutdown".into())) } } } @@ -573,10 +578,12 @@ mod tests { // calling shutdown again should return Err let result = meter_provider.shutdown(); assert!(result.is_err()); + assert_eq!(result.unwrap_err(), MetricError::AlreadyShutdown); // calling shutdown again should return Err let result = meter_provider.shutdown(); assert!(result.is_err()); + assert_eq!(result.unwrap_err(), MetricError::AlreadyShutdown); } #[test] @@ -598,6 +605,7 @@ mod tests { // calling force_flush after shutdown should return Err let result = meter_provider.force_flush(); assert!(result.is_err()); + assert_eq!(result.unwrap_err(), MetricError::AlreadyShutdown); } #[test]