Remove the is_shutdown flag from processors, and fix Logger::emit() to check the flag before emitting. #2462

Open · wants to merge 5 commits into base: main
26 changes: 26 additions & 0 deletions opentelemetry-sdk/benches/log.rs
@@ -126,10 +126,36 @@ fn logging_comparable_to_appender(c: &mut Criterion) {
});
}

fn logger_emit(c: &mut Criterion) {
// Provider is created once, outside of the benchmark
let provider = LoggerProvider::builder()
.with_log_processor(NoopProcessor {})
.build();

let logger = provider.logger("benchmark_emit");

// Create the log record once
let mut log_record = logger.create_log_record();
log_record.set_body("simple log".into());

// Convert log_record into a raw pointer
let log_record_ptr: *mut _ = &mut log_record;

c.bench_function("logger_emit", |b| {
b.iter(|| {
unsafe {
// Dereference the raw pointer to pass it to emit
logger.emit(std::ptr::read(log_record_ptr));
}
});
});
}
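For comparison, a safe variant of this benchmark is sketched below (editorial, not part of the PR's diff). It assumes LogRecord implements Clone, as the test changes later in this PR do, and it folds the clone cost into the measurement, so it is not a drop-in replacement for the pointer-based version above.

```rust
fn logger_emit_clone(c: &mut Criterion) {
    let provider = LoggerProvider::builder()
        .with_log_processor(NoopProcessor {})
        .build();
    let logger = provider.logger("benchmark_emit_clone");

    // Prepare the record once, as above.
    let mut log_record = logger.create_log_record();
    log_record.set_body("simple log".into());

    c.bench_function("logger_emit_clone", |b| {
        b.iter(|| {
            // Clone per iteration instead of reading through a raw pointer.
            logger.emit(log_record.clone());
        });
    });
}
```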

fn criterion_benchmark(c: &mut Criterion) {
logger_creation(c);
log_provider_creation(c);
logging_comparable_to_appender(c);
logger_emit(c);
log_benchmark_group(c, "simple-log", |logger| {
let mut log_record = logger.create_log_record();
log_record.set_body("simple log".into());
8 changes: 8 additions & 0 deletions opentelemetry-sdk/src/logs/log_emitter.rs
@@ -268,6 +268,14 @@ impl opentelemetry::logs::Logger for Logger {

/// Emit a `LogRecord`.
fn emit(&self, mut record: Self::LogRecord) {
if self.provider.inner.is_shutdown.load(Ordering::Relaxed) {
Member:

The issue with this is that it can affect throughput due to the contention introduced here (logs so far have had no contention when using etw/user_events). Can you check the stress test before and after?

I am unsure of a solution. Maybe don't check shutdown anywhere except in stdout-like non-prod processors, and rely on mechanisms like the export client/etw etc. returning errors. What harm can it cause? 🤔

Member Author:

Oh yes, it doesn’t make sense to introduce contention in the hot path, even if the contention is at the atomic level.

Contributor @utpilla (Dec 21, 2024):

Since we are only reading the atomic variable, and its value does not change for most of the application's lifetime, it most likely should not have any visible effect on throughput.

We should be able to confirm that with the stress test.

Member Author:

Yes, the effect of an uncontended atomic should be too small to be noticed in the stress test. It is slightly more than a plain bool check, but much less than the cost incurred under contention. I added a benchmark for Logger::emit() in this PR, which shows an added latency of 1-2 ns:
main branch:

logger_emit             time:   [37.916 ns 37.977 ns 38.077 ns]
                        change: [-0.2733% -0.0648% +0.1328%] (p = 0.58 > 0.05)
                        No change in performance detected.

PR branch:

logger_emit             time:   [38.941 ns 39.027 ns 39.172 ns]
                        change: [+2.6756% +2.9861% +3.3292%] (p = 0.00 < 0.05)
                        Performance has regressed.

Member Author @lalitb (Dec 21, 2024):

> I am unsure of a solution. Maybe don't check shutdown anywhere except in stdout-like non-prod processors, and rely on mechanisms like the export client/etw etc. returning errors. What harm can it cause?

A custom exporter connected to the reentrant or simple processor would then need to handle shutdown properly itself. As of now, etw and user_events don't do anything, so they will continue to emit even after the user invokes shutdown.

Contributor @utpilla (Dec 23, 2024):

I think adding this "if check" in Logger::emit() is not only a more efficient short-circuiting operation, but it also simplifies things. It lets us offer a more robust guarantee:
"The SDK would not do any wasteful computation once it's shut down (regardless of what processors/exporters are used)."

Member @cijothomas (Dec 24, 2024):

I don't think it simplifies things. How do we determine the best place to do the is_shutdown check? Logger::emit() is one place; how about the processor? How about the exporter? Or even the appender itself?

> "The SDK would not do any wasteful computation once it's shut down (regardless of what processors/exporters are used)."

I don't know if we need to make such a statement. If users want to achieve that, they can drop the providers. (Even then, we would need to redesign something in Metrics to make it truly no-op (#2442).)

Contributor @utpilla (Dec 24, 2024):

> I don't think it simplifies things.

The simplification I'm referring to is the effort required to achieve the expected shutdown behavior. Why call emit() on each processor if the provider is shut down? With the "if check" in Logger::emit(), we effectively make it a no-op SDK when the provider is shut down.

> How do we determine the best place to do the is_shutdown check? Logger::emit() is one place; how about the processor? How about the exporter? Or even the appender itself?

Since shutdown is an SDK concept, it only makes sense to have the check in an SDK component. Appenders do not depend on the SDK, so they are out of the question. Maybe we could update the LoggerProvider trait to add another method, is_shutdown, which appenders could use to avoid creating log records altogether. That's a different discussion, though, as it involves updating the public trait in the API.

With the current state of the code, among the SDK components (Logger, Processor, and Exporter), I think the best place to check for shutdown is the Logger, as that is the first SDK component that interacts with the appender. By checking in the Logger, we avoid making calls to the processors altogether.

> I don't know if we need to make such a statement.

We should have some stance on shutdown behavior. It could either be:

  1. The effectiveness of shutdown depends on the processors/exporters used, or
  2. Shutdown works regardless of the configured pipeline.

I think it's nice to have #2, and having the "if check" is a very easy way to achieve that. I don't see any disadvantage to doing it.

> If users want to achieve that, they can drop the providers.

I'm not sure it's that straightforward. Since the provider is essentially an Arc<LoggerProviderInner>, the inner value is not dropped until there are no more references to it. The appender layer keeps a clone of the provider, which is then moved into the tracing_subscriber::registry(). So simply dropping the provider might not help.
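To illustrate that last point, here is a minimal standalone sketch (plain std, not SDK code) of why dropping the user-held handle alone does not release the inner state:

```rust
use std::sync::Arc;

// Stand-in for LoggerProviderInner; illustrative only.
struct Inner;

fn main() {
    let provider = Arc::new(Inner);
    // The appender layer keeps its own clone, conceptually the one that
    // ends up inside the tracing_subscriber registry.
    let held_by_subscriber = Arc::clone(&provider);

    drop(provider); // the user drops their handle...
    // ...but the inner value is still alive via the subscriber's clone.
    assert_eq!(Arc::strong_count(&held_by_subscriber), 1);
}
```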

Member Author @lalitb (Dec 24, 2024):

> We should have some stance on shutdown behavior. It could either be:
> 1. The effectiveness of shutdown depends on the processors/exporters used, or
> 2. Shutdown works regardless of the configured pipeline.
> I think it's nice to have #2, and having the "if check" is a very easy way to achieve that. I don't see any disadvantage to doing it.

Thanks, these are good discussions.

I am also inclined toward #2, as it ensures consistent shutdown behavior regardless of pipeline configuration or internal component details. Relying on assumptions, such as the batch processor closing its channel during shutdown, creates unnecessary dependencies that could break with future changes. An explicit provider/logger-level check provides a more reliable solution, especially since the cost of adding it is small.

Member:

> The simplification I'm referring to is the effort required to achieve the expected shutdown behavior. Why call emit() on each processor if the provider is shut down? With the "if check" in Logger::emit(), we effectively make it a no-op SDK when the provider is shut down.

Without further redesign/refactoring, this won't make it a no-op SDK. There is a non-trivial amount of work done even before emit() is invoked, to prepare the LogRecord, and that will continue even after shutdown. Yes, we save some work, but it is not quite a no-op.
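As a rough sketch of that point (generic over the opentelemetry Logger trait, mirroring the API shape used elsewhere in this PR), all of the record preparation happens before emit() runs, so only the tail end of the work can be skipped:

```rust
use opentelemetry::logs::{LogRecord, Logger};

// Illustrative only: the record is fully built before emit() is called,
// so this preparation is not avoided by an is_shutdown check inside emit().
fn emit_one<L: Logger>(logger: &L) {
    let mut record = logger.create_log_record();
    record.set_body("built regardless of shutdown".into());
    // The is_shutdown short-circuit can only skip work from this call onward.
    logger.emit(record);
}
```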

> We should have some stance on shutdown behavior. It could either be:
> 1. The effectiveness of shutdown depends on the processors/exporters used, or
> 2. Shutdown works regardless of the configured pipeline.

So far, we have mostly been sticking with 1: we expect components to behave as expected. Example: we expect ObservableCallbacks to return in a reasonable time, and if they don't, the Metrics SDK never recovers. Similarly, we expect exporters to return in a reasonable time.
Not a strict comparison, but if we want to take the stance that the OTel SDK works predictably irrespective of the components plugged in, that would require a lot of redesign.
If we generally expect other components to behave properly, then why not continue relying on them to deal with shutdown too?

Another interesting thing we can do here (similar to what OTel .NET does for its console exporter): if we make shutdown a pure exporter problem, the Stdout exporter can continue to work even after shutdown is invoked, but do something in the exporter to let the user know that they have incorrect lifecycle management. IMHO, this is better than short-circuiting in the upstream SDK. (Of course, internal logs can help, but unless users take care to avoid self-logging, it can cause a stack overflow due to a cyclic loop.)
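A rough sketch of that exporter-side idea (all names here are hypothetical, not the SDK's API): the exporter keeps exporting after shutdown() but flags the lifecycle misuse itself, instead of the SDK short-circuiting upstream.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical stdout-like exporter; not an actual SDK type.
struct StdoutLikeExporter {
    shutdown_called: AtomicBool,
}

impl StdoutLikeExporter {
    fn export(&self, body: &str) {
        if self.shutdown_called.load(Ordering::Relaxed) {
            // Keep working, but surface the incorrect lifecycle management.
            eprintln!("warning: export() called after shutdown(); check provider lifetime");
        }
        println!("{body}");
    }

    fn shutdown(&self) {
        self.shutdown_called.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let exporter = StdoutLikeExporter { shutdown_called: AtomicBool::new(false) };
    exporter.export("before shutdown");
    exporter.shutdown();
    exporter.export("after shutdown"); // still printed, plus a warning
}
```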

> I'm not sure it's that straightforward. Since the provider is essentially an Arc<LoggerProviderInner>, the inner value is not dropped until there are no more references to it. The appender layer keeps a clone of the provider, which is then moved into the tracing_subscriber::registry(). So simply dropping the provider might not help.

Excellent point! This is something we need guidance on. (Even more challenging is the Metrics case referred to earlier.)
If a user decides to set up a tracing subscriber globally (via init()), they won't really be able to drop anything, unless tracing::subscriber provides a way. In other words, the user is signing up for a global subscriber, so they should not expect the ability to drop things. tracing has alternative solutions to set a subscriber for a scope only; the user can then drop the scope, which in turn drops the logger_provider.
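A sketch of that scoped-subscriber alternative, assuming the usual tracing / tracing-subscriber APIs and the opentelemetry-appender-tracing bridge layer (exact type and method names may differ by version):

```rust
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_sdk::logs::LoggerProvider;
use tracing_subscriber::prelude::*;

fn scoped_logging(provider: LoggerProvider) {
    // The bridge layer keeps its own handle to the provider.
    let bridge = OpenTelemetryTracingBridge::new(&provider);
    let subscriber = tracing_subscriber::registry().with(bridge);

    // The subscriber is only active for this closure; it is not installed globally.
    tracing::subscriber::with_default(subscriber, || {
        tracing::info!("emitted while the scoped subscriber is active");
    });

    // The scoped subscriber (and its provider clone) is dropped here, so
    // shutting down or dropping the provider can now take full effect.
    let _ = provider.shutdown();
}
```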

(#1660 demonstrates this issue already)

// Optionally, log a debug message indicating logs are being discarded due to shutdown.
otel_debug!(
name: "Logger.Emit.ProviderShutdown",
message = "Log discarded because the LoggerProvider is shut down."
);
return;
}
let provider = &self.provider;
let processors = provider.log_processors();

63 changes: 9 additions & 54 deletions opentelemetry-sdk/src/logs/log_processor.rs
@@ -9,7 +9,7 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
use opentelemetry::logs::Severity;
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cmp::min, env, sync::Mutex};
use std::{
fmt::{self, Debug, Formatter},
@@ -77,29 +77,18 @@ pub trait LogProcessor: Send + Sync + Debug {
#[derive(Debug)]
pub struct SimpleLogProcessor<T: LogExporter> {
exporter: Mutex<T>,
is_shutdown: AtomicBool,
}

impl<T: LogExporter> SimpleLogProcessor<T> {
pub(crate) fn new(exporter: T) -> Self {
SimpleLogProcessor {
exporter: Mutex::new(exporter),
is_shutdown: AtomicBool::new(false),
}
}
}

impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
// this is a warning, as the user is trying to log after the processor has been shutdown
otel_warn!(
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
);
return;
}

let result = self
.exporter
.lock()
@@ -131,8 +120,6 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
}

fn shutdown(&self) -> LogResult<()> {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
Ok(())
@@ -172,7 +159,6 @@ pub struct BatchLogProcessor {
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
shutdown_timeout: Duration,
is_shutdown: AtomicBool,

// Track dropped logs - we'll log this at shutdown
dropped_logs_count: AtomicUsize,
@@ -191,15 +177,6 @@ impl Debug for BatchLogProcessor {

impl LogProcessor for BatchLogProcessor {
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationScope) {
Member:

If emit is called after shutdown, even without an is_shutdown check, the channel would already be closed, so isn't the error it returns good enough?

Member:

Something like:

  1. Don't check for is_shutdown.
  2. Just export as usual.
  3. Since the channel is closed, it'll error out.
  4. Log that error.

No contention/perf cost for the normal path. If logs are still emitted after shutdown, it clearly indicates an issue with how the user is managing lifetimes.
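A minimal standalone sketch of that flow: with no flag at all, a try_send on the closed channel reports the disconnect, which the processor can then log.

```rust
use std::sync::mpsc;

fn main() {
    let (sender, receiver) = mpsc::sync_channel::<&str>(8);
    drop(receiver); // shutdown has closed the receiving end

    if let Err(err) = sender.try_send("log record") {
        // In the processor this would be reported via otel_warn!/otel_error!.
        eprintln!("emit after shutdown, send failed: {err}");
    }
}
```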

// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
otel_warn!(
name: "BatchLogProcessor.Emit.ProcessorShutdown",
message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
);
return;
}

let result = self
.message_sender
.try_send(BatchMessage::ExportLog(Box::new((
@@ -219,11 +196,6 @@ impl LogProcessor for BatchLogProcessor {
}

fn force_flush(&self) -> LogResult<()> {
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return LogResult::Err(LogError::Other(
"BatchLogProcessor is already shutdown".into(),
));
}
let (sender, receiver) = mpsc::sync_channel(1);
self.message_sender
.try_send(BatchMessage::ForceFlush(sender))
@@ -241,20 +213,6 @@ impl LogProcessor for BatchLogProcessor {
}

fn shutdown(&self) -> LogResult<()> {
// test and set is_shutdown flag if it is not set
if self
.is_shutdown
.swap(true, std::sync::atomic::Ordering::Relaxed)
{
otel_warn!(
name: "BatchLogProcessor.Shutdown.ProcessorShutdown",
message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
);
return LogResult::Err(LogError::AlreadyShutdown(
"BatchLogProcessor is already shutdown".into(),
));
}

let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed);
let max_queue_size = self.max_queue_size;
if dropped_logs > 0 {
@@ -396,7 +354,6 @@ impl BatchLogProcessor {
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
is_shutdown: AtomicBool::new(false),
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
}
@@ -902,20 +859,18 @@ mod tests {
.keep_records_on_shutdown()
.build();
let processor = SimpleLogProcessor::new(exporter.clone());
let provider = LoggerProvider::builder()
.with_log_processor(processor)
.build();
let logger = provider.logger("test-simple-logger");

let mut record: LogRecord = Default::default();
let instrumentation: InstrumentationScope = Default::default();

processor.emit(&mut record, &instrumentation);
let record: LogRecord = Default::default();

processor.shutdown().unwrap();
logger.emit(record.clone());

let is_shutdown = processor
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed);
assert!(is_shutdown);
provider.shutdown().unwrap();

processor.emit(&mut record, &instrumentation);
logger.emit(record);

assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
}