Skip to content

Commit

Permalink
[receiver/loadgen] Simulate multiple agents via new concurrency con…
Browse files Browse the repository at this point in the history
…fig (#313)

Add a concurrency config to loadgenreceiver to simulate multiple agents (agent_replicas in apmsoak) sending otlp requests.
  • Loading branch information
carsonip authored Jan 22, 2025
1 parent f1c33d8 commit ec334ca
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 195 deletions.
3 changes: 3 additions & 0 deletions loadgen/cmd/otelsoak/config.example.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
receivers:
loadgen:
# # Similar to agent_replicas config in apmsoak
# # Higher concurrency value translates to higher load
# concurrency: 16

exporters:
otlp:
Expand Down
5 changes: 5 additions & 0 deletions receiver/loadgenreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ The receiver generates telemetry as quickly as possible. Any rate limiting shoul

The receiver only rewrites timestamps to Now, and does not modify any other fields. Therefore, it will have the same cardinality as the original canned data. To simulate higher cardinality (e.g. trace ID, service name), use `transform` processor with OTTL to rewrite fields.

## Config

See [./config.go](./config.go) for configurations.

## Sample configuration

```yaml
receivers:
loadgen:
concurrency: 128

processors:

Expand Down
21 changes: 20 additions & 1 deletion receiver/loadgenreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,28 @@
package loadgenreceiver // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver"

import (
"fmt"

"go.opentelemetry.io/collector/component"
)

type (
JsonlFile string
)

// Config defines configuration for HostMetrics receiver.
// Config defines configuration for loadgen receiver.
type Config struct {
Metrics MetricsConfig `mapstructure:"metrics"`
Logs LogsConfig `mapstructure:"logs"`
Traces TracesConfig `mapstructure:"traces"`

// Concurrency is the amount of concurrency when sending to next consumer.
// The concurrent workers share the amount of workload, instead of multiplying the amount of workload,
// i.e. loadgenreceiver still sends up to the same MaxReplay limit.
// A higher concurrency translates to a higher load.
// As requests are synchronous, when concurrency is N, there will be N in-flight requests.
// This is similar to the `agent_replicas` config in apmsoak.
Concurrency int `mapstructure:"concurrency"`
}

type MetricsConfig struct {
Expand Down Expand Up @@ -72,5 +82,14 @@ var _ component.Config = (*Config)(nil)

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
if cfg.Logs.MaxReplay < 0 {
return fmt.Errorf("logs::max_replay must be >= 0")
}
if cfg.Metrics.MaxReplay < 0 {
return fmt.Errorf("metrics::max_replay must be >= 0")
}
if cfg.Traces.MaxReplay < 0 {
return fmt.Errorf("traces::max_replay must be >= 0")
}
return nil
}
1 change: 1 addition & 0 deletions receiver/loadgenreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func createDefaultReceiverConfig(logsDone, metricsDone, tracesDone chan Stats) c
Traces: TracesConfig{
doneCh: tracesDone,
},
Concurrency: 1,
}
}

Expand Down
65 changes: 65 additions & 0 deletions receiver/loadgenreceiver/internal/list/looping.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package list // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/list"

import (
"errors"
"sync"
)

var ErrLoopLimitReached = errors.New("loop limit reached")

// LoopingList is a list that loops over the provided list of items with an optional loop limit.
type LoopingList[T any] struct {
items []T
idx int
loopCnt int
loopLimit int
mu sync.Mutex
}

// NewLoopingList returns a LoopingList over items for an optional loopLimit.
// Setting loopLimit to 0 causes the list to loop infinitely.
func NewLoopingList[T any](items []T, loopLimit int) *LoopingList[T] {
return &LoopingList[T]{
items: items,
loopLimit: loopLimit,
}
}

// Next returns the next item in list with a nil error.
// If loop limit is reached, it returns ErrLoopLimitReached.
// Safe for concurrent use.
func (s *LoopingList[T]) Next() (T, error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.loopLimit != 0 && s.loopCnt >= s.loopLimit {
var zero T
return zero, ErrLoopLimitReached
}

item := s.items[s.idx]

s.idx = (s.idx + 1) % len(s.items)
if s.idx == 0 {
s.loopCnt++
}

return item, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,36 @@
// specific language governing permissions and limitations
// under the License.

package internal // import "github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal"
package list

type LoopingList[T any] struct {
items []T
idx int
loopCnt int
}

func NewLoopingList[T any](items []T) LoopingList[T] {
return LoopingList[T]{
items: items,
}
}
import (
"fmt"
"testing"

func (s *LoopingList[T]) Next() T {
defer func() {
s.idx = (s.idx + 1) % len(s.items)
if s.idx == 0 {
s.loopCnt++
}
}()
return s.items[s.idx]
}
"github.com/stretchr/testify/assert"
)

func (s *LoopingList[T]) LoopCount() int {
return s.loopCnt
func TestNext_loopLimit(t *testing.T) {
for _, loopLimit := range []int{0, 10} {
t.Run(fmt.Sprintf("loopLimit=%d", loopLimit), func(t *testing.T) {
items := []int{0, 1, 2}
l := NewLoopingList(items, loopLimit)
for i := 0; ; i++ {
item, err := l.Next()
if loopLimit == 0 {
if i > 10 {
// terminate after some point if infinitely looping
return
}
} else {
if i >= len(items)*loopLimit {
assert.ErrorIs(t, err, ErrLoopLimitReached)
return
}
}
assert.NoError(t, err)
assert.Equal(t, i%3, item)
}
})
}
}
86 changes: 51 additions & 35 deletions receiver/loadgenreceiver/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"bytes"
"context"
_ "embed"
"errors"
"os"
"sync"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -32,7 +34,7 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"

"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal"
"github.com/elastic/opentelemetry-collector-components/receiver/loadgenreceiver/internal/list"
)

//go:embed testdata/logs.jsonl
Expand All @@ -42,9 +44,10 @@ type logsGenerator struct {
cfg *Config
logger *zap.Logger

samples internal.LoopingList[plog.Logs]
samples *list.LoopingList[plog.Logs]

stats Stats
stats Stats
statsMu sync.Mutex

consumer consumer.Logs

Expand All @@ -70,7 +73,7 @@ func createLogsReceiver(
}
}

var samples []plog.Logs
var items []plog.Logs
scanner := bufio.NewScanner(bytes.NewReader(sampleLogs))
scanner.Buffer(make([]byte, 0, maxScannerBufSize), maxScannerBufSize)
for scanner.Scan() {
Expand All @@ -79,7 +82,7 @@ func createLogsReceiver(
if err != nil {
return nil, err
}
samples = append(samples, lineLogs)
items = append(items, lineLogs)
}
if err := scanner.Err(); err != nil {
return nil, err
Expand All @@ -89,36 +92,49 @@ func createLogsReceiver(
cfg: genConfig,
logger: set.Logger,
consumer: consumer,
samples: internal.NewLoopingList(samples),
samples: list.NewLoopingList(items, genConfig.Logs.MaxReplay),
}, nil
}

func (ar *logsGenerator) Start(ctx context.Context, _ component.Host) error {
startCtx, cancelFn := context.WithCancel(ctx)
ar.cancelFn = cancelFn

go func() {
for {
select {
case <-startCtx.Done():
return
default:
}
m := ar.nextLogs()
if err := ar.consumer.ConsumeLogs(startCtx, m); err != nil {
ar.logger.Error(err.Error())
ar.stats.FailedRequests++
ar.stats.FailedLogRecords += m.LogRecordCount()
} else {
ar.stats.Requests++
ar.stats.LogRecords += m.LogRecordCount()
}
if ar.isDone() {
if ar.cfg.Logs.doneCh != nil {
ar.cfg.Logs.doneCh <- ar.stats
wg := sync.WaitGroup{}

for i := 0; i < ar.cfg.Concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-startCtx.Done():
return
default:
}
next, err := ar.nextLogs()
if errors.Is(err, list.ErrLoopLimitReached) {
return
}
if err := ar.consumer.ConsumeLogs(startCtx, next); err != nil {
ar.logger.Error(err.Error())
ar.statsMu.Lock()
ar.stats.FailedRequests++
ar.stats.FailedLogRecords += next.LogRecordCount()
ar.statsMu.Unlock()
} else {
ar.statsMu.Lock()
ar.stats.Requests++
ar.stats.LogRecords += next.LogRecordCount()
ar.statsMu.Unlock()
}
return
}
}()
}
go func() {
wg.Wait()
if ar.cfg.Logs.doneCh != nil {
ar.cfg.Logs.doneCh <- ar.stats
}
}()
return nil
Expand All @@ -131,13 +147,17 @@ func (ar *logsGenerator) Shutdown(context.Context) error {
return nil
}

func (ar *logsGenerator) nextLogs() plog.Logs {
func (ar *logsGenerator) nextLogs() (plog.Logs, error) {
now := pcommon.NewTimestampFromTime(time.Now())

nextLogs := plog.NewLogs()
ar.samples.Next().CopyTo(nextLogs)
next := plog.NewLogs()
sample, err := ar.samples.Next()
if err != nil {
return sample, err
}
sample.CopyTo(next)

rm := nextLogs.ResourceLogs()
rm := next.ResourceLogs()
for i := 0; i < rm.Len(); i++ {
for j := 0; j < rm.At(i).ScopeLogs().Len(); j++ {
for k := 0; k < rm.At(i).ScopeLogs().At(j).LogRecords().Len(); k++ {
Expand All @@ -147,9 +167,5 @@ func (ar *logsGenerator) nextLogs() plog.Logs {
}
}

return nextLogs
}

func (ar *logsGenerator) isDone() bool {
return ar.cfg.Logs.MaxReplay > 0 && ar.samples.LoopCount() >= ar.cfg.Logs.MaxReplay
return next, nil
}
Loading

0 comments on commit ec334ca

Please sign in to comment.