diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 335295d3a8..82886be5f1 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -111,7 +111,10 @@ where let sorted_count = sorted_trackers.len(); let new_tracker = match sorted_trackers.entry(sorted_attrs) { - Entry::Occupied(occupied_entry) => occupied_entry.get().clone(), + Entry::Occupied(occupied_entry) => { + // do not return early, because collection phase might clear `all_trackers` multiple times + occupied_entry.get().clone() + } Entry::Vacant(vacant_entry) => { if !is_under_cardinality_limit(sorted_count) { sorted_trackers.entry(STREAM_OVERFLOW_ATTRIBUTES.clone()) @@ -197,8 +200,28 @@ where )); } - for (attrs, tracker) in to_collect.drain() { - let tracker = Arc::into_inner(tracker).expect("the only instance"); + for (attrs, mut tracker) in to_collect.drain() { + // Handles special case: + // measure-thread: get inserted tracker from `sorted_attribs` (holds tracker) + // collect-thread: replace sorted_attribs (clears sorted_attribs) + // collect-thread: clear all_attribs + // collect_thread: THIS-LOOP: loop until measure-thread still holds a tracker + // measure-thread: insert tracker into `all_attribs`` + // collect_thread: exits this loop after clearing trackers + let tracker = loop { + match Arc::try_unwrap(tracker) { + Ok(inner) => { + break inner; + } + Err(reinserted) => { + tracker = reinserted; + match self.all_attribs.write() { + Ok(mut all_trackers) => all_trackers.clear(), + Err(_) => return, + }; + } + }; + }; dest.push(map_fn(attrs, tracker)); } }