From ea2d8a087fa476c6ab7160edfdc5883f9ee3df27 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 17 Jan 2025 21:51:07 +0000 Subject: [PATCH 1/5] Reduce allocs --- loadgen/cmd/otelsoak/config.example.yaml | 6 +++--- receiver/loadgenreceiver/logs.go | 12 +++++++----- receiver/loadgenreceiver/metrics.go | 12 +++++++----- receiver/loadgenreceiver/traces.go | 12 +++++++----- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/loadgen/cmd/otelsoak/config.example.yaml b/loadgen/cmd/otelsoak/config.example.yaml index 11ed99c7..c4aa6232 100644 --- a/loadgen/cmd/otelsoak/config.example.yaml +++ b/loadgen/cmd/otelsoak/config.example.yaml @@ -47,15 +47,15 @@ service: logs: receivers: [loadgen] processors: [ratelimit, transform/rewrite] - exporters: [otlp, debug] + exporters: [otlp] metrics: receivers: [loadgen] processors: [ratelimit, transform/rewrite] - exporters: [otlp, debug] + exporters: [otlp] traces: receivers: [loadgen] processors: [ratelimit, transform/rewrite] - exporters: [otlp, debug] + exporters: [otlp] telemetry: metrics: readers: diff --git a/receiver/loadgenreceiver/logs.go b/receiver/loadgenreceiver/logs.go index d61045e5..4e1f711f 100644 --- a/receiver/loadgenreceiver/logs.go +++ b/receiver/loadgenreceiver/logs.go @@ -106,13 +106,16 @@ func (ar *logsGenerator) Start(ctx context.Context, _ component.Host) error { wg.Add(1) go func() { defer wg.Done() + // per-worker temporary container to avoid allocs + // FIXME: this doesn't work with fanoutconsumer as it will mark as read only + next := plog.NewLogs() for { select { case <-startCtx.Done(): return default: } - next, err := ar.nextLogs() + err := ar.nextLogs(next) if errors.Is(err, list.ErrLoopLimitReached) { return } @@ -147,13 +150,12 @@ func (ar *logsGenerator) Shutdown(context.Context) error { return nil } -func (ar *logsGenerator) nextLogs() (plog.Logs, error) { +func (ar *logsGenerator) nextLogs(next plog.Logs) error { now := pcommon.NewTimestampFromTime(time.Now()) - next := plog.NewLogs() sample, err := ar.samples.Next() if err != nil { - return sample, err + return err } sample.CopyTo(next) @@ -167,5 +169,5 @@ func (ar *logsGenerator) nextLogs() (plog.Logs, error) { } } - return next, nil + return nil } diff --git a/receiver/loadgenreceiver/metrics.go b/receiver/loadgenreceiver/metrics.go index 6ac4cf0a..ee502aef 100644 --- a/receiver/loadgenreceiver/metrics.go +++ b/receiver/loadgenreceiver/metrics.go @@ -106,13 +106,16 @@ func (ar *metricsGenerator) Start(ctx context.Context, _ component.Host) error { wg.Add(1) go func() { defer wg.Done() + // per-worker temporary container to avoid allocs + // FIXME: this doesn't work with fanoutconsumer as it will mark as read only + next := pmetric.NewMetrics() for { select { case <-startCtx.Done(): return default: } - next, err := ar.nextMetrics() + err := ar.nextMetrics(next) if errors.Is(err, list.ErrLoopLimitReached) { return } @@ -147,13 +150,12 @@ func (ar *metricsGenerator) Shutdown(context.Context) error { return nil } -func (ar *metricsGenerator) nextMetrics() (pmetric.Metrics, error) { +func (ar *metricsGenerator) nextMetrics(next pmetric.Metrics) error { now := pcommon.NewTimestampFromTime(time.Now()) - next := pmetric.NewMetrics() sample, err := ar.samples.Next() if err != nil { - return sample, err + return err } sample.CopyTo(next) @@ -199,5 +201,5 @@ func (ar *metricsGenerator) nextMetrics() (pmetric.Metrics, error) { } } - return next, nil + return nil } diff --git a/receiver/loadgenreceiver/traces.go b/receiver/loadgenreceiver/traces.go index fb57cc7b..f73c374e 100644 --- a/receiver/loadgenreceiver/traces.go +++ b/receiver/loadgenreceiver/traces.go @@ -108,13 +108,16 @@ func (ar *tracesGenerator) Start(ctx context.Context, _ component.Host) error { wg.Add(1) go func() { defer wg.Done() + // per-worker temporary container to avoid allocs + // FIXME: this doesn't work with fanoutconsumer as it will mark as read only + next := ptrace.NewTraces() for { select { case <-startCtx.Done(): return default: } - next, err := ar.nextTraces() + err := ar.nextTraces(next) if errors.Is(err, list.ErrLoopLimitReached) { return } @@ -149,11 +152,10 @@ func (ar *tracesGenerator) Shutdown(context.Context) error { return nil } -func (ar *tracesGenerator) nextTraces() (ptrace.Traces, error) { - next := ptrace.NewTraces() +func (ar *tracesGenerator) nextTraces(next ptrace.Traces) error { sample, err := ar.samples.Next() if err != nil { - return sample, err + return err } sample.CopyTo(next) @@ -171,5 +173,5 @@ func (ar *tracesGenerator) nextTraces() (ptrace.Traces, error) { } } - return next, nil + return nil } From aebe23bb363e42b86e1de29f4f2a3fe158ab8e74 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 22 Jan 2025 15:25:39 +0000 Subject: [PATCH 2/5] Configurable optimization --- receiver/loadgenreceiver/config.go | 6 ++++++ receiver/loadgenreceiver/logs.go | 11 ++++++++--- receiver/loadgenreceiver/metrics.go | 11 ++++++++--- receiver/loadgenreceiver/traces.go | 11 ++++++++--- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/receiver/loadgenreceiver/config.go b/receiver/loadgenreceiver/config.go index 0036b142..47b28e8c 100644 --- a/receiver/loadgenreceiver/config.go +++ b/receiver/loadgenreceiver/config.go @@ -40,6 +40,12 @@ type Config struct { // As requests are synchronous, when concurrency is N, there will be N in-flight requests. // This is similar to the `agent_replicas` config in apmsoak. Concurrency int `mapstructure:"concurrency"` + + // PerfReusePdata enables reusing pdata data structures to reduce allocation and GC pressure on the loadgenreceiver. + // This optimization is not compatible with fanoutconsumer, i.e. in pipelines where there are more than 1 consumer, + // as fanoutconsumer will mark the pdata struct as read only and cannot be reused. + // See https://github.com/open-telemetry/opentelemetry-collector/blob/461a3558086a03ab13ea121d12e28e185a1c79b0/internal/fanoutconsumer/logs.go#L70 + PerfReusePdata bool `mapstructure:"perf_reuse_pdata"` } type MetricsConfig struct { diff --git a/receiver/loadgenreceiver/logs.go b/receiver/loadgenreceiver/logs.go index 4e1f711f..d7297b27 100644 --- a/receiver/loadgenreceiver/logs.go +++ b/receiver/loadgenreceiver/logs.go @@ -106,15 +106,20 @@ func (ar *logsGenerator) Start(ctx context.Context, _ component.Host) error { wg.Add(1) go func() { defer wg.Done() - // per-worker temporary container to avoid allocs - // FIXME: this doesn't work with fanoutconsumer as it will mark as read only - next := plog.NewLogs() + var next plog.Logs + if ar.cfg.PerfReusePdata { + // per-worker temporary container to avoid allocs + next = plog.NewLogs() + } for { select { case <-startCtx.Done(): return default: } + if !ar.cfg.PerfReusePdata { + next = plog.NewLogs() + } err := ar.nextLogs(next) if errors.Is(err, list.ErrLoopLimitReached) { return diff --git a/receiver/loadgenreceiver/metrics.go b/receiver/loadgenreceiver/metrics.go index ee502aef..4a521c6e 100644 --- a/receiver/loadgenreceiver/metrics.go +++ b/receiver/loadgenreceiver/metrics.go @@ -106,15 +106,20 @@ func (ar *metricsGenerator) Start(ctx context.Context, _ component.Host) error { wg.Add(1) go func() { defer wg.Done() - // per-worker temporary container to avoid allocs - // FIXME: this doesn't work with fanoutconsumer as it will mark as read only - next := pmetric.NewMetrics() + var next pmetric.Metrics + if ar.cfg.PerfReusePdata { + // per-worker temporary container to avoid allocs + next = pmetric.NewMetrics() + } for { select { case <-startCtx.Done(): return default: } + if !ar.cfg.PerfReusePdata { + next = pmetric.NewMetrics() + } err := ar.nextMetrics(next) if errors.Is(err, list.ErrLoopLimitReached) { return diff --git a/receiver/loadgenreceiver/traces.go b/receiver/loadgenreceiver/traces.go index f73c374e..4e257b0b 100644 --- a/receiver/loadgenreceiver/traces.go +++ b/receiver/loadgenreceiver/traces.go @@ -108,15 +108,20 @@ func (ar *tracesGenerator) Start(ctx context.Context, _ component.Host) error { wg.Add(1) go func() { defer wg.Done() - // per-worker temporary container to avoid allocs - // FIXME: this doesn't work with fanoutconsumer as it will mark as read only - next := ptrace.NewTraces() + var next ptrace.Traces + if ar.cfg.PerfReusePdata { + // per-worker temporary container to avoid allocs + next = ptrace.NewTraces() + } for { select { case <-startCtx.Done(): return default: } + if !ar.cfg.PerfReusePdata { + next = ptrace.NewTraces() + } err := ar.nextTraces(next) if errors.Is(err, list.ErrLoopLimitReached) { return From 5ce0389c7cbabe8cd66e4d98c14447632c8643fb Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 22 Jan 2025 15:30:39 +0000 Subject: [PATCH 3/5] Add bench --- receiver/loadgenreceiver/logs_bench_test.go | 42 +++++++++++-------- .../loadgenreceiver/metrics_bench_test.go | 42 +++++++++++-------- receiver/loadgenreceiver/traces_bench_test.go | 42 +++++++++++-------- 3 files changed, 72 insertions(+), 54 deletions(-) diff --git a/receiver/loadgenreceiver/logs_bench_test.go b/receiver/loadgenreceiver/logs_bench_test.go index cc602be4..61232b77 100644 --- a/receiver/loadgenreceiver/logs_bench_test.go +++ b/receiver/loadgenreceiver/logs_bench_test.go @@ -19,6 +19,7 @@ package loadgenreceiver // import "github.com/elastic/opentelemetry-collector-co import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -30,22 +31,27 @@ import ( ) func BenchmarkLogsGenerator(b *testing.B) { - doneCh := make(chan Stats) - cfg := createDefaultReceiverConfig(doneCh, nil, nil) - cfg.(*Config).Logs.MaxReplay = b.N - r, _ := createLogsReceiver(context.Background(), receiver.Settings{ - ID: component.ID{}, - TelemetrySettings: component.TelemetrySettings{ - Logger: zap.NewNop(), - }, - BuildInfo: component.BuildInfo{}, - }, cfg, consumertest.NewNop()) - b.ResetTimer() - err := r.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(b, err) - defer func() { - err := r.Shutdown(context.Background()) - require.NoError(b, err) - }() - <-doneCh + for _, perfReusePdata := range []bool{false, true} { + b.Run(fmt.Sprintf("perfReusePdata=%v", perfReusePdata), func(b *testing.B) { + doneCh := make(chan Stats) + cfg := createDefaultReceiverConfig(doneCh, nil, nil) + cfg.(*Config).Logs.MaxReplay = b.N + cfg.(*Config).PerfReusePdata = perfReusePdata + r, _ := createLogsReceiver(context.Background(), receiver.Settings{ + ID: component.ID{}, + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.NewNop(), + }, + BuildInfo: component.BuildInfo{}, + }, cfg, consumertest.NewNop()) + b.ResetTimer() + err := r.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(b, err) + defer func() { + err := r.Shutdown(context.Background()) + require.NoError(b, err) + }() + <-doneCh + }) + } } diff --git a/receiver/loadgenreceiver/metrics_bench_test.go b/receiver/loadgenreceiver/metrics_bench_test.go index 212e89af..a36b4fde 100644 --- a/receiver/loadgenreceiver/metrics_bench_test.go +++ b/receiver/loadgenreceiver/metrics_bench_test.go @@ -19,6 +19,7 @@ package loadgenreceiver // import "github.com/elastic/opentelemetry-collector-co import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -30,22 +31,27 @@ import ( ) func BenchmarkMetricsGenerator(b *testing.B) { - doneCh := make(chan Stats) - cfg := createDefaultReceiverConfig(nil, doneCh, nil) - cfg.(*Config).Metrics.MaxReplay = b.N - r, _ := createMetricsReceiver(context.Background(), receiver.Settings{ - ID: component.ID{}, - TelemetrySettings: component.TelemetrySettings{ - Logger: zap.NewNop(), - }, - BuildInfo: component.BuildInfo{}, - }, cfg, consumertest.NewNop()) - b.ResetTimer() - err := r.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(b, err) - defer func() { - err := r.Shutdown(context.Background()) - require.NoError(b, err) - }() - <-doneCh + for _, perfReusePdata := range []bool{false, true} { + b.Run(fmt.Sprintf("perfReusePdata=%v", perfReusePdata), func(b *testing.B) { + doneCh := make(chan Stats) + cfg := createDefaultReceiverConfig(nil, doneCh, nil) + cfg.(*Config).Metrics.MaxReplay = b.N + cfg.(*Config).PerfReusePdata = perfReusePdata + r, _ := createMetricsReceiver(context.Background(), receiver.Settings{ + ID: component.ID{}, + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.NewNop(), + }, + BuildInfo: component.BuildInfo{}, + }, cfg, consumertest.NewNop()) + b.ResetTimer() + err := r.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(b, err) + defer func() { + err := r.Shutdown(context.Background()) + require.NoError(b, err) + }() + <-doneCh + }) + } } diff --git a/receiver/loadgenreceiver/traces_bench_test.go b/receiver/loadgenreceiver/traces_bench_test.go index 9cdb365f..fc358aa6 100644 --- a/receiver/loadgenreceiver/traces_bench_test.go +++ b/receiver/loadgenreceiver/traces_bench_test.go @@ -19,6 +19,7 @@ package loadgenreceiver // import "github.com/elastic/opentelemetry-collector-co import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -30,22 +31,27 @@ import ( ) func BenchmarkTracesGenerator(b *testing.B) { - doneCh := make(chan Stats) - cfg := createDefaultReceiverConfig(nil, nil, doneCh) - cfg.(*Config).Traces.MaxReplay = b.N - r, _ := createTracesReceiver(context.Background(), receiver.Settings{ - ID: component.ID{}, - TelemetrySettings: component.TelemetrySettings{ - Logger: zap.NewNop(), - }, - BuildInfo: component.BuildInfo{}, - }, cfg, consumertest.NewNop()) - b.ResetTimer() - err := r.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(b, err) - defer func() { - err := r.Shutdown(context.Background()) - require.NoError(b, err) - }() - <-doneCh + for _, perfReusePdata := range []bool{false, true} { + b.Run(fmt.Sprintf("perfReusePdata=%v", perfReusePdata), func(b *testing.B) { + doneCh := make(chan Stats) + cfg := createDefaultReceiverConfig(nil, nil, doneCh) + cfg.(*Config).Traces.MaxReplay = b.N + cfg.(*Config).PerfReusePdata = perfReusePdata + r, _ := createTracesReceiver(context.Background(), receiver.Settings{ + ID: component.ID{}, + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.NewNop(), + }, + BuildInfo: component.BuildInfo{}, + }, cfg, consumertest.NewNop()) + b.ResetTimer() + err := r.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(b, err) + defer func() { + err := r.Shutdown(context.Background()) + require.NoError(b, err) + }() + <-doneCh + }) + } } From f13b876ee66e6e9bbb3518e0c6d98a56142c929e Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 22 Jan 2025 15:33:19 +0000 Subject: [PATCH 4/5] Make otelbench use optimization --- loadgen/cmd/otelbench/config.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/loadgen/cmd/otelbench/config.yaml b/loadgen/cmd/otelbench/config.yaml index 6336a4e0..9b0da198 100644 --- a/loadgen/cmd/otelbench/config.yaml +++ b/loadgen/cmd/otelbench/config.yaml @@ -1,5 +1,6 @@ receivers: loadgen: + perf_reuse_pdata: true nop: exporters: From 0e60529fa660bfc15430c258a862acc79ebcef45 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Wed, 22 Jan 2025 15:34:07 +0000 Subject: [PATCH 5/5] Revert example change --- loadgen/cmd/otelsoak/config.example.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/loadgen/cmd/otelsoak/config.example.yaml b/loadgen/cmd/otelsoak/config.example.yaml index c4aa6232..11ed99c7 100644 --- a/loadgen/cmd/otelsoak/config.example.yaml +++ b/loadgen/cmd/otelsoak/config.example.yaml @@ -47,15 +47,15 @@ service: logs: receivers: [loadgen] processors: [ratelimit, transform/rewrite] - exporters: [otlp] + exporters: [otlp, debug] metrics: receivers: [loadgen] processors: [ratelimit, transform/rewrite] - exporters: [otlp] + exporters: [otlp, debug] traces: receivers: [loadgen] processors: [ratelimit, transform/rewrite] - exporters: [otlp] + exporters: [otlp, debug] telemetry: metrics: readers: