Skip to content

Commit

Permalink
Add separate buffer used in collect_and_reset
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Nov 22, 2024
1 parent abe71ab commit 4e0dda5
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod sum;
use core::fmt;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::replace;
use std::mem::swap;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};
Expand Down Expand Up @@ -64,6 +64,8 @@ where
sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
/// Configuration for an Aggregator
config: A::InitConfig,
/// Swap with `sorted_attribs` on every `collect_and_reset`.
for_collect_after_reset: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
}

impl<A> ValueMap<A>
Expand All @@ -76,9 +78,10 @@ where
tracker: A::create(&config),
is_set: AtomicBool::new(false),
},
all_attribs: RwLock::new(HashMap::new()),
sorted_attribs: Mutex::new(HashMap::new()),
all_attribs: RwLock::new(Default::default()),
sorted_attribs: Mutex::new(Default::default()),
config,
for_collect_after_reset: Mutex::new(Default::default()),
}
}

Expand Down Expand Up @@ -168,11 +171,14 @@ where
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
{
let mut to_collect = self
.for_collect_after_reset
.lock()
.unwrap_or_else(|err| err.into_inner());
// reset sorted trackers so new attributes set will be written into new hashmap
let trackers = match self.sorted_attribs.lock() {
match self.sorted_attribs.lock() {
Ok(mut trackers) => {
let new = HashMap::with_capacity(trackers.len());
replace(trackers.deref_mut(), new)
swap(trackers.deref_mut(), to_collect.deref_mut());
}
Err(_) => return,
};
Expand All @@ -182,7 +188,7 @@ where
Err(_) => return,
};

prepare_data(dest, trackers.len());
prepare_data(dest, to_collect.len());

if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
dest.push(map_fn(
Expand All @@ -191,7 +197,7 @@ where
));
}

for (attrs, tracker) in trackers.into_iter() {
for (attrs, tracker) in to_collect.drain() {
let tracker = Arc::into_inner(tracker).expect("the only instance");
dest.push(map_fn(attrs, tracker));
}
Expand Down

0 comments on commit 4e0dda5

Please sign in to comment.