diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 1be4142432..6d7de674af 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -29,6 +29,7 @@ tokio = { workspace = true, features = ["rt", "time"], optional = true } tokio-stream = { workspace = true, optional = true } http = { workspace = true, optional = true } tracing = {workspace = true, optional = true} +rustc-hash = "2.0.0" [package.metadata.docs.rs] all-features = true diff --git a/opentelemetry-sdk/src/metrics/internal/hashed.rs b/opentelemetry-sdk/src/metrics/internal/hashed.rs new file mode 100644 index 0000000000..387cc7444a --- /dev/null +++ b/opentelemetry-sdk/src/metrics/internal/hashed.rs @@ -0,0 +1,146 @@ +use std::{ + borrow::{Borrow, Cow}, + hash::{BuildHasher, Hash, Hasher}, + ops::Deref, +}; + +use rustc_hash::*; + +/// Hash value only once, works with references and owned types. +pub(crate) struct Hashed<'a, T> +where + T: ToOwned + ?Sized, +{ + value: Cow<'a, T>, + hash: u64, +} + +impl<'a, T> Hashed<'a, T> +where + T: ToOwned + Hash + ?Sized, +{ + pub(crate) fn from_borrowed(value: &'a T) -> Self { + let mut hasher = FxHasher::default(); + value.hash(&mut hasher); + Self { + value: Cow::Borrowed(value), + hash: hasher.finish(), + } + } + + pub(crate) fn from_owned(value: ::Owned) -> Self { + let hash = calc_hash(value.borrow()); + Self { + value: Cow::Owned(value), + hash, + } + } + + pub(crate) fn mutate(self, f: impl FnOnce(&mut ::Owned)) -> Hashed<'static, T> { + let mut value = self.value.into_owned(); + f(&mut value); + let hash = calc_hash(value.borrow()); + Hashed { + value: Cow::Owned(value), + hash, + } + } + + pub(crate) fn into_owned(self) -> Hashed<'static, T> { + let value = self.value.into_owned(); + Hashed { + value: Cow::Owned(value), + hash: self.hash, + } + } + + pub(crate) fn into_inner_owned(self) -> T::Owned { + self.value.into_owned() + } +} + +fn calc_hash(value: T) -> u64 +where + T: Hash, +{ + let mut hasher = FxHasher::default(); + value.hash(&mut hasher); + hasher.finish() +} + +impl Clone for Hashed<'_, T> +where + T: ToOwned + ?Sized, +{ + fn clone(&self) -> Self { + Self { + value: self.value.clone(), + hash: self.hash, + } + } + + fn clone_from(&mut self, source: &Self) { + self.value.clone_from(&source.value); + self.hash = source.hash; + } +} + +impl Hash for Hashed<'_, T> +where + T: ToOwned + Hash + ?Sized, +{ + fn hash(&self, state: &mut H) { + state.write_u64(self.hash); + } +} + +impl PartialEq for Hashed<'_, T> +where + T: ToOwned + PartialEq + ?Sized, +{ + fn eq(&self, other: &Self) -> bool { + self.value.as_ref() == other.value.as_ref() + } +} + +impl Eq for Hashed<'_, T> where T: ToOwned + Eq + ?Sized {} + +impl Deref for Hashed<'_, T> +where + T: ToOwned + ?Sized, +{ + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value.deref() + } +} + +/// Used to make [`Hashed`] values no-op in [`HashMap`](std::collections::HashMap) or [`HashSet`](std::collections::HashSet). +/// For all other keys types (except for [`u64`]) it will panic. +#[derive(Default, Clone)] +pub(crate) struct HashedNoOpBuilder { + hashed: u64, +} + +impl Hasher for HashedNoOpBuilder { + fn finish(&self) -> u64 { + self.hashed + } + + fn write(&mut self, _bytes: &[u8]) { + panic!("Only works with `Hashed` value") + } + + fn write_u64(&mut self, i: u64) { + self.hashed = i; + } +} + +impl BuildHasher for HashedNoOpBuilder { + type Hasher = HashedNoOpBuilder; + + fn build_hasher(&self) -> Self::Hasher { + HashedNoOpBuilder::default() + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 8b6136d7ce..e20c615fb6 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -1,5 +1,6 @@ mod aggregate; mod exponential_histogram; +mod hashed; mod histogram; mod last_value; mod precomputed_sum; @@ -15,13 +16,12 @@ use std::sync::{Arc, RwLock}; use aggregate::is_under_cardinality_limit; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; +use hashed::{Hashed, HashedNoOpBuilder}; use once_cell::sync::Lazy; use opentelemetry::{otel_warn, KeyValue}; -use crate::metrics::AttributeSet; - -pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = - Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); +pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = + Lazy::new(|| Hashed::from_owned(vec![KeyValue::new("otel.metric.overflow", "true")])); pub(crate) trait Aggregator { /// A static configuration that is needed in order to initialize aggregator. @@ -52,7 +52,7 @@ where A: Aggregator, { /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, + trackers: RwLock, Arc, HashedNoOpBuilder>>, /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Indicates whether a value with no attributes has been stored. @@ -69,7 +69,7 @@ where { fn new(config: A::InitConfig) -> Self { ValueMap { - trackers: RwLock::new(HashMap::new()), + trackers: RwLock::new(HashMap::default()), has_no_attribute_value: AtomicBool::new(false), no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), @@ -84,19 +84,25 @@ where return; } + let attributes = Hashed::from_borrowed(attributes); + let Ok(trackers) = self.trackers.read() else { return; }; // Try to retrieve and update the tracker with the attributes in the provided order first - if let Some(tracker) = trackers.get(attributes) { + if let Some(tracker) = trackers.get(&attributes) { tracker.update(value); return; } // Try to retrieve and update the tracker with the attributes sorted. - let sorted_attrs = AttributeSet::from(attributes).into_vec(); - if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + let sorted_attrs = attributes.clone().mutate(|list| { + // use stable sort + list.sort_by(|a, b| a.key.cmp(&b.key)); + dedup_remove_first(list, |a, b| a.key == b.key); + }); + if let Some(tracker) = trackers.get(&sorted_attrs) { tracker.update(value); return; } @@ -110,20 +116,20 @@ where // Recheck both the provided and sorted orders after acquiring the write lock // in case another thread has pushed an update in the meantime. - if let Some(tracker) = trackers.get(attributes) { + if let Some(tracker) = trackers.get(&attributes) { tracker.update(value); - } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + } else if let Some(tracker) = trackers.get(&sorted_attrs) { tracker.update(value); } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { let new_tracker = Arc::new(A::create(&self.config)); new_tracker.update(value); // Insert tracker with the attributes in the provided and sorted orders - trackers.insert(attributes.to_vec(), new_tracker.clone()); + trackers.insert(attributes.into_owned(), new_tracker.clone()); trackers.insert(sorted_attrs, new_tracker); self.count.fetch_add(1, Ordering::SeqCst); - } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { + } else if let Some(overflow_value) = trackers.get(&STREAM_OVERFLOW_ATTRIBUTES) { overflow_value.update(value); } else { let new_tracker = A::create(&self.config); @@ -153,7 +159,7 @@ where let mut seen = HashSet::new(); for (attrs, tracker) in trackers.iter() { if seen.insert(Arc::as_ptr(tracker)) { - dest.push(map_fn(attrs.clone(), tracker)); + dest.push(map_fn(attrs.clone().into_inner_owned(), tracker)); } } } @@ -183,8 +189,25 @@ where let mut seen = HashSet::new(); for (attrs, tracker) in trackers.into_iter() { if seen.insert(Arc::as_ptr(&tracker)) { - dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + dest.push(map_fn( + attrs.into_inner_owned(), + tracker.clone_and_reset(&self.config), + )); + } + } + } +} + +fn dedup_remove_first(values: &mut Vec, is_eq: impl Fn(&T, &T) -> bool) { + // we cannot use vec.dedup_by because it will remove last duplicate not first + if values.len() > 1 { + let mut i = values.len() - 1; + while i != 0 { + let is_same = unsafe { is_eq(values.get_unchecked(i - 1), values.get_unchecked(i)) }; + if is_same { + values.remove(i - 1); } + i -= 1; } } } @@ -392,8 +415,45 @@ impl AtomicallyUpdate for f64 { #[cfg(test)] mod tests { + use std::usize; + use super::*; + fn assert_deduped( + input: [(i32, bool); N], + expect: [(i32, bool); M], + ) { + let mut list: Vec<(i32, bool)> = Vec::from(input); + dedup_remove_first(&mut list, |a, b| a.0 == b.0); + assert_eq!(list, expect); + } + + #[test] + fn deduplicate_by_removing_first_element_from_sorted_array() { + assert_deduped([], []); + assert_deduped([(1, true)], [(1, true)]); + assert_deduped([(1, false), (1, false), (1, true)], [(1, true)]); + assert_deduped( + [(1, true), (2, false), (2, false), (2, true)], + [(1, true), (2, true)], + ); + assert_deduped( + [(1, true), (1, false), (1, true), (2, true)], + [(1, true), (2, true)], + ); + assert_deduped( + [ + (1, false), + (1, true), + (2, false), + (2, true), + (3, false), + (3, true), + ], + [(1, true), (2, true), (3, true)], + ); + } + #[test] fn can_store_u64_atomic_value() { let atomic = u64::new_atomic_tracker(0); diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 2d4d3f0437..7b66bf5c54 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -138,11 +138,6 @@ impl AttributeSet { pub(crate) fn iter(&self) -> impl Iterator { self.0.iter().map(|kv| (&kv.key, &kv.value)) } - - /// Returns the underlying Vec of KeyValue pairs - pub(crate) fn into_vec(self) -> Vec { - self.0 - } } impl Hash for AttributeSet {