From 5552d83fba5304eaca0e596006cef7e0cb682627 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Fri, 17 Jan 2025 15:06:26 +0000 Subject: [PATCH] WIP --- receiver/loadgenreceiver/config.go | 3 + receiver/loadgenreceiver/factory.go | 1 + .../{loopinglist.go => loopinglist/list.go} | 2 +- receiver/loadgenreceiver/logs.go | 6 +- receiver/loadgenreceiver/metrics.go | 6 +- receiver/loadgenreceiver/traces.go | 79 +++++++++++-------- receiver/loadgenreceiver/traces_test.go | 49 ++++++------ 7 files changed, 86 insertions(+), 60 deletions(-) rename receiver/loadgenreceiver/internal/{loopinglist.go => loopinglist/list.go} (90%) diff --git a/receiver/loadgenreceiver/config.go b/receiver/loadgenreceiver/config.go index e5b15964..20a0850a 100644 --- a/receiver/loadgenreceiver/config.go +++ b/receiver/loadgenreceiver/config.go @@ -30,6 +30,9 @@ type Config struct { Metrics MetricsConfig `mapstructure:"metrics"` Logs LogsConfig `mapstructure:"logs"` Traces TracesConfig `mapstructure:"traces"` + + // NumWorker is the number of workers to share the load + NumWorkers int `mapstructure:"num_workers"` } type MetricsConfig struct { diff --git a/receiver/loadgenreceiver/factory.go b/receiver/loadgenreceiver/factory.go index 63cef4b2..2d3b2452 100644 --- a/receiver/loadgenreceiver/factory.go +++ b/receiver/loadgenreceiver/factory.go @@ -39,6 +39,7 @@ func createDefaultReceiverConfig(logsDone, metricsDone, tracesDone chan Stats) c Traces: TracesConfig{ doneCh: tracesDone, }, + NumWorkers: 1, } } diff --git a/receiver/loadgenreceiver/internal/loopinglist.go b/receiver/loadgenreceiver/internal/loopinglist/list.go similarity index 90% rename from receiver/loadgenreceiver/internal/loopinglist.go rename to receiver/loadgenreceiver/internal/loopinglist/list.go index 9cc4e9f0..2784c889 100644 --- a/receiver/loadgenreceiver/internal/loopinglist.go +++ b/receiver/loadgenreceiver/internal/loopinglist/list.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package internal // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal" +package loopinglist // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/loopinglist" type LoopingList[T any] struct { items []T diff --git a/receiver/loadgenreceiver/logs.go b/receiver/loadgenreceiver/logs.go index ce5f14da..b5e07ff7 100644 --- a/receiver/loadgenreceiver/logs.go +++ b/receiver/loadgenreceiver/logs.go @@ -32,7 +32,7 @@ import ( "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" - "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal" + "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/loopinglist" ) //go:embed testdata/logs.jsonl @@ -42,7 +42,7 @@ type logsGenerator struct { cfg *Config logger *zap.Logger - samples internal.LoopingList[plog.Logs] + samples loopinglist.LoopingList[plog.Logs] stats Stats @@ -89,7 +89,7 @@ func createLogsReceiver( cfg: genConfig, logger: set.Logger, consumer: consumer, - samples: internal.NewLoopingList(samples), + samples: loopinglist.NewLoopingList(samples), }, nil } diff --git a/receiver/loadgenreceiver/metrics.go b/receiver/loadgenreceiver/metrics.go index a9c31f82..554f47bf 100644 --- a/receiver/loadgenreceiver/metrics.go +++ b/receiver/loadgenreceiver/metrics.go @@ -32,7 +32,7 @@ import ( "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" - "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal" + "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/loopinglist" ) //go:embed testdata/metrics.jsonl @@ -42,7 +42,7 @@ type metricsGenerator struct { cfg *Config logger *zap.Logger - samples internal.LoopingList[pmetric.Metrics] + samples loopinglist.LoopingList[pmetric.Metrics] stats Stats @@ -89,7 +89,7 @@ func createMetricsReceiver( cfg: genConfig, logger: set.Logger, consumer: consumer, - samples: internal.NewLoopingList(samples), + samples: loopinglist.NewLoopingList(samples), }, nil } diff --git a/receiver/loadgenreceiver/traces.go b/receiver/loadgenreceiver/traces.go index 80426f19..9400beb2 100644 --- a/receiver/loadgenreceiver/traces.go +++ b/receiver/loadgenreceiver/traces.go @@ -22,7 +22,9 @@ import ( "bytes" "context" _ "embed" + "errors" "os" + "sync" "time" "go.opentelemetry.io/collector/component" @@ -32,7 +34,7 @@ import ( "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" - "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal" + "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/loopinglist" ) const maxScannerBufSize = 1024 * 1024 @@ -44,9 +46,10 @@ type tracesGenerator struct { cfg *Config logger *zap.Logger - samples internal.LoopingList[ptrace.Traces] + samples loopinglist.BoundedLoopingList[ptrace.Traces] - stats Stats + stats Stats + statsMu sync.Mutex consumer consumer.Traces @@ -91,7 +94,7 @@ func createTracesReceiver( cfg: genConfig, logger: set.Logger, consumer: consumer, - samples: internal.NewLoopingList(samples), + samples: loopinglist.NewBoundedLoopingList(samples, genConfig.Traces.MaxReplay), }, nil } @@ -99,28 +102,42 @@ func (ar *tracesGenerator) Start(ctx context.Context, _ component.Host) error { startCtx, cancelFn := context.WithCancel(ctx) ar.cancelFn = cancelFn - go func() { - for { - select { - case <-startCtx.Done(): - return - default: - } - m := ar.nextTraces() - if err := ar.consumer.ConsumeTraces(startCtx, m); err != nil { - ar.logger.Error(err.Error()) - ar.stats.FailedRequests++ - ar.stats.FailedSpans += m.SpanCount() - } else { - ar.stats.Requests++ - ar.stats.Spans += m.SpanCount() - } - if ar.isDone() { - if ar.cfg.Traces.doneCh != nil { - ar.cfg.Traces.doneCh <- ar.stats + wg := sync.WaitGroup{} + + for i := 0; i < ar.cfg.NumWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-startCtx.Done(): + return + default: } - return + m, err := ar.nextTraces() + if errors.Is(err, loopinglist.ErrLoopLimitReached) { + return + } + if err := ar.consumer.ConsumeTraces(startCtx, m); err != nil { + ar.logger.Error(err.Error()) + ar.statsMu.Lock() + ar.stats.FailedRequests++ + ar.stats.FailedSpans += m.SpanCount() + ar.statsMu.Unlock() + } else { + ar.statsMu.Lock() + ar.stats.Requests++ + ar.stats.Spans += m.SpanCount() + ar.statsMu.Unlock() + } + } + }() + } + go func() { + wg.Wait() + if ar.cfg.Traces.doneCh != nil { + ar.cfg.Traces.doneCh <- ar.stats } }() return nil @@ -133,9 +150,13 @@ func (ar *tracesGenerator) Shutdown(context.Context) error { return nil } -func (ar *tracesGenerator) nextTraces() ptrace.Traces { +func (ar *tracesGenerator) nextTraces() (ptrace.Traces, error) { nextLogs := ptrace.NewTraces() - ar.samples.Next().CopyTo(nextLogs) + s, err := ar.samples.Next() + if err != nil { + return s, err + } + s.CopyTo(nextLogs) rm := nextLogs.ResourceSpans() for i := 0; i < rm.Len(); i++ { @@ -151,9 +172,5 @@ func (ar *tracesGenerator) nextTraces() ptrace.Traces { } } - return nextLogs -} - -func (ar *tracesGenerator) isDone() bool { - return ar.cfg.Traces.MaxReplay > 0 && ar.samples.LoopCount() >= ar.cfg.Traces.MaxReplay + return nextLogs, nil } diff --git a/receiver/loadgenreceiver/traces_test.go b/receiver/loadgenreceiver/traces_test.go index c191ee36..08eb1396 100644 --- a/receiver/loadgenreceiver/traces_test.go +++ b/receiver/loadgenreceiver/traces_test.go @@ -20,6 +20,7 @@ package loadgenreceiver // import "github.com/elastic/opentelemetry-collector-co import ( "bytes" "context" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -32,26 +33,30 @@ import ( func TestTracesGenerator_doneCh(t *testing.T) { const maxReplay = 2 - doneCh := make(chan Stats) - sink := &consumertest.TracesSink{} - r, _ := createTracesReceiver(context.Background(), receiver.Settings{ - ID: component.ID{}, - TelemetrySettings: component.TelemetrySettings{ - Logger: zap.NewNop(), - }, - BuildInfo: component.BuildInfo{}, - }, &Config{Traces: TracesConfig{ - MaxReplay: maxReplay, - doneCh: doneCh, - }}, sink) - err := r.Start(context.Background(), componenttest.NewNopHost()) - assert.NoError(t, err) - defer func() { - assert.NoError(t, r.Shutdown(context.Background())) - }() - stats := <-doneCh - want := maxReplay * bytes.Count(demoTraces, []byte("\n")) - assert.Equal(t, want, stats.Requests) - assert.Equal(t, want, len(sink.AllTraces())) - assert.Equal(t, sink.SpanCount(), stats.Spans) + for _, workers := range []int{1, 2} { + t.Run(fmt.Sprintf("workers=%d", workers), func(t *testing.T) { + doneCh := make(chan Stats) + sink := &consumertest.TracesSink{} + cfg := createDefaultReceiverConfig(nil, nil, doneCh) + cfg.(*Config).Traces.MaxReplay = maxReplay + cfg.(*Config).NumWorkers = workers + r, _ := createTracesReceiver(context.Background(), receiver.Settings{ + ID: component.ID{}, + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.NewNop(), + }, + BuildInfo: component.BuildInfo{}, + }, cfg, sink) + err := r.Start(context.Background(), componenttest.NewNopHost()) + assert.NoError(t, err) + defer func() { + assert.NoError(t, r.Shutdown(context.Background())) + }() + stats := <-doneCh + want := maxReplay * bytes.Count(demoTraces, []byte("\n")) + assert.Equal(t, want, stats.Requests) + assert.Equal(t, want, len(sink.AllTraces())) + assert.Equal(t, sink.SpanCount(), stats.Spans) + }) + } }