diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 2cee6c4d0d..dbe19351f4 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -160,6 +160,7 @@ impl PeriodicReader { message_sender: Arc::new(message_sender), producer: Mutex::new(None), exporter: exporter_arc.clone(), + handle: Mutex::new(None), }), }; let cloned_reader = reader.clone(); @@ -211,20 +212,19 @@ impl PeriodicReader { remaining_interval = Duration::ZERO; } } - Ok(Message::Shutdown(response_sender)) => { + Ok(Message::Shutdown) => { // Perform final export and break out of loop and exit the thread otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown"); let export_result = cloned_reader.collect_and_export(timeout); + otel_debug!( + name: "PeriodReaderInvokedExporterExport", + export_result = format!("{:?}", export_result) + ); let shutdown_result = exporter_arc.shutdown(); otel_debug!( name: "PeriodReaderInvokedExporterShutdown", shutdown_result = format!("{:?}", shutdown_result) ); - if export_result.is_err() || shutdown_result.is_err() { - response_sender.send(false).unwrap(); - } else { - response_sender.send(true).unwrap(); - } otel_debug!( name: "PeriodReaderThreadExiting", @@ -280,12 +280,19 @@ impl PeriodicReader { // TODO: Should we fail-fast here and bubble up the error to user? #[allow(unused_variables)] - if let Err(e) = result_thread_creation { - otel_error!( - name: "PeriodReaderThreadStartError", - message = "Failed to start PeriodicReader thread. Metrics will not be exported.", - error = format!("{:?}", e) - ); + match result_thread_creation { + Ok(handle) => { + // store the handle to the thread in the inner + // so that it can be joined on shutdown. + reader.inner.handle.lock().unwrap().replace(handle); + } + Err(e) => { + otel_error!( + name: "PeriodReaderThreadStartError", + message = "Failed to start PeriodicReader thread. Metrics will not be exported.", + error = format!("{:?}", e) + ); + } } reader } @@ -305,6 +312,7 @@ struct PeriodicReaderInner { exporter: Arc, message_sender: Arc>, producer: Mutex>>, + handle: Mutex>>, } impl PeriodicReaderInner { @@ -409,20 +417,35 @@ impl PeriodicReaderInner { } fn shutdown(&self) -> MetricResult<()> { - // TODO: See if this is better to be created upfront. - let (response_tx, response_rx) = mpsc::channel(); + // Attempt to send a shutdown message to the background thread. self.message_sender - .send(Message::Shutdown(response_tx)) - .map_err(|e| MetricError::Other(e.to_string()))?; - - if let Ok(response) = response_rx.recv() { - if response { - Ok(()) - } else { - Err(MetricError::Other("Failed to shutdown".into())) + .send(Message::Shutdown) + .map_err(|e| MetricError::Other(format!("Failed to send shutdown message: {}", e)))?; + + // Acquire the lock to access the thread handle. + let mut handle_guard = self.handle.lock().map_err(|_| { + MetricError::Other("Failed to acquire lock on thread handle".into()) + })?; + + // If the handle exists, attempt to join the thread. + if let Some(thread_handle) = handle_guard.take() { + match thread_handle.join() { + Ok(_) => { + otel_info!(name: "PeriodReaderThreadJoined"); + Ok(()) + } + Err(e) => { + otel_error!( + name: "PeriodReaderThreadJoinError", + error = format!("Thread join error: {:?}", e) + ); + Err(MetricError::Other("Failed to join thread".into())) + } } } else { - Err(MetricError::Other("Failed to shutdown".into())) + // Handle the case where no thread handle is present. + otel_error!(name: "PeriodReaderThreadHandleMissing"); + Err(MetricError::Other("Thread handle not found".into())) } } } @@ -430,7 +453,7 @@ impl PeriodicReaderInner { #[derive(Debug)] enum Message { Flush(Sender), - Shutdown(Sender), + Shutdown, } impl MetricReader for PeriodicReader {