Skip to content

Commit

Permalink
Add separate buffer used in collect_and_reset
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Nov 17, 2024
1 parent e5c4e4e commit f38e09a
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod sum;
use core::fmt;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::replace;
use std::mem::swap;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
Expand Down Expand Up @@ -66,6 +66,8 @@ where
sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
/// Configuration for an Aggregator
config: A::InitConfig,
/// Swap with `sorted_attribs` on every `collect_and_reset`.
for_collect_after_reset: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
}

impl<A> ValueMap<A>
Expand All @@ -78,9 +80,10 @@ where
tracker: A::create(&config),
is_set: AtomicBool::new(false),
},
all_attribs: RwLock::new(HashMap::new()),
sorted_attribs: Mutex::new(HashMap::new()),
all_attribs: RwLock::new(Default::default()),
sorted_attribs: Mutex::new(Default::default()),
config,
for_collect_after_reset: Mutex::new(Default::default()),
}
}

Expand Down Expand Up @@ -170,11 +173,14 @@ where
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
let mut to_collect = self
.for_collect_after_reset
.lock()
.unwrap_or_else(|err| err.into_inner());
// reset sorted trackers so new attributes set will be written into new hashmap
let trackers = match self.sorted_attribs.lock() {
Ok(mut trackers) => {
let new = HashMap::with_capacity(trackers.len());
replace(trackers.deref_mut(), new)
match self.sorted_attribs.lock() {
Ok(mut trackers) => {
swap(trackers.deref_mut(), to_collect.deref_mut());
}
Err(_) => return,

Check warning on line 185 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L185

Added line #L185 was not covered by tests
};
Expand All @@ -184,7 +190,7 @@ where
Err(_) => return,

Check warning on line 190 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L190

Added line #L190 was not covered by tests
};

prepare_data(dest, trackers.len());
prepare_data(dest, to_collect.len());

if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
dest.push(map_fn(
Expand All @@ -193,7 +199,7 @@ where
));
}

for (attrs, tracker) in trackers.into_iter() {
for (attrs, tracker) in to_collect.drain() {
let tracker = Arc::into_inner(tracker).expect("the only instance");
dest.push(map_fn(attrs, tracker));
}
Expand Down

0 comments on commit f38e09a

Please sign in to comment.