Separate HashMap for sorted attribs
fraillt committed Nov 8, 2024
1 parent c322a50 commit b4ee62f
Showing 1 changed file with 98 additions and 87 deletions.
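
For orientation before the diff: the commit splits the single `trackers` map into three pieces. Below is a condensed sketch of the new layout as it appears in the diff, with stand-in types so it compiles on its own; the real `KeyValue`, aggregator, and `config` details from the SDK are elided.

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex, RwLock};

// Stand-in types so the sketch compiles on its own; the real SDK types differ.
type KeyValue = (String, String);
type Aggregator = u64;

struct NoAttribs<A> {
    tracker: A,
    is_set: AtomicBool,
}

struct ValueMap<A> {
    // Fast path for measurements recorded without attributes.
    no_attribs: NoAttribs<A>,
    // Cache keyed by attributes in the order the caller provided them.
    all_attribs: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
    // Source of truth keyed by sorted attributes, so different key orders
    // share one tracker; collection iterates this map only.
    sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
}

fn main() {
    let _map: ValueMap<Aggregator> = ValueMap {
        no_attribs: NoAttribs { tracker: 0, is_set: AtomicBool::new(false) },
        all_attribs: RwLock::new(HashMap::new()),
        sorted_attribs: Mutex::new(HashMap::new()),
    };
}
```
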
185 changes: 98 additions & 87 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -6,11 +6,12 @@ mod precomputed_sum;
mod sum;

use core::fmt;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::replace;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
@@ -43,6 +44,11 @@ pub(crate) trait Aggregator {
fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
}

struct NoAttribs<A> {
tracker: A,
is_set: AtomicBool,
}

/// The storage for sums.
///
/// This structure is parametrized by an `Operation` that indicates how
@@ -51,14 +57,13 @@ pub(crate) struct ValueMap<A>
where
A: Aggregator,
{
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
has_no_attribute_value: AtomicBool,
/// Tracker for values with no attributes attached.
no_attribute_tracker: A,
// For performance reasons, values recorded with no attributes use a dedicated tracker.
no_attribs: NoAttribs<A>,
// For performance reasons, trackers are also keyed by attributes in the order the caller provided them.
all_attribs: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
// Different orderings of the same attribute keys must map to the same tracker instance;
// keying by sorted attributes achieves that and also makes collection efficient.
sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
/// Configuration for an Aggregator
config: A::InitConfig,
}
@@ -69,70 +74,68 @@ where
{
fn new(config: A::InitConfig) -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: A::create(&config),
count: AtomicUsize::new(0),
no_attribs: NoAttribs {
tracker: A::create(&config),
is_set: AtomicBool::new(false),
},
all_attribs: RwLock::new(HashMap::new()),
sorted_attribs: Mutex::new(HashMap::new()),
config,
}
}

fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
if attributes.is_empty() {
self.no_attribute_tracker.update(value);
self.has_no_attribute_value.store(true, Ordering::Release);
self.no_attribs.tracker.update(value);
self.no_attribs.is_set.store(true, Ordering::Release);
return;
}

let Ok(trackers) = self.trackers.read() else {
return;
};

// Try to retrieve and update the tracker with the attributes in the provided order first
if let Some(tracker) = trackers.get(attributes) {
tracker.update(value);
return;
}
match self.all_attribs.read() {
Ok(trackers) => {
if let Some(tracker) = trackers.get(attributes) {
tracker.update(value);
return;
}
}
Err(_) => return,
};

// Try to retrieve and update the tracker with the attributes sorted.
// Get or create a tracker
let sorted_attrs = AttributeSet::from(attributes).into_vec();
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
tracker.update(value);
let Ok(mut sorted_trackers) = self.sorted_attribs.lock() else {
return;
}
};

let sorted_count = sorted_trackers.len();
let new_tracker = match sorted_trackers.entry(sorted_attrs) {
Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
Entry::Vacant(vacant_entry) => {
if !is_under_cardinality_limit(sorted_count) {
sorted_trackers.entry(STREAM_OVERFLOW_ATTRIBUTES.clone())
.or_insert_with(|| {
otel_warn!( name: "ValueMap.measure",
message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
);
Arc::new(A::create(&self.config))
})
.update(value);
return;
}
let new_tracker = Arc::new(A::create(&self.config));
vacant_entry.insert(new_tracker).clone()
}
};
drop(sorted_trackers);

// Give up the read lock before acquiring the write lock.
drop(trackers);
new_tracker.update(value);

let Ok(mut trackers) = self.trackers.write() else {
// Insert the new tracker, so we can find it next time
let Ok(mut all_trackers) = self.all_attribs.write() else {
return;
};

// Recheck both the provided and sorted orders after acquiring the write lock
// in case another thread has pushed an update in the meantime.
if let Some(tracker) = trackers.get(attributes) {
tracker.update(value);
} else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
tracker.update(value);
} else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
let new_tracker = Arc::new(A::create(&self.config));
new_tracker.update(value);

// Insert tracker with the attributes in the provided and sorted orders
trackers.insert(attributes.to_vec(), new_tracker.clone());
trackers.insert(sorted_attrs, new_tracker);

self.count.fetch_add(1, Ordering::SeqCst);
} else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
overflow_value.update(value);
} else {
let new_tracker = A::create(&self.config);
new_tracker.update(value);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
otel_warn!( name: "ValueMap.measure",
message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
);
}
all_trackers.insert(attributes.to_vec(), new_tracker);
}
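
The heart of the change is that a given attribute set must land on one tracker no matter what order its keys were reported in. The standalone snippet below (using simplified stand-in types, not the SDK's) illustrates that normalization idea: the sorted attribute list is the map key, so both orderings resolve to the same `Arc`-shared tracker and collection later sees a single data point.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Simplified stand-ins: attributes are (key, value) pairs and the
// "tracker" is just an atomic counter.
fn sorted_key(attrs: &[(&str, i64)]) -> Vec<(String, i64)> {
    let mut key: Vec<_> = attrs.iter().map(|(k, v)| (k.to_string(), *v)).collect();
    key.sort_by(|a, b| a.0.cmp(&b.0));
    key
}

fn main() {
    let mut sorted_attribs: HashMap<Vec<(String, i64)>, Arc<AtomicU64>> = HashMap::new();

    // The same attribute set, reported with keys in two different orders.
    let reported = [vec![("a", 1), ("b", 2)], vec![("b", 2), ("a", 1)]];
    for attrs in &reported {
        let tracker = sorted_attribs
            .entry(sorted_key(attrs))
            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
            .clone();
        tracker.fetch_add(1, Ordering::Relaxed);
    }

    // Both orderings resolve to one tracker, so collect sees a single data point.
    assert_eq!(sorted_attribs.len(), 1);
    let total = sorted_attribs.values().next().unwrap().load(Ordering::Relaxed);
    assert_eq!(total, 2);
}
```
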

/// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
@@ -141,20 +144,23 @@ where
where
MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
{
prepare_data(dest, self.count.load(Ordering::SeqCst));
if self.has_no_attribute_value.load(Ordering::Acquire) {
dest.push(map_fn(vec![], &self.no_attribute_tracker));
}

let Ok(trackers) = self.trackers.read() else {
return;
let trackers = match self.sorted_attribs.lock() {
Ok(trackers) => {
// It's important to release the lock as fast as possible,
// so we don't block insertion of new attribute sets.
trackers.clone()
}
Err(_) => return,
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
dest.push(map_fn(attrs.clone(), tracker));
}
prepare_data(dest, trackers.len());

if self.no_attribs.is_set.load(Ordering::Acquire) {
dest.push(map_fn(vec![], &self.no_attribs.tracker));
}

for (attrs, tracker) in trackers.into_iter() {
dest.push(map_fn(attrs, &tracker));
}
}

@@ -164,35 +170,40 @@ where
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
prepare_data(dest, self.count.load(Ordering::SeqCst));
if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
// Reset the sorted trackers so new attribute sets will be written into a new hashmap
let trackers = match self.sorted_attribs.lock() {
Ok(mut trackers) => {
let new = HashMap::with_capacity(trackers.len());
replace(trackers.deref_mut(), new)
}
Err(_) => return,
};
// Reset all trackers, so every attribute set starts using the new hashmap
match self.all_attribs.write() {
Ok(mut all_trackers) => all_trackers.clear(),
Err(_) => return,
};

prepare_data(dest, trackers.len());

if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
dest.push(map_fn(
vec![],
self.no_attribute_tracker.clone_and_reset(&self.config),
self.no_attribs.tracker.clone_and_reset(&self.config),
));
}

let trackers = match self.trackers.write() {
Ok(mut trackers) => {
self.count.store(0, Ordering::SeqCst);
take(trackers.deref_mut())
}
Err(_) => todo!(),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.into_iter() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
}
let tracker = Arc::into_inner(tracker).expect("the only instance");
dest.push(map_fn(attrs, tracker));
}
}
}
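
One pattern worth calling out from `collect_and_reset` above: instead of iterating the shared map while holding its lock, the whole map is swapped out for an empty one and drained afterwards, so new measurements are blocked only for the duration of the swap. A minimal standalone illustration of that swap-and-drain move, with plain `String`/`u64` stand-ins for the attribute sets and trackers:

```rust
use std::collections::HashMap;
use std::mem::replace;
use std::sync::Mutex;

fn main() {
    let trackers: Mutex<HashMap<String, u64>> = Mutex::new(HashMap::new());
    trackers.lock().unwrap().insert("service=checkout".into(), 42);

    // Hold the lock only long enough to swap in an empty map...
    let drained = {
        let mut guard = trackers.lock().unwrap();
        let empty = HashMap::with_capacity(guard.len());
        replace(&mut *guard, empty)
    };

    // ...then consume the old map outside the lock, without blocking writers.
    for (attrs, value) in drained {
        println!("{attrs}: {value}");
    }
    assert!(trackers.lock().unwrap().is_empty());
}
```
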

/// Clear and allocate exactly the required amount of space for all attribute sets
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
data.clear();
let total_len = list_len + 2; // to account for no_attributes case + overflow state
let total_len = list_len + 1; // to account for no_attributes case
if total_len > data.capacity() {
data.reserve_exact(total_len - data.capacity());
}
