Skip to content

Commit

Permalink
[chore][service] Drop detailed batch processor and otelarrow metrics …
Browse files Browse the repository at this point in the history
…depending on metrics level
  • Loading branch information
mx-psi committed Jan 21, 2025
1 parent e87797c commit a84b816
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 6 deletions.
10 changes: 8 additions & 2 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,16 @@ func (b *shard[T]) sendItems(trigger trigger) {
return
}
var bytes int
if b.processor.telemetry.detailed {
bpt := b.processor.telemetry

// Check if the instrument is enabled to calculate the size of the batch in bytes.
batchSendSizeBytes := bpt.telemetryBuilder.ProcessorBatchBatchSendSizeBytes
instr, ok := batchSendSizeBytes.(interface{ Enabled(context.Context) bool })
if !ok || instr.Enabled(bpt.exportCtx) {
bytes = b.batch.sizeBytes(req)
}
b.processor.telemetry.record(trigger, int64(sent), int64(bytes))

bpt.record(trigger, int64(sent), int64(bytes))
}

// singleShardBatcher is used when metadataKeys is empty, to avoid the
Expand Down
4 changes: 0 additions & 4 deletions processor/batchprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/internal"
Expand All @@ -23,8 +22,6 @@ const (
)

type batchProcessorTelemetry struct {
detailed bool

exportCtx context.Context

processorAttr metric.MeasurementOption
Expand All @@ -44,7 +41,6 @@ func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinali

return &batchProcessorTelemetry{
exportCtx: context.Background(),
detailed: set.MetricsLevel == configtelemetry.LevelDetailed,
telemetryBuilder: telemetryBuilder,
processorAttr: attrs,
}, nil
Expand Down
44 changes: 44 additions & 0 deletions service/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/multierr"
Expand Down Expand Up @@ -56,6 +57,49 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
opts = append(opts, sdkmetric.WithReader(r))
}

dropAggregation := sdkmetric.Stream{
Aggregation: sdkmetric.AggregationDrop{},
}

if set.cfg.Level < configtelemetry.LevelNormal {
otelArrowScope := "otel-arrow/pkg/otel/arrow_record"
opts = append(opts,
sdkmetric.WithView(sdkmetric.NewView(
sdkmetric.Instrument{
Name: "arrow_batch_records",
Scope: instrumentation.Scope{Name: otelArrowScope},
},
dropAggregation,
)),
sdkmetric.WithView(sdkmetric.NewView(
sdkmetric.Instrument{
Name: "arrow_schema_resets",
Scope: instrumentation.Scope{Name: otelArrowScope},
},
dropAggregation,
)),
sdkmetric.WithView(sdkmetric.NewView(
sdkmetric.Instrument{
Name: "arrow_memory_inuse",
Scope: instrumentation.Scope{Name: otelArrowScope},
},
dropAggregation,
)),
)
}

if set.cfg.Level < configtelemetry.LevelDetailed {
batchProcessorScope := "go.opentelemetry.io/collector/processor/batchprocessor"
opts = append(opts, sdkmetric.WithView(sdkmetric.NewView(
sdkmetric.Instrument{
Name: "otelcol_processor_batch_batch_send_size_bytes",
Scope: instrumentation.Scope{Name: batchProcessorScope},
},
dropAggregation,
)))

}

var err error
mp.MeterProvider, err = otelinit.InitOpenTelemetry(set.res, opts, disableHighCardinality)
if err != nil {
Expand Down

0 comments on commit a84b816

Please sign in to comment.