Skip to content

Commit

Permalink
draft spanmetrics fix
Browse files Browse the repository at this point in the history
  • Loading branch information
shivanthzen committed Dec 9, 2024
1 parent 94d097a commit 260a599
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 39 deletions.
27 changes: 8 additions & 19 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,6 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
* - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ...
*/
deltaMetricKeys := make(map[metrics.Key]bool)
startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp {
startTime := rawMetrics.startTimestamp
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok {
startTime = lastTimestamp
}
// Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update.
deltaMetricKeys[mk] = true
}
return startTime
}

metricsNamespace := p.config.Namespace
if legacyMetricNamesFeatureGate.IsEnabled() && metricsNamespace == DefaultNamespace {
Expand All @@ -299,21 +288,21 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
sums := rawMetrics.sums
metric := sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(metricsNamespace, metricNameCalls))
sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
sums.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())

if !p.config.Histogram.Disable {
histograms := rawMetrics.histograms
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(metricsNamespace, metricNameDuration))
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
histograms.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(metricsNamespace, metricNameEvents))
events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
events.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())
}

for mk := range deltaMetricKeys {
Expand Down Expand Up @@ -406,12 +395,12 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
}
if !p.config.Histogram.Disable {
// aggregate histogram metrics
h := histograms.GetOrCreate(key, attributes)
h := histograms.GetOrCreate(key, attributes, startTimestamp)
p.addExemplar(span, duration, h)
h.Observe(duration)
}
// aggregate sums metrics
s := sums.GetOrCreate(key, attributes)
s := sums.GetOrCreate(key, attributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
s.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand All @@ -435,7 +424,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions)
p.metricKeyToDimensions.Add(eKey, eAttributes)
}
e := events.GetOrCreate(eKey, eAttributes)
e := events.GetOrCreate(eKey, eAttributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
e.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand Down Expand Up @@ -479,8 +468,8 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimesta
if !ok {
v = &resourceMetrics{
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp),
attributes: attr,
startTimestamp: startTimestamp,
}
Expand Down
43 changes: 23 additions & 20 deletions connector/spanmetricsconnector/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
type Key string

type HistogramMetrics interface {
GetOrCreate(key Key, attributes pcommon.Map) Histogram
BuildMetrics(pmetric.Metric, generateStartTimestamp, pcommon.Timestamp, pmetric.AggregationTemporality)
GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram
BuildMetrics(pmetric.Metric, pcommon.Timestamp, pmetric.AggregationTemporality)
ClearExemplars()
}

Expand Down Expand Up @@ -47,6 +47,7 @@ type explicitHistogram struct {
bounds []float64

maxExemplarCount *int
startTimestamp pcommon.Timestamp
}

type exponentialHistogram struct {
Expand All @@ -56,6 +57,7 @@ type exponentialHistogram struct {
histogram *structure.Histogram[float64]

maxExemplarCount *int
startTimestamp pcommon.Timestamp
}

type generateStartTimestamp = func(Key) pcommon.Timestamp
Expand All @@ -76,7 +78,7 @@ func NewExplicitHistogramMetrics(bounds []float64, maxExemplarCount *int) Histog
}
}

func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
h = &explicitHistogram{
Expand All @@ -94,17 +96,16 @@ func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map)

func (m *explicitHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyHistogram().SetAggregationTemporality(temporality)
dps := metric.Histogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, h := range m.metrics {
for _, h := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
dp.SetTimestamp(timestamp)
dp.SetStartTimestamp(h.startTimestamp)
dp.ExplicitBounds().FromRaw(h.bounds)
dp.BucketCounts().FromRaw(h.bucketCounts)
dp.SetCount(h.count)
Expand All @@ -114,6 +115,7 @@ func (m *explicitHistogramMetrics) BuildMetrics(
}
h.exemplars.CopyTo(dp.Exemplars())
h.attributes.CopyTo(dp.Attributes())
dp.Attributes().PutInt("startTimestamp", int64(h.startTimestamp))
}
}

Expand All @@ -123,7 +125,7 @@ func (m *explicitHistogramMetrics) ClearExemplars() {
}
}

func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimeStamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
histogram := new(structure.Histogram[float64])
Expand All @@ -146,23 +148,23 @@ func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Ma

func (m *exponentialHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyExponentialHistogram().SetAggregationTemporality(temporality)
dps := metric.ExponentialHistogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, m := range m.metrics {
for _, e := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
dp.SetStartTimestamp(e.startTimestamp)
dp.SetTimestamp(timestamp)
expoHistToExponentialDataPoint(m.histogram, dp)
for i := 0; i < m.exemplars.Len(); i++ {
m.exemplars.At(i).SetTimestamp(timestamp)
expoHistToExponentialDataPoint(e.histogram, dp)
for i := 0; i < e.exemplars.Len(); i++ {
e.exemplars.At(i).SetTimestamp(timestamp)
}
m.exemplars.CopyTo(dp.Exemplars())
m.attributes.CopyTo(dp.Attributes())
e.exemplars.CopyTo(dp.Exemplars())
e.attributes.CopyTo(dp.Attributes())
dp.Attributes().PutInt("startTimestamp", int64(e.startTimestamp))
}
}

Expand Down Expand Up @@ -241,13 +243,14 @@ type Sum struct {
count uint64
exemplars pmetric.ExemplarSlice
maxExemplarCount *int
startTimestamp pcommon.Timestamp
}

func (s *Sum) Add(value uint64) {
s.count += value
}

func NewSumMetrics(maxExemplarCount *int) SumMetrics {
func NewSumMetrics(maxExemplarCount *int, startTimeStamp pcommon.Timestamp) SumMetrics {
return SumMetrics{
metrics: make(map[Key]*Sum),
maxExemplarCount: maxExemplarCount,
Expand All @@ -259,13 +262,14 @@ type SumMetrics struct {
maxExemplarCount *int
}

func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map) *Sum {
func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) *Sum {
s, ok := m.metrics[key]
if !ok {
s = &Sum{
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
maxExemplarCount: m.maxExemplarCount,
startTimestamp: startTimestamp,
}
m.metrics[key] = s
}
Expand All @@ -284,7 +288,6 @@ func (s *Sum) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value

func (m *SumMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
Expand All @@ -293,9 +296,9 @@ func (m *SumMetrics) BuildMetrics(

dps := metric.Sum().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, s := range m.metrics {
for _, s := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
dp.SetStartTimestamp(s.startTimestamp)
dp.SetTimestamp(timestamp)
dp.SetIntValue(int64(s.count))
for i := 0; i < s.exemplars.Len(); i++ {
Expand Down

0 comments on commit 260a599

Please sign in to comment.