Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeriodicReader - alternate shutdown implementation #2482

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 47 additions & 24 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
message_sender: Arc::new(message_sender),
producer: Mutex::new(None),
exporter: exporter_arc.clone(),
handle: Mutex::new(None),
}),
};
let cloned_reader = reader.clone();
Expand Down Expand Up @@ -211,20 +212,19 @@
remaining_interval = Duration::ZERO;
}
}
Ok(Message::Shutdown(response_sender)) => {
Ok(Message::Shutdown) => {
// Perform final export and break out of loop and exit the thread
otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
let export_result = cloned_reader.collect_and_export(timeout);
otel_debug!(
name: "PeriodReaderInvokedExporterExport",
export_result = format!("{:?}", export_result)

Check warning on line 221 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L221

Added line #L221 was not covered by tests
);
let shutdown_result = exporter_arc.shutdown();
otel_debug!(
name: "PeriodReaderInvokedExporterShutdown",
shutdown_result = format!("{:?}", shutdown_result)
);
if export_result.is_err() || shutdown_result.is_err() {
response_sender.send(false).unwrap();
} else {
response_sender.send(true).unwrap();
}

otel_debug!(
name: "PeriodReaderThreadExiting",
Expand Down Expand Up @@ -280,12 +280,19 @@

// TODO: Should we fail-fast here and bubble up the error to user?
#[allow(unused_variables)]
if let Err(e) = result_thread_creation {
otel_error!(
name: "PeriodReaderThreadStartError",
message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
error = format!("{:?}", e)
);
match result_thread_creation {
Ok(handle) => {
// store the handle to the thread in the inner
// so that it can be joined on shutdown.
reader.inner.handle.lock().unwrap().replace(handle);
}
Err(e) => {
otel_error!(

Check warning on line 290 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L289-L290

Added lines #L289 - L290 were not covered by tests
name: "PeriodReaderThreadStartError",
message = "Failed to start PeriodicReader thread. Metrics will not be exported.",
error = format!("{:?}", e)

Check warning on line 293 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L293

Added line #L293 was not covered by tests
);
}
}
reader
}
Expand All @@ -305,6 +312,7 @@
exporter: Arc<dyn PushMetricExporter>,
message_sender: Arc<mpsc::Sender<Message>>,
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
handle: Mutex<Option<thread::JoinHandle<()>>>,
}

impl PeriodicReaderInner {
Expand Down Expand Up @@ -409,28 +417,43 @@
}

fn shutdown(&self) -> MetricResult<()> {
// TODO: See if this is better to be created upfront.
let (response_tx, response_rx) = mpsc::channel();
// Attempt to send a shutdown message to the background thread.
self.message_sender
.send(Message::Shutdown(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;

if let Ok(response) = response_rx.recv() {
if response {
Ok(())
} else {
Err(MetricError::Other("Failed to shutdown".into()))
.send(Message::Shutdown)
.map_err(|e| MetricError::Other(format!("Failed to send shutdown message: {}", e)))?;

// Acquire the lock to access the thread handle.
let mut handle_guard = self.handle.lock().map_err(|_| {
MetricError::Other("Failed to acquire lock on thread handle".into())

Check warning on line 427 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L427

Added line #L427 was not covered by tests
})?;

// If the handle exists, attempt to join the thread.
if let Some(thread_handle) = handle_guard.take() {
match thread_handle.join() {
Ok(_) => {
otel_info!(name: "PeriodReaderThreadJoined");
Ok(())
}
Err(e) => {
otel_error!(

Check warning on line 438 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L437-L438

Added lines #L437 - L438 were not covered by tests
name: "PeriodReaderThreadJoinError",
error = format!("Thread join error: {:?}", e)

Check warning on line 440 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L440

Added line #L440 was not covered by tests
);
Err(MetricError::Other("Failed to join thread".into()))

Check warning on line 442 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L442

Added line #L442 was not covered by tests
}
}
} else {
Err(MetricError::Other("Failed to shutdown".into()))
// Handle the case where no thread handle is present.
otel_error!(name: "PeriodReaderThreadHandleMissing");
Err(MetricError::Other("Thread handle not found".into()))

Check warning on line 448 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L447-L448

Added lines #L447 - L448 were not covered by tests
}
}
}

#[derive(Debug)]
enum Message {
Flush(Sender<bool>),
Shutdown(Sender<bool>),
Shutdown,
}

impl MetricReader for PeriodicReader {
Expand Down
Loading