Skip to content

Commit

Permalink
[connector/signaltometrics] Add core logic for signal to metrics (#36852)
Browse files Browse the repository at this point in the history
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Adding core logic for the signal to metrics connector.

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

Related to
#35930

<!--Describe what testing was performed and which tests were added.-->
#### Testing

Unit tests added

<!--Describe the documentation added.-->
#### Documentation

Documentation is already added in README.
<!--Please delete paragraphs that you did not use before submitting.-->

---------

Co-authored-by: Christos Markou <[email protected]>
  • Loading branch information
lahsivjar and ChrsMark authored Jan 9, 2025
1 parent 5beb818 commit 9da269e
Show file tree
Hide file tree
Showing 32 changed files with 4,151 additions and 24 deletions.
27 changes: 27 additions & 0 deletions .chloggen/signaltometrics-corelogic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: signaltometricsconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add core logic for the signal to metrics connector to make it functional.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35930]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
227 changes: 210 additions & 17 deletions connector/signaltometricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,239 @@ package signaltometricsconnector // import "github.com/open-telemetry/openteleme

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator"
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan"
)

// signalToMetrics implements the signal-to-metrics connector. It consumes
// traces, metrics, or logs, aggregates them into new metrics according to
// the configured metric definitions, and forwards the result to the next
// metrics consumer.
type signalToMetrics struct {
	// next is the downstream metrics consumer that receives the produced metrics.
	next consumer.Metrics
	logger *zap.Logger

	// Metric definitions per signal type. A signal type with no definitions
	// causes its Consume* method to be a no-op.
	// NOTE(review): these fields are not populated by newSignalToMetrics in
	// this chunk — presumably set by the factory from config; verify at caller.
	spanMetricDefs []model.MetricDef[ottlspan.TransformContext]
	dpMetricDefs []model.MetricDef[ottldatapoint.TransformContext]
	logMetricDefs []model.MetricDef[ottllog.TransformContext]

	// Embedded no-op Start/Shutdown implementations to satisfy component.Component.
	component.StartFunc
	component.ShutdownFunc
}

// newSignalToMetrics builds a signal-to-metrics connector that forwards the
// metrics it produces to the given next consumer, logging via the settings'
// logger. Metric definitions are left empty here.
func newSignalToMetrics(
	set connector.Settings,
	next consumer.Metrics,
) *signalToMetrics {
	sm := &signalToMetrics{}
	sm.logger = set.Logger
	sm.next = next
	return sm
}

// Capabilities reports that this connector does not mutate the data it consumes.
func (sm *signalToMetrics) Capabilities() consumer.Capabilities {
	// Zero value: MutatesData is false.
	var caps consumer.Capabilities
	return caps
}

func (sm *signalToMetrics) ConsumeTraces(context.Context, ptrace.Traces) error {
return nil
func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if len(sm.spanMetricDefs) == 0 {
return nil
}

processedMetrics := pmetric.NewMetrics()
processedMetrics.ResourceMetrics().EnsureCapacity(td.ResourceSpans().Len())
aggregator := aggregator.NewAggregator[ottlspan.TransformContext](processedMetrics)

for i := 0; i < td.ResourceSpans().Len(); i++ {
resourceSpan := td.ResourceSpans().At(i)
resourceAttrs := resourceSpan.Resource().Attributes()
for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ {
scopeSpan := resourceSpan.ScopeSpans().At(j)
for k := 0; k < scopeSpan.Spans().Len(); k++ {
span := scopeSpan.Spans().At(k)
spanAttrs := span.Attributes()
for _, md := range sm.spanMetricDefs {
filteredSpanAttrs, ok := md.FilterAttributes(spanAttrs)
if !ok {
continue
}

// The transform context is created from original attributes so that the
// OTTL expressions are also applied on the original attributes.
tCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan)
if md.Conditions != nil {
match, err := md.Conditions.Eval(ctx, tCtx)
if err != nil {
return fmt.Errorf("failed to evaluate conditions: %w", err)
}
if !match {
sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name))
continue
}
}

filteredResAttrs := md.FilterResourceAttributes(resourceAttrs)
if err := aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredSpanAttrs, 1); err != nil {
return err
}
}
}
}
}
aggregator.Finalize(sm.spanMetricDefs)
return sm.next.ConsumeMetrics(ctx, processedMetrics)
}

func (sm *signalToMetrics) ConsumeMetrics(context.Context, pmetric.Metrics) error {
return nil
func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error {
if len(sm.dpMetricDefs) == 0 {
return nil
}

processedMetrics := pmetric.NewMetrics()
processedMetrics.ResourceMetrics().EnsureCapacity(m.ResourceMetrics().Len())
aggregator := aggregator.NewAggregator[ottldatapoint.TransformContext](processedMetrics)
for i := 0; i < m.ResourceMetrics().Len(); i++ {
resourceMetric := m.ResourceMetrics().At(i)
resourceAttrs := resourceMetric.Resource().Attributes()
for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
scopeMetric := resourceMetric.ScopeMetrics().At(j)
for k := 0; k < scopeMetric.Metrics().Len(); k++ {
metrics := scopeMetric.Metrics()
metric := metrics.At(k)
for _, md := range sm.dpMetricDefs {
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs)
aggregate := func(dp any, dpAttrs pcommon.Map) error {
// The transform context is created from original attributes so that the
// OTTL expressions are also applied on the original attributes.
tCtx := ottldatapoint.NewTransformContext(dp, metric, metrics, scopeMetric.Scope(), resourceMetric.Resource(), scopeMetric, resourceMetric)
if md.Conditions != nil {
match, err := md.Conditions.Eval(ctx, tCtx)
if err != nil {
return fmt.Errorf("failed to evaluate conditions: %w", err)
}
if !match {
sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name))
return nil
}
}
return aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, dpAttrs, 1)
}

//exhaustive:enforce
switch metric.Type() {
case pmetric.MetricTypeGauge:
dps := metric.Gauge().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
if !ok {
continue
}
if err := aggregate(dp, filteredDPAttrs); err != nil {
return err
}
}
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
if !ok {
continue
}
if err := aggregate(dp, filteredDPAttrs); err != nil {
return err
}
}
case pmetric.MetricTypeSummary:
dps := metric.Summary().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
if !ok {
continue
}
if err := aggregate(dp, filteredDPAttrs); err != nil {
return err
}
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
if !ok {
continue
}
if err := aggregate(dp, filteredDPAttrs); err != nil {
return err
}
}
case pmetric.MetricTypeExponentialHistogram:
dps := metric.ExponentialHistogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
if !ok {
continue
}
if err := aggregate(dp, filteredDPAttrs); err != nil {
return err
}
}
case pmetric.MetricTypeEmpty:
continue
}
}
}
}
}
aggregator.Finalize(sm.dpMetricDefs)
return sm.next.ConsumeMetrics(ctx, processedMetrics)
}

func (sm *signalToMetrics) ConsumeLogs(context.Context, plog.Logs) error {
return nil
func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
if len(sm.logMetricDefs) == 0 {
return nil
}

processedMetrics := pmetric.NewMetrics()
processedMetrics.ResourceMetrics().EnsureCapacity(logs.ResourceLogs().Len())
aggregator := aggregator.NewAggregator[ottllog.TransformContext](processedMetrics)
for i := 0; i < logs.ResourceLogs().Len(); i++ {
resourceLog := logs.ResourceLogs().At(i)
resourceAttrs := resourceLog.Resource().Attributes()
for j := 0; j < resourceLog.ScopeLogs().Len(); j++ {
scopeLog := resourceLog.ScopeLogs().At(j)
for k := 0; k < scopeLog.LogRecords().Len(); k++ {
log := scopeLog.LogRecords().At(k)
logAttrs := log.Attributes()
for _, md := range sm.logMetricDefs {
filteredLogAttrs, ok := md.FilterAttributes(logAttrs)
if !ok {
continue
}

// The transform context is created from original attributes so that the
// OTTL expressions are also applied on the original attributes.
tCtx := ottllog.NewTransformContext(log, scopeLog.Scope(), resourceLog.Resource(), scopeLog, resourceLog)
if md.Conditions != nil {
match, err := md.Conditions.Eval(ctx, tCtx)
if err != nil {
return fmt.Errorf("failed to evaluate conditions: %w", err)
}
if !match {
sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name))
continue
}
}
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs)
if err := aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredLogAttrs, 1); err != nil {
return err
}
}
}
}
}
aggregator.Finalize(sm.logMetricDefs)
return sm.next.ConsumeMetrics(ctx, processedMetrics)
}
Loading

0 comments on commit 9da269e

Please sign in to comment.