Skip to content

Commit

Permalink
[chore][service] Add views to drop metrics based on levels
Browse files Browse the repository at this point in the history
For the following components:
- processor/batch
- contrib's internal/otelarrow/netstats
- otel-arrow library
  • Loading branch information
mx-psi committed Jan 21, 2025
1 parent dcc866c commit 21725d7
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 6 deletions.
11 changes: 9 additions & 2 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,17 @@ func (b *shard[T]) sendItems(trigger trigger) {
return
}
var bytes int
if b.processor.telemetry.detailed {
bpt := b.processor.telemetry

// Check if the instrument is enabled to calculate the size of the batch in bytes.
// See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled
batchSendSizeBytes := bpt.telemetryBuilder.ProcessorBatchBatchSendSizeBytes
instr, ok := batchSendSizeBytes.(interface{ Enabled(context.Context) bool })
if !ok || instr.Enabled(bpt.exportCtx) {
bytes = b.batch.sizeBytes(req)
}
b.processor.telemetry.record(trigger, int64(sent), int64(bytes))

bpt.record(trigger, int64(sent), int64(bytes))
}

// singleShardBatcher is used when metadataKeys is empty, to avoid the
Expand Down
4 changes: 0 additions & 4 deletions processor/batchprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/internal"
Expand All @@ -23,8 +22,6 @@ const (
)

type batchProcessorTelemetry struct {
detailed bool

exportCtx context.Context

processorAttr metric.MeasurementOption
Expand All @@ -44,7 +41,6 @@ func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinali

return &batchProcessorTelemetry{
exportCtx: context.Background(),
detailed: set.MetricsLevel == configtelemetry.LevelDetailed,
telemetryBuilder: telemetryBuilder,
processorAttr: attrs,
}, nil
Expand Down
77 changes: 77 additions & 0 deletions service/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/multierr"
Expand All @@ -36,6 +37,15 @@ type meterProviderSettings struct {
asyncErrorChannel chan error
}

func dropViewOption(instrument sdkmetric.Instrument) sdkmetric.Option {
return sdkmetric.WithView(sdkmetric.NewView(
instrument,
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationDrop{},
},
))
}

// newMeterProvider creates a new MeterProvider from Config.
func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (metric.MeterProvider, error) {
if set.cfg.Level == configtelemetry.LevelNone || len(set.cfg.Readers) == 0 {
Expand All @@ -56,6 +66,73 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
opts = append(opts, sdkmetric.WithReader(r))
}

// otel-arrow library metrics
// See https://github.com/open-telemetry/otel-arrow/blob/c39257/pkg/otel/arrow_record/consumer.go#L174-L176
if set.cfg.Level < configtelemetry.LevelNormal {
scope := instrumentation.Scope{Name: "otel-arrow/pkg/otel/arrow_record"}
opts = append(opts,
dropViewOption(sdkmetric.Instrument{
Name: "arrow_batch_records",
Scope: scope,
}),
dropViewOption(sdkmetric.Instrument{
Name: "arrow_schema_resets",
Scope: scope,
}),
dropViewOption(sdkmetric.Instrument{
Name: "arrow_memory_inuse",
Scope: scope,
}),
)
}

// contrib's internal/otelarrow/netstats metrics
// See
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L130
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L165
if set.cfg.Level < configtelemetry.LevelDetailed {
scope := instrumentation.Scope{Name: "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"}
// Compressed size metrics.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))

opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))

// makeRecvMetrics for exporters.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_exporter_recv",
Scope: scope,
}))
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_exporter_recv_wire",
Scope: scope,
}))

// makeSentMetrics for receivers.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_receiver_sent",
Scope: scope,
}))
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_receiver_sent_wire",
Scope: scope,
}))
}

// Batch processor metrics
if set.cfg.Level < configtelemetry.LevelDetailed {
scope := instrumentation.Scope{Name: "go.opentelemetry.io/collector/processor/batchprocessor"}
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_processor_batch_batch_send_size_bytes",
Scope: scope,
}))
}

var err error
mp.MeterProvider, err = otelinit.InitOpenTelemetry(set.res, opts, disableHighCardinality)
if err != nil {
Expand Down

0 comments on commit 21725d7

Please sign in to comment.