ValueMap - separate HashMap for sorted attribs #2288

Open: wants to merge 3 commits into main
@@ -671,7 +671,7 @@ mod tests {
for v in test.values {
h.measure(v, &[]);
}
let dp = h.value_map.no_attribute_tracker.lock().unwrap();
let dp = h.value_map.no_attribs.tracker.lock().unwrap();

assert_eq!(test.expected.max, dp.max);
assert_eq!(test.expected.min, dp.min);
@@ -720,7 +720,7 @@ mod tests {
for v in test.values {
h.measure(v, &[]);
}
let dp = h.value_map.no_attribute_tracker.lock().unwrap();
let dp = h.value_map.no_attribs.tracker.lock().unwrap();

assert_eq!(test.expected.max, dp.max);
assert_eq!(test.expected.min, dp.min);
216 changes: 128 additions & 88 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -6,11 +6,12 @@
mod sum;

use core::fmt;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::swap;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
@@ -41,6 +42,11 @@
fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
}

struct NoAttribs<A> {
tracker: A,
is_set: AtomicBool,
}

/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
@@ -49,16 +55,17 @@
where
A: Aggregator,
{
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
has_no_attribute_value: AtomicBool,
/// Tracker for values with no attributes attached.
no_attribute_tracker: A,
// For performance: dedicated tracker for measurements recorded with no attributes.
no_attribs: NoAttribs<A>,
// For performance: trackers keyed by attributes in the order the caller provided them.
all_attribs: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
// Different orderings of the same attribute keys must map to the same tracker instance;
// keying by sorted attributes achieves that and also enables efficient collection.
sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
/// Configuration for an Aggregator
config: A::InitConfig,
/// Swap with `sorted_attribs` on every `collect_and_reset`.
for_collect_after_reset: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
}

impl<A> ValueMap<A>
@@ -67,70 +74,72 @@
{
fn new(config: A::InitConfig) -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: A::create(&config),
count: AtomicUsize::new(0),
no_attribs: NoAttribs {
tracker: A::create(&config),
is_set: AtomicBool::new(false),
},
all_attribs: RwLock::new(Default::default()),
sorted_attribs: Mutex::new(Default::default()),
config,
for_collect_after_reset: Mutex::new(Default::default()),
}
}

fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
if attributes.is_empty() {
self.no_attribute_tracker.update(value);
self.has_no_attribute_value.store(true, Ordering::Release);
self.no_attribs.tracker.update(value);
self.no_attribs.is_set.store(true, Ordering::Release);
return;
}

let Ok(trackers) = self.trackers.read() else {
return;
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
tracker.update(value);
return;
}
match self.all_attribs.read() {
Ok(trackers) => {
if let Some(tracker) = trackers.get(attributes) {
tracker.update(value);
return;
}
}
Err(_) => return,

};

// Try to retrieve and update the tracker with the attributes sorted.
// Get or create a tracker
let sorted_attrs = sort_and_dedup(attributes);
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
tracker.update(value);
let Ok(mut sorted_trackers) = self.sorted_attribs.lock() else {
Member:

This introduces the possibility of an update() thread fighting for the lock with another update() thread (or even with collect()). This is something we need to avoid as much as possible.
Previously we had only read() locks, so the possibility of an update() thread getting blocked was very small. Now that probability is quite high.

Metrics update() should not cause a thread to be blocked: the thread could lose its CPU slice and have to wait for a chance to get the CPU back. We never wrote this down as a key requirement, but it is an important expectation of a metrics solution.

Contributor Author (@fraillt), Nov 21, 2024:

You probably missed that this only happens for a new attribute set; updating an existing attribute set is unchanged.
In fact, this PR holds locks for much less time than what we have in main.
It does, however, take two locks in the new-attribute-set case (sorted.lock() and all_attribs.write()), but it also has certain benefits. E.g. on main, when we insert a new attribute set, we block readers until we finish these steps:

  • get(as-is) -> (not found)
  • get(sorted) -> (not found)
  • create tracker
  • update tracker
  • insert (as-is)
  • insert (sorted)

on this branch, for the same scenario, readers are blocked only until we:

  • insert (as-is)

Member:

Sorry, my comment about update() blocking other update() calls more frequently in this PR was wrong, apologies!
I was thinking of a scenario which is not yet implemented in main, but which I was working on:

  1. Take the read lock.
  2. Check the incoming order; update and return if found.
  3. Check the sorted order; update and return if found.
  4. Check whether we are above the cardinality limit. If yes, update the overflow entry and return. (No write locks taken so far.)

The above change would ensure that when malicious inputs are constantly thrown at us, we only take the read lock, whereas this PR needs to take a write lock. But I can see that a similar change could be added in this PR too, so they can behave the same (see the sketch below).
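A minimal, self-contained sketch of that read-lock-first flow, using stand-in types and an illustrative cardinality limit rather than the SDK's actual code:

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

const CARDINALITY_LIMIT: usize = 2000; // illustrative value, not the SDK's constant
const OVERFLOW_KEY: &str = "otel.metric.overflow";

#[derive(Default)]
struct Tracker(AtomicU64); // stand-in for the real aggregator
impl Tracker {
    fn update(&self, v: u64) {
        self.0.fetch_add(v, Ordering::Relaxed);
    }
}

#[derive(Default)]
struct ValueMapSketch {
    trackers: RwLock<HashMap<Vec<String>, Arc<Tracker>>>,
    count: AtomicUsize,
}

impl ValueMapSketch {
    fn measure(&self, value: u64, attrs: &[String]) {
        let mut sorted = attrs.to_vec();
        sorted.sort();
        sorted.dedup();

        {
            let Ok(trackers) = self.trackers.read() else { return };
            // Steps 2-3: provided order first, then sorted order, under the read lock only.
            if let Some(t) = trackers.get(attrs).or_else(|| trackers.get(sorted.as_slice())) {
                t.update(value);
                return;
            }
            // Step 4: cardinality check before ever taking a write lock, so a flood of
            // new attribute sets past the limit stays on the read lock once the
            // overflow entry exists.
            if self.count.load(Ordering::SeqCst) >= CARDINALITY_LIMIT {
                if let Some(overflow) = trackers.get([OVERFLOW_KEY.to_string()].as_slice()) {
                    overflow.update(value);
                    return;
                }
            }
        } // read lock released here
        // Only a genuinely new, under-limit attribute set (or the very first overflow)
        // falls through to the write-lock insertion path, which is elided here.
    }
}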

I am not convinced about the need to store sorted attributes in yet another hashmap. Is this primarily motivated by the issue described in #2093? If de-duplication is making things complex, we can do something like:

update(value, &attrs) {
    if dedup_metrics_attributes_enabled {
        attrs = deduplicate(attrs); // it's costly, but only under the feature flag
    }
    // From this point onwards, operate as if `attrs` has no duplicates
    // (either via the feature flag, or by users ensuring the attrs are free of duplicates).
    // ...rest of the logic...
}

Would this approach make things easier, and avoid the need to store sorted attributes separately?
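For illustration, one possible shape for that hypothetical deduplicate helper; it assumes Key implements Ord and uses the public key/value fields of KeyValue, and the keep-last-value policy is an assumption rather than the SDK's documented behavior:

use std::collections::BTreeMap;

use opentelemetry::KeyValue;

// Hypothetical helper (not part of the SDK): return the attributes sorted by key,
// with duplicate keys resolved so that the last occurrence wins.
fn deduplicate(attrs: &[KeyValue]) -> Vec<KeyValue> {
    let mut by_key = BTreeMap::new();
    for kv in attrs {
        // Later duplicates overwrite earlier ones.
        by_key.insert(kv.key.clone(), kv.value.clone());
    }
    by_key
        .into_iter()
        .map(|(key, value)| KeyValue::new(key, value))
        .collect()
}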

Separately, I love the separate RwLock for collect purposes, and the idea of mem::swapping it with the main hashmap. Can we get that into its own PR that we can merge quickly, to make continuous progress?

I also want to revisit the idea of draining actively used trackers on each collect: we could keep the active ones in the hashmap, but use a marker field to indicate whether each is active or not. This would let the metrics SDK operate entirely in existing memory as long as users keep reporting the same timeseries, which is the common case.
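For reference, a minimal self-contained sketch of the swap-on-collect part of that idea, with illustrative names and simple String keys (the marker-field variant is not shown):

use std::collections::HashMap;
use std::mem::swap;
use std::sync::{Arc, Mutex};

struct SwapCollect<V> {
    live: Mutex<HashMap<Vec<String>, Arc<V>>>,  // written on the hot measure() path
    spare: Mutex<HashMap<Vec<String>, Arc<V>>>, // reused buffer, only touched by collect()
}

impl<V> SwapCollect<V> {
    fn collect_and_reset(&self, mut visit: impl FnMut(Vec<String>, Arc<V>)) {
        let mut spare = self.spare.lock().unwrap();
        {
            // Writers contend with collect() only for the duration of this swap,
            // not for the whole drain below.
            let mut live = self.live.lock().unwrap();
            swap(&mut *live, &mut *spare);
        }
        // Drain the swapped-out map without blocking writers.
        for (attrs, tracker) in spare.drain() {
            visit(attrs, tracker);
        }
    }
}

The design point is that the hot path and collection overlap only during a cheap mem::swap of the two maps, which is essentially the role for_collect_after_reset plays in this PR.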

Contributor Author:

There are several reasons for having a separate hashmap:

I'll come back with test results soon to verify whether my concerns are valid or not... :) but basically, I still hold the opinion that this is the best approach to move forward.

return;
}
};

let sorted_count = sorted_trackers.len();
let new_tracker = match sorted_trackers.entry(sorted_attrs) {
Entry::Occupied(occupied_entry) => {
// Do not return early, because the collection phase might clear `all_attribs` multiple times; we still need to insert into it below.
occupied_entry.get().clone()
}
Entry::Vacant(vacant_entry) => {
if !is_under_cardinality_limit(sorted_count) {
sorted_trackers.entry(STREAM_OVERFLOW_ATTRIBUTES.clone())
.or_insert_with(|| {
otel_warn!( name: "ValueMap.measure",
message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
);
Arc::new(A::create(&self.config))
})
.update(value);
return;
}
let new_tracker = Arc::new(A::create(&self.config));
vacant_entry.insert(new_tracker).clone()
}
};
drop(sorted_trackers);

// Give up the read lock before acquiring the write lock.
drop(trackers);
new_tracker.update(value);

let Ok(mut trackers) = self.trackers.write() else {
// Insert the new tracker so we can find it next time
let Ok(mut all_trackers) = self.all_attribs.write() else {
return;
};

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
tracker.update(value);
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
tracker.update(value);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(A::create(&self.config));
new_tracker.update(value);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
overflow_value.update(value);
} else {
let new_tracker = A::create(&self.config);
new_tracker.update(value);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
otel_warn!( name: "ValueMap.measure",
message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
);
}
all_trackers.insert(attributes.to_vec(), new_tracker);
}

/// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
@@ -139,20 +148,23 @@
where
MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
{
prepare_data(dest, self.count.load(Ordering::SeqCst));
if self.has_no_attribute_value.load(Ordering::Acquire) {
dest.push(map_fn(vec![], &self.no_attribute_tracker));
}

let Ok(trackers) = self.trackers.read() else {
return;
let trackers = match self.sorted_attribs.lock() {
Ok(trackers) => {
// It's important to release the lock as fast as possible,
// so we don't block insertion of new attribute sets.
trackers.clone()
}
Err(_) => return,

};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
dest.push(map_fn(attrs.clone(), tracker));
}
prepare_data(dest, trackers.len());

if self.no_attribs.is_set.load(Ordering::Acquire) {
dest.push(map_fn(vec![], &self.no_attribs.tracker));
}

for (attrs, tracker) in trackers.into_iter() {
dest.push(map_fn(attrs, &tracker));
}
}

@@ -162,35 +174,63 @@
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
prepare_data(dest, self.count.load(Ordering::SeqCst));
if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
let mut to_collect = self
.for_collect_after_reset
.lock()
.unwrap_or_else(|err| err.into_inner());
// Reset the sorted trackers so new attribute sets are written into the new hashmap
match self.sorted_attribs.lock() {
Ok(mut trackers) => {
swap(trackers.deref_mut(), to_collect.deref_mut());
}
Err(_) => return,

};
// Clear all trackers, so all attribute sets will start using the new hashmap
match self.all_attribs.write() {
Ok(mut all_trackers) => all_trackers.clear(),
Err(_) => return,

};

prepare_data(dest, to_collect.len());

if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
dest.push(map_fn(
vec![],
self.no_attribute_tracker.clone_and_reset(&self.config),
self.no_attribs.tracker.clone_and_reset(&self.config),
));
}

let trackers = match self.trackers.write() {
Ok(mut trackers) => {
self.count.store(0, Ordering::SeqCst);
take(trackers.deref_mut())
}
Err(_) => todo!(),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.into_iter() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
}
for (attrs, mut tracker) in to_collect.drain() {
// Handles a special interleaving:
// measure-thread: gets the just-inserted tracker from `sorted_attribs` (still holds an Arc to it)
// collect-thread: swaps out `sorted_attribs` (clearing it)
// collect-thread: clears `all_attribs`
// collect-thread: THIS LOOP: spins while the measure-thread still holds its tracker
// measure-thread: inserts the tracker into `all_attribs`
// collect-thread: exits this loop after clearing `all_attribs` again
let tracker = loop {
match Arc::try_unwrap(tracker) {
Ok(inner) => {
break inner;
}
Err(reinserted) => {
tracker = reinserted;
match self.all_attribs.write() {
Ok(mut all_trackers) => all_trackers.clear(),
Err(_) => return,

};
}
};
};
dest.push(map_fn(attrs, tracker));
}
}
}

/// Clear and allocate exactly the required amount of space for all attribute sets
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
data.clear();
let total_len = list_len + 2; // to account for no_attributes case + overflow state
let total_len = list_len + 1; // to account for no_attributes case
if total_len > data.capacity() {
data.reserve_exact(total_len - data.capacity());
}